import os import re from scripts.base.base import Base, SWUpdateTargets from scripts.update_package_script.utilities import Utilities, SWUpdateCommands, CanCommStatus class SoftwareUpdateScript(Base): _DECODED_BYTES_KEY_NAME = 'decoded_bytes' _CUR_DATA_INDEX_KEY_NAME = 'cur_data_index' _CUR_INSERT_DATA_INDEX_KEY_NAME = 'cur_insert_index' _TOTAL_BYTES_TX_KEY_NAME = 'total_bytes_tx' _RESIDUAL_BYTES_KEY_NAME = 'residual_bytes' _BINARY_SEND_COMPLETE_KEY_NAME = 'is_binary_done' _SIG_MSG_SEND_COMPLETE_KEY_NAME = 'is_sig_msg_done' _XML_REPORT_FILE_TYPE_KEY_NAME = 'filetype' _XML_REPORT_BIN_SIZE_KEY_NAME = 'size' _DECODE_LIST_MAX_SIZE_BYTES = 4096 _SHIFT_BITS_BY_8 = 8 _DECODE_COPRIME_1 = 19 _DECODE_COPRIME_2 = 23 _DECODE_VALUE = 256 _DECODE_ADD_VALUE = 211 _XML_REPORT_END_TAG = "" _XML_REPORT_HEADER_SIZE = 2 def __init__(self): super().__init__() self._signature_start_in_bytes = bytes(self.SIGNATURE_START, 'utf-8') self._signature_end_in_bytes = bytes(self.SIGNATURE_END, 'utf-8') self._decode_data_status = dict() self._xml_report_values = dict() self._utilities = Utilities() def _reset_variables(self): # TODO #define for the dictionary keys self._decode_data_status['decode_values_found'] = False self._decode_data_status['end_tag_found'] = False self._decode_data_status['decode_found_index'] = 0 self._decode_data_status['converted_xml'] = '' self._decode_data_status['is_first_line'] = False self._decode_data_status['num_of_bytes_sent'] = 0 #TODO remove? self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] = list() self._decode_data_status[self._DECODED_BYTES_KEY_NAME] = list() self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME] = 0 self._decode_data_status[self._CUR_INSERT_DATA_INDEX_KEY_NAME] = 0 self._decode_data_status[self._TOTAL_BYTES_TX_KEY_NAME] = 0 self._decode_data_status[self._BINARY_SEND_COMPLETE_KEY_NAME] = False self._decode_data_status[self._SIG_MSG_SEND_COMPLETE_KEY_NAME] = False self._xml_report_values['name'] = '' self._xml_report_values['version'] = '' self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] = '' self._xml_report_values['security'] = '' self._xml_report_values['version'] = '' self._xml_report_values[self._XML_REPORT_BIN_SIZE_KEY_NAME] = 0 self._xml_report_values['raw'] = '' self._utilities.clear_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) self._utilities.clear_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) def _verify_signature(self, signature: bytes): # TODO fill up pass def _verify_key(self): pass def _get_decode_variables(self, line_bytes: bytes): if len(self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME]) != 0: line_bytes = self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] + line_bytes if len(line_bytes) > 2: a = int(line_bytes[0]) b = int(line_bytes[1]) if a is not None and b is not None: code = (a << self._SHIFT_BITS_BY_8) + b for s in range(0, 0xFF): convert = ((s % self._DECODE_COPRIME_1) << self._SHIFT_BITS_BY_8) + (s % self._DECODE_COPRIME_2) if code == convert: self._decode_data_status['decode_found_index'] = s self._decode_data_status['decode_values_found'] = True print("Breaking {} {} {} {} {}".format(code, convert, s, a, b)) break def _decode_line(self, line_bytes: bytes): start_index = 0 if len(self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME]) != 0: line_bytes = self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] + line_bytes self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] = list() if self._decode_data_status['is_first_line'] is False: start_index = self._XML_REPORT_HEADER_SIZE self._decode_data_status['is_first_line'] = True for d in range(start_index, len(line_bytes)): if self._decode_data_status['end_tag_found'] is False: decoded_value_bytes = (line_bytes[d] - self._decode_data_status[ 'decode_found_index']) % self._DECODE_VALUE self._decode_data_status['decode_found_index'] += self._DECODE_ADD_VALUE self._decode_data_status['converted_xml'] += ''.join(chr(decoded_value_bytes)).replace('\n', '') if self._XML_REPORT_END_TAG in self._decode_data_status['converted_xml']: converted_xml = self._decode_data_status['converted_xml'] self._decode_data_status['end_tag_found'] = True self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] = \ re.search('(.*)', converted_xml).group(1) binary_size_int = re.search('(.*)', converted_xml).group(1) self._xml_report_values[self._XML_REPORT_BIN_SIZE_KEY_NAME] = int(binary_size_int) print(self._xml_report_values['filetype'], self._xml_report_values['size'], len(converted_xml), converted_xml, self._decode_data_status['decode_found_index'], 'H') # TODO remove else: self._decode_data_status[self._DECODED_BYTES_KEY_NAME].append(line_bytes[d]) self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME] += 1 def _process_read_line(self, line: bytes): if self._signature_start_in_bytes in line: if self._signature_end_in_bytes: signature = line[len(self._signature_start_in_bytes):line.find(self._signature_end_in_bytes)] self._verify_signature(signature) end_of_signature_mark = line.find(self._signature_end_in_bytes) + len(self._signature_end_in_bytes) xml_report_start = line[end_of_signature_mark:] if (len(line) - end_of_signature_mark) > 2 and self._decode_data_status['decode_values_found'] is False: self._get_decode_variables(xml_report_start) else: print(xml_report_start, 'D') self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] = xml_report_start if self._decode_data_status['decode_values_found'] is True: self._decode_line(xml_report_start) else: # TODO fill up pass else: self._get_decode_variables(line) if self._decode_data_status['decode_values_found'] is False else None self._decode_line(line) def _handle_processed_data(self): send_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) update_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) #print(send_ack_status, update_ack_status) # TODO change the target right now they do not match because they are written for Leahi # TODO for instance, it should be self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] as the second parameter target = 'TARGET_TD_FPGA' #'TARGET_TD' # self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] target = SWUpdateTargets[target].value #print(self._decode_data_status[self._DECODED_BYTES_KEY_NAME]) if send_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value: if self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] != '': self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_START.value, target) # TODO change the target right now they do not match because they are written for Leahi # CanCommStatus.CAN_COMM_SUCCESSFUL.value: # TODO this should be changed to successful but the ack is not received now yet elif send_ack_status == 2 and self._decode_data_status[self._BINARY_SEND_COMPLETE_KEY_NAME] is False: if update_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value or \ update_ack_status == 2: #CanCommStatus.CAN_COMM_SUCCESSFUL.value): current_write_index = self._decode_data_status[self._TOTAL_BYTES_TX_KEY_NAME] current_insert_index = self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME] binary_size = self._xml_report_values[self._XML_REPORT_BIN_SIZE_KEY_NAME] var = '' data_2_write = '' if current_write_index == binary_size: self._decode_data_status[self._BINARY_SEND_COMPLETE_KEY_NAME] = True if current_insert_index - current_write_index >= self.SW_UPDATE_FLASH_BUFFER_SIZE: data_2_write = self._decode_data_status[self._DECODED_BYTES_KEY_NAME] \ [current_write_index: current_write_index + self.SW_UPDATE_FLASH_BUFFER_SIZE] self._decode_data_status[self._TOTAL_BYTES_TX_KEY_NAME] += self.SW_UPDATE_FLASH_BUFFER_SIZE self._utilities.send_software_update_msg(target, data_2_write) var = 'A' elif current_insert_index == binary_size and \ self._decode_data_status[self._BINARY_SEND_COMPLETE_KEY_NAME] is False: remaining_bytes = current_insert_index - current_write_index data_2_write = self._decode_data_status[self._DECODED_BYTES_KEY_NAME] \ [current_write_index: current_insert_index] data_2_write += [0] * (self.SW_UPDATE_FLASH_BUFFER_SIZE - remaining_bytes) self._decode_data_status[self._TOTAL_BYTES_TX_KEY_NAME] += remaining_bytes self._utilities.send_software_update_msg(target, data_2_write) var = 'B' print(current_write_index, self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME], current_insert_index - current_write_index, var, len(data_2_write)) elif update_ack_status == 2 and self._decode_data_status[self._BINARY_SEND_COMPLETE_KEY_NAME] is True: #CanCommStatus.CAN_COMM_SUCCESSFUL.value: # TODO this should be changed to successful but the ack is not received now yet # TODO do we need the electronic signature from xml? data_2_write = [0] * self.SW_UPDATE_FLASH_BUFFER_SIZE self._utilities.send_software_update_msg(target, data_2_write, signature_msg=True) if self._utilities.get_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) == 2: #TODO change to successful self._decode_data_status[self._SIG_MSG_SEND_COMPLETE_KEY_NAME] = True #print(data_2_write, len(data_2_write), self._decode_data_status[self._DECODED_BYTES_KEY_NAME]) def _process_remaining_data(self): while True: self._handle_processed_data() if self._decode_data_status[self._SIG_MSG_SEND_COMPLETE_KEY_NAME] is True: break # TODO change the target right now they do not match because they are written for Leahi # TODO for instance, it should be self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] as the second parameter target = 'TARGET_TD_FPGA' #'TARGET_TD' # self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] target = SWUpdateTargets[target].value self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_VERIFY.value, target) # TODO change the target right now they do not match because they are written for Leahi def update_software_packages(self, packages_dir: str, stack_to_update: str = None): # 1. Verify signature # 2. Verify key # 3. Read XML file for to find the packages to update # 4. Decrypt the content # 5. Send the data to bootloader # 6. Wait for the bootloader to ack/nack or timeout after 3000 ms # 7. Send the entire file # 8. Show progress for file in os.listdir(packages_dir): if file.endswith(".bin"): with open(os.path.join(packages_dir, file), 'rb') as f: # Get ready for the next binary file self._reset_variables() for line in f: self._process_read_line(line) self._handle_processed_data() self._process_remaining_data() f.close() # TODO for testing remove print(self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME], self._decode_data_status[self._DECODED_BYTES_KEY_NAME][0], self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-4], self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-3], self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-2], self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-1], len(self._decode_data_status[self._DECODED_BYTES_KEY_NAME])) # TODO remove