Index: leahi_dialin/utils/abstract_classes.py =================================================================== diff -u -r34b64ff2d8a64f4b7b60b80bb7cf4c36845e5943 -r27cd8e0e7b47617083e8c12f73306f0ba648e687 --- leahi_dialin/utils/abstract_classes.py (.../abstract_classes.py) (revision 34b64ff2d8a64f4b7b60b80bb7cf4c36845e5943) +++ leahi_dialin/utils/abstract_classes.py (.../abstract_classes.py) (revision 27cd8e0e7b47617083e8c12f73306f0ba648e687) @@ -66,44 +66,67 @@ self._observers.remove(observer) - def process_into_vars(self, decoder_list: List[Tuple], message, start_from_byte: int=0) -> dict: + def process_into_vars(self, decoder_list: List[Tuple], message, start_from_byte: int=0, debug: bool=False) -> dict: """ - Process the CAN message with the help of the decoder list into variables. + Process the CAN message with the help of the decoder list into variables and a dictionary. + Note: updating variables will only be done when it's class wide one, aka "self.attr_name". + For local attributes to avoid namespace issues use the returned dictionary. Format: {attr_name : value} :param decoder_list: (List[Tuple[String, DataTypes]]) Contains the variable name and DataType pair of the indexed message :param message: (Bytearray) The raw CAN message :param start_from_byte: (Integer) Start from the nth byte after the header + :param debug: (Boolean) Prints for debugging :return: (Dictionary) A dictionary for the variable_name and value pair """ start_pos = MSG_HEADER_SIZE + start_from_byte results = {} + if debug: + print(f'\n\ndecoder_list: {decoder_list}') + print(f'len: {len(decoder_list[0])}') for decode_details in decoder_list: # Content of the decode list variable_name = decode_details[0] datatype: DataTypes = decode_details[-1] - end_pos = start_pos + datatype.size() value = struct.unpack(datatype.unpack_attrib(), bytearray(message['message'][start_pos:end_pos]))[0] + if debug: + print(f'value: {value} ({datatype.name})') + print(f'pos: {start_pos} - {end_pos}') if 'nan' in str(value).lower(): raise ValueError(f'{value} is not an accepted value!') if datatype is DataTypes.BOOL: value = True if value == 1 else False results[variable_name] = value - exec(f'{variable_name} = {value}') + + # If it's a instance variable (self.) then set it's value + if variable_name.startswith('self'): + attr_name = variable_name[5:] + if not attr_name.isidentifier(): + print(f'Invalid attribute name: "{attr_name}"!') + raise ValueError('Invalid attribute name') + setattr(self, attr_name, value) start_pos = end_pos + if debug: + print('Finished cycle\n') + if debug: + print(f'results: {results}\n') return results - def process_into_dict(self, dict_to_update: dict, decoder_list: List[Tuple], message, start_from_byte: int=0) -> dict: + def process_into_dict(self, dict_to_update: dict, decoder_list: List[Tuple], message, start_from_byte: int=0, debug: bool=False): """ Process the CAN message with the help of the decoder list into a dictionary. :param decoder_list: (List[Tuple[DialEnum, DialEnum, DataTypes]]) Contains the dictioarny key names and DataType of the indexed message :param message: (Bytearray) The raw CAN message :param start_from_byte: (Integer) Start from the nth byte after the header + :param debug: (Boolean) Prints for debugging :return: (Dictionary) The updated dictionary """ start_pos = MSG_HEADER_SIZE + start_from_byte + if debug: + print(f'\n\ndecoder_list: {decoder_list}') + print(f'len: {len(decoder_list[0])}') for decode_details in decoder_list: # Content of the decode list key_1 = decode_details[0] @@ -112,17 +135,23 @@ end_pos = start_pos + datatype.size() value = struct.unpack(datatype.unpack_attrib(), bytearray(message['message'][start_pos:end_pos]))[0] + if debug: + print(f'key_1: {key_1}') + print(f'key_2: {key_2}') + print(f'value: {value} ({datatype.name})') + print(f'pos: {start_pos} - {end_pos}') if 'nan' in str(value).lower(): raise ValueError(f'{value} is not an accepted value!') - # If the type is Bool, convert the value from Integer to Boolean if datatype is DataTypes.BOOL: value = True if value == 1 else False - # Save the value into the Dictionary if len(decode_details) == 2: - dict_to_update[key_1.name] = value + dict_to_update[key_1] = value elif len(decode_details) == 3: - dict_to_update[key_1.name][key_2.name] = value + dict_to_update[key_1][key_2] = value start_pos = end_pos - return dict_to_update + if debug: + print('Finished cycle\n') + if debug: + print('done\n')