###########################################################################
#
#    Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file abstract_classes.py
#
#    @author (last)      Zoltan Miskolci
#    @date   (last)      04-May-2026
#    @author (original)  Zoltan Miskolci
#    @date   (original)  04-May-2026
#
############################################################################

# Module imports
from abc import ABC, abstractmethod
from typing import List, Tuple
import math
import struct

# Project imports
from leahi_dialin.common.generic_defs import DataTypes
from leahi_dialin.common.msg_defs import MSG_HEADER_SIZE


class AbstractObserver(ABC):
    """
    Publicly accessible parent class for all observers.

    The update method will receive data when data is made available
    """

    @abstractmethod
    def update(self):
        """
        Receive an update. Every concrete observer must override this method,
        otherwise the observer class cannot be instantiated.
        """
        pass


class AbstractSubSystem:
    """
    Parent class for the sub systems.

    Keeps a list of attached observers and offers helpers that decode the
    payload of a message field by field with the help of a decoder list.
    """

    # NOTE(review): this class does not derive from ABC, so the @abstractmethod
    # decorator below is NOT enforced by Python - the class can be instantiated
    # and children are not forced to override __init__. Deriving from ABC would
    # change that behavior for existing callers, so it is left as it is.
    @abstractmethod
    def __init__(self):
        """
        Initialization function for the sub system.

        Creates the empty observer list and the date-time format string.
        """
        self._observers = []
        self._datetime_fmt = "%m.%d.%Y_%I.%M.%S.%f"

    def attach(self, observer: AbstractObserver):
        """
        Attach an observer so it is updated upon published events

        :param observer: (AbstractObserver) The observer to add to the observer list
        """
        self._observers.append(observer)

    def detach(self, observer: AbstractObserver):
        """
        Detach an observer

        :param observer: (AbstractObserver) The observer to remove from the observer list
        :raises ValueError: If the observer is not in the observer list
        """
        self._observers.remove(observer)

    @staticmethod
    def _unpack_field(message, datatype: DataTypes, start_pos: int) -> Tuple[object, int]:
        """
        Decode a single field of the message payload.

        :param message: (Mapping) Its 'message' entry holds the raw message bytes
        :param datatype: (DataTypes) Provides the size and the struct format of the field
        :param start_pos: (Integer) Index of the first byte of the field
        :return: (Tuple) The decoded value and the index of the first byte after the field
        :raises ValueError: If the decoded value is a NaN float
        :raises struct.error: If the slice does not match the size the struct format requires
        """
        end_pos = start_pos + datatype.size()
        raw_bytes = bytearray(message['message'][start_pos:end_pos])
        value = struct.unpack(datatype.unpack_attrib(), raw_bytes)[0]

        # Only a float can be NaN. Testing the type instead of searching the text
        # of the value avoids rejecting a non-float value whose text merely
        # contains the letters "nan".
        if isinstance(value, float) and math.isnan(value):
            raise ValueError(f'{value} is not an accepted value!')

        # If the type is Bool, convert the value from Integer to Boolean
        if datatype is DataTypes.BOOL:
            value = value == 1

        return value, end_pos

    def process_into_vars(self, decoder_list: List[Tuple], message, start_from_byte: int=0) -> dict:
        """
        Process the CAN message with the help of the decoder list into variables.

        :param decoder_list: (List[Tuple[String, DataTypes]]) Contains the variable name and DataType pair of the indexed message
        :param message: (Mapping) Its 'message' entry holds the raw CAN message bytes
        :param start_from_byte: (Integer) Start from the nth byte after the header
        :return: (Dictionary) A dictionary for the variable_name and value pair
        :raises ValueError: If a decoded value is a NaN float
        """
        start_pos = MSG_HEADER_SIZE + start_from_byte
        results = {}

        for decode_details in decoder_list:
            # Content of the decode list: the name comes first, the DataType last
            variable_name = decode_details[0]
            datatype: DataTypes = decode_details[-1]

            value, start_pos = self._unpack_field(message, datatype, start_pos)
            results[variable_name] = value

            # NOTE(review): exec is kept for backward compatibility only. Assigning
            # to a bare name here never reaches the caller; only targets reached
            # through an object (e.g. 'self.some_attribute') have a lasting effect.
            # Prefer the returned dictionary and consider removing the exec once the
            # callers are confirmed not to depend on it.
            # The value is handed over through the namespace instead of being pasted
            # into the source text, so values whose text is not valid Python source
            # (e.g. an infinite float, printed as 'inf') no longer raise a NameError.
            exec(f'{variable_name} = _decoded_value', globals(), {'self': self, '_decoded_value': value})

        return results

    def process_into_dict(self, dict_to_update: dict, decoder_list: List[Tuple], message, start_from_byte: int=0) -> dict:
        """
        Process the CAN message with the help of the decoder list into a dictionary.

        :param dict_to_update: (Dictionary) The dictionary that receives the decoded values, updated in place
        :param decoder_list: (List[Tuple[DialEnum, DataTypes]] or List[Tuple[DialEnum, DialEnum, DataTypes]])
                             Contains the dictionary key enums and the DataType of the indexed message
        :param message: (Mapping) Its 'message' entry holds the raw CAN message bytes
        :param start_from_byte: (Integer) Start from the nth byte after the header
        :return: (Dictionary) The updated dictionary (the same object as dict_to_update)
        :raises ValueError: If a decoded value is a NaN float
        :raises KeyError: If a 3 element entry addresses a first level key missing from dict_to_update
        """
        start_pos = MSG_HEADER_SIZE + start_from_byte

        for decode_details in decoder_list:
            # Content of the decode list: the key enum(s) come first, the DataType last
            entry_length = len(decode_details)
            datatype: DataTypes = decode_details[-1]

            value, start_pos = self._unpack_field(message, datatype, start_pos)

            # Save the value into the Dictionary.
            # Entries with any other length are decoded (so the byte position still
            # advances) but their value is not stored.
            if entry_length == 2:
                dict_to_update[decode_details[0].name] = value
            elif entry_length == 3:
                dict_to_update[decode_details[0].name][decode_details[1].name] = value

        return dict_to_update