###########################################################################
#
#    Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file abstract_classes.py
#
#    @author (last)      Zoltan Miskolci
#    @date   (last)      04-May-2026
#    @author (original)  Zoltan Miskolci
#    @date   (original)  04-May-2026
#
############################################################################

# Module imports
from abc import ABC, abstractmethod
from typing import Any, List, Tuple
import math
import struct

# Project imports
from leahi_dialin.common.generic_defs import DataTypes
from leahi_dialin.common.constants import MSG_HEADER_SIZE


def _unpack_field(message, datatype: DataTypes, start_pos: int) -> Tuple[Any, int]:
    """
    Unpack a single field of the given DataType from the message payload.

    :param message: Mapping whose 'message' entry holds the raw bytes
    :param datatype: (DataTypes) Supplies the field size and the struct format
    :param start_pos: (Integer) Index of the first byte of the field
    :return: (Tuple) The raw unpacked value and the index just past the field
    :raises struct.error: when the sliced bytes do not match the struct format
    """
    end_pos = start_pos + datatype.size()
    raw_bytes = bytearray(message['message'][start_pos:end_pos])
    return struct.unpack(datatype.unpack_attrib(), raw_bytes)[0], end_pos


def _checked_value(value, datatype: DataTypes):
    """
    Reject NaN values and convert BOOL fields from Integer to Boolean.

    :param value: The raw value as returned by struct.unpack
    :param datatype: (DataTypes) The DataType the value was decoded with
    :return: The value, converted to a Boolean when the DataType is BOOL
    :raises ValueError: when the value is a floating point NaN
    """
    # Only floats can be NaN; testing the type avoids false positives on
    # other values whose text representation happens to contain "nan".
    if isinstance(value, float) and math.isnan(value):
        raise ValueError(f'{value} is not an accepted value!')
    if datatype is DataTypes.BOOL:
        return value == 1
    return value


class AbstractObserver(ABC):
    """
    Publicly accessible parent class for all observers.

    The update method will receive data when data is made available
    """

    @abstractmethod
    def update(self):
        """
        Called when data is made available; must be overridden by children classes
        """
        pass


class AbstractSubSystem:
    """
    Parent class for sub systems: keeps a list of observers and provides
    helpers that decode CAN messages with the help of a decoder list.
    """

    # NOTE: this class does not derive from ABC, so the @abstractmethod marker
    # below is informational only and is not enforced at instantiation time.
    @abstractmethod
    def __init__(self):
        """
        Initialization function for the sub system
        # The abstract base class requires all abstract methods are overridden by children classes
        """
        self._observers = []
        self._datetime_fmt = "%m.%d.%Y_%I.%M.%S.%f"

    def attach(self, observer: AbstractObserver):
        """
        Attach an observer so it is updated upon published events

        :param observer: (AbstractObserver) The observer to add to the list
        """
        self._observers.append(observer)

    def detach(self, observer: AbstractObserver):
        """
        Detach an observer

        :param observer: (AbstractObserver) The observer to remove from the list
        :raises ValueError: when the observer is not in the list
        """
        self._observers.remove(observer)

    def process_into_vars(self, decoder_list: List[Tuple], message, start_from_byte: int=0, debug: bool=False) -> dict:
        """
        Process the CAN message with the help of the decoder list into variables and a dictionary.
        Note: updating variables will only be done when it's class wide one, aka "self.attr_name".
        For local attributes to avoid namespace issues use the returned dictionary.
        Format: {attr_name : value}

        :param decoder_list: (List[Tuple[String, DataTypes]]) Contains the variable name and DataType pair of the indexed message
        :param message: Mapping whose 'message' entry holds the raw CAN message bytes
        :param start_from_byte: (Integer) Start from the nth byte after the header
        :param debug: (Boolean) Prints for debugging
        :return: (Dictionary) A dictionary for the variable_name and value pair
        :raises ValueError: when a decoded value is NaN
        """
        start_pos = MSG_HEADER_SIZE + start_from_byte
        results = {}
        if debug:
            print(f'\n\ndecoder_list: {decoder_list}')
            # Guarded so that an empty decoder list does not raise IndexError
            if decoder_list:
                print(f'len: {len(decoder_list[0])}')
        for decode_details in decoder_list:
            # Content of the decode list
            variable_name = decode_details[0]
            datatype: DataTypes = decode_details[-1]
            value, end_pos = _unpack_field(message, datatype, start_pos)
            if debug:
                print(f'value: {value} ({datatype.name})')
                print(f'pos: {start_pos} - {end_pos}')
            value = _checked_value(value, datatype)
            results[variable_name] = value

            # If it's an instance variable ("self.<path>") then set its value.
            # The dot is part of the prefix so names that merely begin with
            # "self" (e.g. "selfish") are not mistaken for attributes.
            if variable_name.startswith('self.'):
                *parents, leaf = variable_name[len('self.'):].split('.')
                target = self
                for attr in parents:
                    target = getattr(target, attr)
                setattr(target, leaf, value)
            start_pos = end_pos
            if debug:
                print('Finished cycle\n')
        if debug:
            print(f'results: {results}\n')
        return results

    def process_into_dict(self, dict_to_update: dict, decoder_list: List[Tuple], message, start_from_byte: int=0, debug: bool=False) -> dict:
        """
        Process the CAN message with the help of the decoder list into a dictionary.

        :param dict_to_update: (Dictionary) The dictionary updated in place with the decoded values
        :param decoder_list: (List[Tuple[DialEnum, DialEnum, DataTypes]]) Contains the dictionary key names and DataType of the indexed message
        :param message: Mapping whose 'message' entry holds the raw CAN message bytes
        :param start_from_byte: (Integer) Start from the nth byte after the header
        :param debug: (Boolean) Prints for debugging
        :return: (Dictionary) The updated dictionary (the same object as dict_to_update)
        :raises ValueError: when a decoded value is NaN
        """
        start_pos = MSG_HEADER_SIZE + start_from_byte
        if debug:
            print(f'\n\ndecoder_list: {decoder_list}')
            # Guarded so that an empty decoder list does not raise IndexError
            if decoder_list:
                print(f'len: {len(decoder_list[0])}')
        for decode_details in decoder_list:
            # Content of the decode list
            entry_len = len(decode_details)
            key_1 = decode_details[0]
            key_2 = decode_details[1] if entry_len >= 3 else None
            datatype: DataTypes = decode_details[-1]

            value, end_pos = _unpack_field(message, datatype, start_pos)
            if debug:
                print(f'key_1: {key_1}')
                print(f'key_2: {key_2}')
                print(f'value: {value} ({datatype.name})')
                print(f'pos: {start_pos} - {end_pos}')
            # Reject NaN and convert Bool typed values from Integer to Boolean
            value = _checked_value(value, datatype)
            # Save the value into the Dictionary; entries that are neither
            # 2 nor 3 items long are decoded (bytes consumed) but not stored.
            if entry_len == 2:
                dict_to_update[key_1] = value
            elif entry_len == 3:
                dict_to_update[key_1][key_2] = value
            start_pos = end_pos
            if debug:
                print('Finished cycle\n')
        if debug:
            print('done\n')
        return dict_to_update