########################################################################### # # Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file abstract_classes.py # # @author (last) Zoltan Miskolci # @date (last) 04-May-2026 # @author (original) Zoltan Miskolci # @date (original) 04-May-2026 # ############################################################################ # Module imports from abc import ABC, abstractmethod from typing import List, Tuple import struct # Project imports from leahi_dialin.common.generic_defs import DataTypes from leahi_dialin.common.msg_defs import MSG_HEADER_SIZE class AbstractObserver(ABC): """ Publicly accessible parent class for all observers. The update method will receive data when data is made available """ @abstractmethod def update(self): """ Attach an observer """ pass class AbstractSubSystem: @abstractmethod def __init__(self): """ Initialization function for the sub system # The abstract base class requires all abstract methods are overridden by children classes """ self._observers = [] self._datetime_fmt = "%m.%d.%Y_%I.%M.%S.%f" pass def attach(self, observer: AbstractObserver): """ Attach an observer so it is updated upon published events """ self._observers.append(observer) def detach(self, observer: AbstractObserver): """ Detach an observer """ self._observers.remove(observer) def process_into_vars(self, decoder_list: List[Tuple], message, start_from_byte: int=0, debug: bool=False) -> dict: """ Process the CAN message with the help of the decoder list into variables and a dictionary. Note: updating variables will only be done when it's class wide one, aka "self.attr_name". For local attributes to avoid namespace issues use the returned dictionary. Format: {attr_name : value} :param decoder_list: (List[Tuple[String, DataTypes]]) Contains the variable name and DataType pair of the indexed message :param message: (Bytearray) The raw CAN message :param start_from_byte: (Integer) Start from the nth byte after the header :param debug: (Boolean) Prints for debugging :return: (Dictionary) A dictionary for the variable_name and value pair """ start_pos = MSG_HEADER_SIZE + start_from_byte results = {} if debug: print(f'\n\ndecoder_list: {decoder_list}') print(f'len: {len(decoder_list[0])}') for decode_details in decoder_list: # Content of the decode list variable_name = decode_details[0] datatype: DataTypes = decode_details[-1] end_pos = start_pos + datatype.size() value = struct.unpack(datatype.unpack_attrib(), bytearray(message['message'][start_pos:end_pos]))[0] if debug: print(f'value: {value} ({datatype.name})') print(f'pos: {start_pos} - {end_pos}') if 'nan' in str(value).lower(): raise ValueError(f'{value} is not an accepted value!') if datatype is DataTypes.BOOL: value = True if value == 1 else False results[variable_name] = value # If it's a instance variable (self.) then set it's value if variable_name.startswith('self'): attr_name = variable_name[5:] if not attr_name.isidentifier(): print(f'Invalid attribute name: "{attr_name}"!') raise ValueError('Invalid attribute name') setattr(self, attr_name, value) start_pos = end_pos if debug: print('Finished cycle\n') if debug: print(f'results: {results}\n') return results def process_into_dict(self, dict_to_update: dict, decoder_list: List[Tuple], message, start_from_byte: int=0, debug: bool=False): """ Process the CAN message with the help of the decoder list into a dictionary. :param decoder_list: (List[Tuple[DialEnum, DialEnum, DataTypes]]) Contains the dictioarny key names and DataType of the indexed message :param message: (Bytearray) The raw CAN message :param start_from_byte: (Integer) Start from the nth byte after the header :param debug: (Boolean) Prints for debugging :return: (Dictionary) The updated dictionary """ start_pos = MSG_HEADER_SIZE + start_from_byte if debug: print(f'\n\ndecoder_list: {decoder_list}') print(f'len: {len(decoder_list[0])}') for decode_details in decoder_list: # Content of the decode list key_1 = decode_details[0] key_2 = decode_details[1] if len(decode_details) >= 3 else None datatype: DataTypes = decode_details[-1] end_pos = start_pos + datatype.size() value = struct.unpack(datatype.unpack_attrib(), bytearray(message['message'][start_pos:end_pos]))[0] if debug: print(f'key_1: {key_1}') print(f'key_2: {key_2}') print(f'value: {value} ({datatype.name})') print(f'pos: {start_pos} - {end_pos}') if 'nan' in str(value).lower(): raise ValueError(f'{value} is not an accepted value!') # If the type is Bool, convert the value from Integer to Boolean if datatype is DataTypes.BOOL: value = True if value == 1 else False # Save the value into the Dictionary if len(decode_details) == 2: dict_to_update[key_1] = value elif len(decode_details) == 3: dict_to_update[key_1][key_2] = value start_pos = end_pos if debug: print('Finished cycle\n') if debug: print('done\n')