Index: scripts/update_package_script/utilities.py =================================================================== diff -u -r4e5c95d8cea4d29a61c3c49e5e4b2b6f76bcddb1 -radea121e63d4eabb225c26bc1efd6741c59e0df3 --- scripts/update_package_script/utilities.py (.../utilities.py) (revision 4e5c95d8cea4d29a61c3c49e5e4b2b6f76bcddb1) +++ scripts/update_package_script/utilities.py (.../utilities.py) (revision adea121e63d4eabb225c26bc1efd6741c59e0df3) @@ -7,7 +7,6 @@ from can.interfaces.socketcan.socketcan import SocketcanBus class SWUpdateEnum(Enum): - @classmethod def has_value(cls, value): return value in cls._value2member_map_ @@ -39,7 +38,11 @@ NUM_OF_TARGETS = 4 class Utilities: + """ + Software update utilities class + The utilities class to provide methods to the software update class + """ _CRC32_TABLE = ( 0x00000000, 0x1EDC6F41, 0x3DB8DE82, 0x2364B1C3, 0x7B71BD04, 0x65ADD245, 0x46C96386, 0x58150CC7, 0xF6E37A08, 0xE83F1549, 0xCB5BA48A, 0xD587CBCB, 0x8D92C70C, 0x934EA84D, 0xB02A198E, 0xAEF676CF, @@ -92,7 +95,10 @@ UPDATE_MSG_ACK_STATUS_KEY_NAME = 'update' def __init__(self): + """ + Utilities class constructor + """ self._can_bus = SocketcanBus(channel=self._CAN_INTERFACE) self._msg_id_count = 0 self._msg_sw_update_index = 0 @@ -106,26 +112,66 @@ self.file_handle = open(os.path.join(os.getcwd(), 'ack_log.log'), 'w') def get_msg_ack_nack_status(self, msg_type: str): + """ + Publicly accessible method to get the ack or nack status of a message + + @param msg_type: The message type (command or update) + + @return The status of the provided message type (e.g. ready, failed, ...) + """ return self._msg_ack_nack_status[msg_type] def clear_msg_ack_nack_status(self, msg_type: str): + """ + Publicly accessible method to clear the message ack/nack status + + @param msg_type: The message type (command or update) + + @return none + """ self._msg_ack_nack_status[msg_type] = CanCommStatus.CAN_COMM_NOT_STARTED.value def send_command_msg(self, cmd: int, target: int): + """ + Publicly accessible method to send a command message to the bootloader + @param cmd: The command to send (e.g. update, verify) + @param target: The target stack (TD, DD) + + @return none + """ + # The command message is only one frame: + # 1 byte message ID count + # 1 byte with command a nibble and target as a nibble + # 2 bytes as random value for cybersecurity. It is 0 now. + # 4 bytes as message CRC self._msg_id_count = self._msg_id_count + 1 if self._msg_id_count < 0xFF else 0 can_msg_bytes = self._convert_data_to_bytes(' 0: - left = (crc << cls._SHIFT_8_BITS_FOR_BYTE_SHIFT) & 0xFFFFFFFF - right = (crc >> cls._SHIFT_24_BITS) & 0xFFFFFFFF - crc = left ^ cls._CRC32_TABLE[data[i] ^ right] - i += 1 - length -= 1 - - return crc - - @staticmethod - def _prepare_update_mail_boxes(): - - temp = dict() - temp[SWUpdateTargets.TARGET_TD.value] = (0x603, 0.5) - temp[SWUpdateTargets.TARGET_TD_FPGA.value] = (0x603, 0.5) # TODO add more timeout for FPGAs - temp[SWUpdateTargets.TARGET_DD.value] = (0x604, 0.5) - temp[SWUpdateTargets.TARGET_DD_FPGA.value] = (0x604, 0.5) - - return temp - - @staticmethod - def _convert_data_to_bytes(conversion: str, data: int): - return struct.pack(conversion, data) - def _send_can_message(self, mail_box: int, data: bytes, msg_type: str, wait_for_resp_s: float = 0.0): + """ + Publicly accessible method to send an update message to the bootloader + @param target: The target stack (TD, DD) + @param data: The data in bytes + @param update_payload_len: The number of bytes to write + + @return none + """ self._msg_ack_nack_status[msg_type] = CanCommStatus.CAN_COMM_IN_PROGRESS.value packet = can.Message(arbitration_id=mail_box, data=data, is_extended_id=False) self._can_bus.send(packet, 0) # TODO handle can write error - def _get_fw_ack_nack_resp(self, msg_type: str): + def _get_bootloader_ack_nack_resp(self, msg_type: str): + """ + Privately accessible method to get the bootloader ack or nack response + @param msg_type: The type of the message (command, update) + + @return none + """ start_time = time.time() while True: + # Keep running and wait for the CAN bus message response + # TODO do we need to setup a can listener? message = self._can_bus.recv() can_comm_status = CanCommStatus.CAN_COMM_IN_PROGRESS.value if message is None: continue + # If received the message from the corresponding response message and the message ID count is the same as was sent + # then check the CRC of the received message. If passed, check whether the message as acked or not and update the status if self._RESP_CMD_MAIL_BOX == hex(message.arbitration_id) and message.data[self._RESP_MSG_ID_INDEX] == self._msg_id_count: calc_crc = self._get_crc32(message.data[:self._RESP_CRC_START_INDEX]) payload_crc = struct.unpack(' 0: + left = (crc << cls._SHIFT_8_BITS_FOR_BYTE_SHIFT) & 0xFFFFFFFF + right = (crc >> cls._SHIFT_24_BITS) & 0xFFFFFFFF + crc = left ^ cls._CRC32_TABLE[data[i] ^ right] + i += 1 + length -= 1 + + return crc + + @staticmethod + def _prepare_update_mail_boxes(): + + temp = dict() + temp[SWUpdateTargets.TARGET_TD.value] = (0x603, 0.5) + temp[SWUpdateTargets.TARGET_TD_FPGA.value] = (0x603, 0.5) # TODO add more timeout for FPGAs + temp[SWUpdateTargets.TARGET_DD.value] = (0x604, 0.5) + temp[SWUpdateTargets.TARGET_DD_FPGA.value] = (0x604, 0.5) + + return temp + + @staticmethod + def _convert_data_to_bytes(conversion: str, data: int): + """ + Class method to convert data into bytes + + @param conversion: The conversion of type (e.g.