########################################################################### # # Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file abstract_classes.py # # @author (last) Zoltan Miskolci # @date (last) 04-May-2026 # @author (original) Zoltan Miskolci # @date (original) 04-May-2026 # ############################################################################ # Module imports from abc import ABC, abstractmethod from typing import List, Tuple import struct # Project imports from leahi_dialin.common.generic_defs import DataTypes from leahi_dialin.common.constants import MSG_HEADER_SIZE class AbstractObserver(ABC): """ Publicly accessible parent class for all observers. The update method will receive data when data is made available """ @abstractmethod def update(self): """ Attach an observer """ pass class AbstractSubSystem: @abstractmethod def __init__(self): """ Initialization function for the sub system # The abstract base class requires all abstract methods are overridden by children classes """ self._observers = [] self._datetime_fmt = "%m.%d.%Y_%I.%M.%S.%f" pass def attach(self, observer: AbstractObserver): """ Attach an observer so it is updated upon published events """ self._observers.append(observer) def detach(self, observer: AbstractObserver): """ Detach an observer """ self._observers.remove(observer) def process_into_vars(self, decoder_list: List[Tuple], message, start_from_byte: int=0, debug: bool=False) -> dict: """ Process the CAN message with the help of the decoder list into variables and a dictionary. Note: updating variables will only be done when it's class wide one, aka "self.attr_name". For local attributes to avoid namespace issues use the returned dictionary. Format: {attr_name : value} :param decoder_list: (List[Tuple[String, DataTypes]]) Contains the variable name and DataType pair of the indexed message :param message: (Bytearray) The raw CAN message :param start_from_byte: (Integer) Start from the nth byte after the header :param debug: (Boolean) Prints for debugging :return: (Dictionary) A dictionary for the variable_name and value pair """ start_pos = MSG_HEADER_SIZE + start_from_byte results = {} if debug: print(f'\n\ndecoder_list: {decoder_list}') print(f'len: {len(decoder_list[0])}') for decode_details in decoder_list: # Content of the decode list variable_name = decode_details[0] # If last position is multichar length if isinstance(decode_details[-1], int): length = decode_details[-1] datatype: DataTypes = decode_details[-2] else: length = 1 datatype: DataTypes = decode_details[-1] end_pos = start_pos + datatype.size() for i in range(0, length): try: new_value = struct.unpack(datatype.unpack_attrib(), bytearray(message['message'][start_pos:end_pos]))[0] if i == 0: value = new_value else: value += new_value except Exception as e: value = None break if debug: print(f'{variable_name}: {value} ({datatype.name})') print(f'pos: {start_pos} - {end_pos}') if 'nan' in str(value).lower(): value = None break # raise ValueError(f'{value} is not an accepted value!') if datatype in [DataTypes.BOOL, DataTypes.BOOL_U08]: value = True if value == 1 else False results[variable_name] = value # If it's a instance variable (self.) then set it's value if variable_name.startswith('self'): attr_path = variable_name[5:].split('.') obj = self for attr in attr_path[:-1]: obj = getattr(obj, attr) setattr(obj, attr_path[-1], value) start_pos = end_pos if debug: print('Finished cycle\n') if debug: print(f'results: {results}\n') return results def process_into_dict(self, dict_to_update: dict, decoder_list: List[Tuple], message, start_from_byte: int=0, debug: bool=False): """ Process the CAN message with the help of the decoder list into a dictionary. :param decoder_list: (List[Tuple[DialEnum, DialEnum, DataTypes]]) Contains the dictioarny key names and DataType of the indexed message :param message: (Bytearray) The raw CAN message :param start_from_byte: (Integer) Start from the nth byte after the header :param debug: (Boolean) Prints for debugging :return: (Dictionary) The updated dictionary """ start_pos = MSG_HEADER_SIZE + start_from_byte if debug: print(f'\n\ndecoder_list: {decoder_list}') print(f'len: {len(decoder_list[0])}') for decode_details in decoder_list: # Content of the decode list key_1 = decode_details[0] key_2 = decode_details[1] if len(decode_details) >= 3 else None key_3 = decode_details[2] if len(decode_details) >= 4 else None # If last position is multichar length if isinstance(decode_details[-1], int): length = decode_details[-1] datatype: DataTypes = decode_details[-2] key_list_length = len(decode_details) - 2 else: length = 1 datatype: DataTypes = decode_details[-1] key_list_length = len(decode_details) - 1 end_pos = start_pos + datatype.size() for i in range(0, length): try: new_value = struct.unpack(datatype.unpack_attrib(), bytearray(message['message'][start_pos:end_pos]))[0] if i == 0: value = new_value else: value += new_value except Exception as e: value = None break if debug: print(f'key_1: {key_1}') print(f'key_2: {key_2}') print(f'value: {value} ({datatype.name})') print(f'pos: {start_pos} - {end_pos}') if 'nan' in str(value).lower(): value = None # raise ValueError(f'{value} is not an accepted value!') # If the type is Bool, convert the value from Integer to Boolean if datatype in [DataTypes.BOOL, DataTypes.BOOL_U08]: value = True if value == 1 else False # Save the value into the Dictionary if key_list_length == 1: dict_to_update[key_1] = value elif key_list_length == 2: if key_1 not in dict_to_update: dict_to_update[key_1] = {} dict_to_update[key_1][key_2] = value elif key_list_length == 3: if key_1 not in dict_to_update: dict_to_update[key_1] = {} if key_2 not in dict_to_update[key_1]: dict_to_update[key_1][key_2] = {} dict_to_update[key_1][key_2][key_3] = value start_pos = end_pos if debug: print('Finished cycle\n') if debug: print('done\n') def crc16(self, data: bytes) -> int: """ Calculate a 16bit CRC for a payload. @data: (Bytes) The payload it needs CRC. @return: (Integer) The CRC value """ CRC16_TABLE = [ 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0 ] # Constants (adjust if your original values differ) INITIAL_CRC16_VAL = 0xFFFF # common default; change if needed SHIFT_8_BITS = 8 MASK_MSB = 0xFF crc = INITIAL_CRC16_VAL for byte in data: index = byte ^ ((crc >> SHIFT_8_BITS) & MASK_MSB) crc = ((crc << SHIFT_8_BITS) & 0xFFFF) ^ CRC16_TABLE[index] return crc & 0xFFFF