#!/usr/bin/python3
# -*- coding: utf-8 -*-

from collections import OrderedDict
import csv
import os
from pathlib import Path
import sys


# \brief Class that reads and stores the message data parsed from a csv file.
class MsgData(object):
    # \brief Initializer
    def __init__(self):
        # Cached message data, keyed by the decimal message id value.
        self.data = { }
        # Fixed column layout of the csv; payload fields occupy any columns
        # after these.
        self._msg_col = [ 'hex_msg_id', 'dec_msg_id', 'msg_id', 'source',
                          'module', 'type', 'can_channel', 'hex_can_channel',
                          'ack_status', 'payload' ]

    # \brief Sorted list of the distinct (upper-cased) payload field types
    #        across all cached messages.
    @property
    def field_types(self):
        types = set()
        for msg in self.data.values():
            types.update(field['type'].upper() for field in msg['payload'])
        return sorted(types)

    # \brief Build the list of "type-name" strings for a message's payload.
    # \param[in] msg_id_value Decimal message id value (key into self.data).
    # \param[in] empty_string If true, a message with no payload yields [""]
    #            instead of []. (Previously this parameter was accepted but
    #            ignored; it is now honored. Default behavior is unchanged.)
    # \return list of "type-name" strings
    def field_list(self, msg_id_value, empty_string=True):
        fields = [f"{field['type']}-{field['name']}"
                  for field in self.data[msg_id_value]['payload']]
        if not fields and empty_string:
            fields.append("")
        return fields

    # \brief Converts a value to a hex string
    # \return hex string representation of the inputted value (e.g. 26 -> "0x1A")
    @staticmethod
    def value_to_hex_string(value):
        # {value:X} is equivalent to formatting with :x and upper-casing.
        return f"0x{value:X}"

    # \brief Print all loaded data to the console
    # \return none
    def dump(self):
        for row in self.data:
            msg = self.data[row]
            print(f"{row}:")
            print(f" msg_name: {msg['msg_name']}")
            print(f" msg_id: {msg['msg_id_hex_string']} ({msg['msg_id_value']})")
            print(f" can_channel: {msg['can_channel']} ({MsgData.value_to_hex_string(msg['can_channel_value'])})")
            print(f" ack_status: {msg['ack_status']}")
            if msg['payload']:
                print(f" payload: [")
                for field in msg['payload']:
                    print(f" {field['name']} : {field['type']}")
                print(f" ]")
            else:
                print(f" payload: [ ]")
            print(f" raw: {msg['raw']}")
            print(f"")

    # \brief Load the csv file and cache the data.
    # \note This will clear any previously cached csv data (unless clear=False).
    # \param[in] filename Filename of the csv to load.
    # \param[in] clear If true, clear out any already loaded data before
    #            processing new data, otherwise new data will be added to the
    #            existing data
    # \return none
    def load(self, filename, clear=True):
        filename = Path(filename).resolve()

        # Safe column accessors: unknown columns or short rows yield '' / 0
        # rather than raising.
        def get_field_from_entry(entry, column):
            if column in self._msg_col and len(entry) > self._msg_col.index(column):
                return entry[self._msg_col.index(column)].strip()
            return ''

        def get_int_field_from_entry(entry, column):
            value = get_field_from_entry(entry, column)
            return int(value) if value != '' else 0

        def get_hex_field_from_entry(entry, column):
            value = get_field_from_entry(entry, column)
            return int(value, 16) if value != '' else 0

        # clear the previously cached csv data, if requested
        if clear:
            self.data = { }

        with open(filename, mode='r', encoding='utf-8') as in_file:
            # skip the first row, it is the header
            in_file.readline()
            entry_count = 0
            # loop thru each entry/row in the file and read in the data
            for entry in csv.reader(in_file, delimiter=',', quotechar='"',
                                    quoting=csv.QUOTE_MINIMAL):
                entry_count += 1
                msg_id_value = get_int_field_from_entry(entry, 'dec_msg_id')
                msg_id = get_field_from_entry(entry, 'msg_id')
                row_data = {
                    'msg_id': msg_id,
                    'msg_id_value': msg_id_value,
                    'msg_id_hex_string': MsgData.value_to_hex_string(msg_id_value),
                    'can_channel': get_field_from_entry(entry, 'can_channel'),
                    'can_channel_value': get_hex_field_from_entry(entry, 'hex_can_channel'),
                    'ack_status': get_field_from_entry(entry, 'ack_status') == 'ACK_REQUIRED',
                    'msg_name': self.__message_name(msg_id),
                    'payload': [ ],
                    'raw': entry,
                }

                # Every column after the fixed layout is a payload field of the
                # form "type-name"; anything without a '-' is skipped.
                for raw_field in entry[len(self._msg_col):]:
                    if '-' not in raw_field:
                        continue
                    field_type, field_base = raw_field.split('-', 1)
                    if '.' in field_base:
                        print(f"WARNING: {row_data['msg_id']} ({row_data['msg_id_hex_string']}) field \"{raw_field}\" contains illegal character \'.\'")
                        # keep only the last dotted component, spaces removed
                        field_base = field_base.replace(' ', '').split('.')[-1]
                    # De-duplicate field names within the message by appending
                    # an increasing numeric suffix; warn only once per clash.
                    field_name = field_base
                    count = 0
                    while any(field['name'] == field_name for field in row_data['payload']):
                        if count == 0:
                            print(f"WARNING: {row_data['msg_id']} ({row_data['msg_id_hex_string']}) contains duplicate field \"{field_name}\"")
                        count += 1
                        field_name = f"{field_base}{count}"
                    row_data['payload'].append({
                        'raw': raw_field,
                        'name': field_name,
                        'type': field_type,
                    })

                if msg_id_value in self.data:
                    print(f"WARNING: found MesgIDs with same value, {self.data[msg_id_value]['msg_id']} will be replaced by {row_data['msg_id']} for value {row_data['msg_id_hex_string']}")
                self.data[msg_id_value] = row_data

        # sort the collected entries (reuse sort() instead of duplicating its key)
        self.sort()
        # BUG FIX: report the file actually loaded (was a literal "(unknown)" placeholder)
        print(f'Loaded {filename} with {entry_count} entries')

    # \brief Sort the cached csv data by message id value (ties broken by key).
    # \return none
    def sort(self):
        def sort_key(item):
            msg_id_value = item[1]['msg_id_value']
            # entries without a usable id sort to the end
            return (int(msg_id_value if msg_id_value is not None else 99999), item[0])
        try:
            self.data = OrderedDict(sorted(self.data.items(), key=sort_key))
        except Exception:
            # deliberate best effort: leave the data unsorted if a
            # msg_id_value is not comparable/convertible
            pass

    # \brief Derive a CamelCase message name from a MSG_ID_* identifier,
    #        e.g. "MSG_ID_TEST_MSG" -> "TestMsg".
    def __message_name(self, msg_id):
        # str.removeprefix is only available on python >= 3.9; fall back to a
        # manual strip on older interpreters.
        if sys.version_info >= (3, 9):
            name = msg_id.removeprefix('MSG_ID_')
        elif msg_id.startswith('MSG_ID_'):
            name = msg_id[len('MSG_ID_'):]
        else:
            name = msg_id
        return ''.join(word.capitalize() for word in name.split('_'))