import os

from scripts.update_package_script.utilities import (
    Utilities,
    SWUpdateCommands,
    CanCommStatus,
    SWUpdateTargets,
)


class SoftwareUpdateScript:
    """Drive a software update by streaming ``.bin`` packages to a target.

    All communication goes through a ``Utilities`` instance: command
    messages (start / verify), data messages carrying chunks of the binary,
    and ack/nack status lookups that gate each step.
    """

    # Number of bytes read from the binary file and sent per update message.
    _SW_UPDATE_FLASH_BUFFER_SIZE = 256

    # Keys of the per-update bookkeeping dict ``_sw_update_status``.
    _SW_UPDATE_STACK = 'stack_name'
    _BINARY_FILE_SIZE = 'file_size'
    _WRITTEN_BYTE_COUNT = 'byte_count'

    def __init__(self):
        """Create empty update bookkeeping and the messaging helper."""
        self._sw_update_status = {}
        self._utilities = Utilities()

    def _reset_variables(self) -> None:
        """Zero the bookkeeping entries and clear both ack/nack statuses."""
        self._sw_update_status[self._SW_UPDATE_STACK] = 0
        self._sw_update_status[self._BINARY_FILE_SIZE] = 0
        self._sw_update_status[self._WRITTEN_BYTE_COUNT] = 0
        self._utilities.clear_msg_ack_nack_status(
            self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME)
        self._utilities.clear_msg_ack_nack_status(
            self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME)

    def _prepare_for_sw_update(self, stack_to_update: str, file_path: str) -> bool:
        """Record the target and file size, then request the update start.

        Args:
            stack_to_update: Target name without the ``TARGET_`` prefix; it
                is upper-cased and looked up in ``SWUpdateTargets``.
            file_path: Path of the binary file that will be sent.

        Returns:
            True when the send-message status reads ``CAN_COMM_READY`` after
            the start command was (conditionally) issued, False otherwise.

        Raises:
            ValueError: If ``stack_to_update`` is None.
            KeyError: Presumably, if the prefixed name is not a member of
                ``SWUpdateTargets`` (assumes it is an Enum -- confirm).
            OSError: If the size of ``file_path`` cannot be read.
        """
        if stack_to_update is None:
            # Fail with a clear message instead of the AttributeError that
            # ``None.upper()`` would raise below.
            raise ValueError(
                "stack_to_update must name the target to update; got None")

        self._reset_variables()

        # TODO how to read from a manifest file. Right now it is passed directly
        target_name = 'TARGET_' + stack_to_update.upper()
        self._sw_update_status[self._SW_UPDATE_STACK] = SWUpdateTargets[target_name].value
        self._sw_update_status[self._BINARY_FILE_SIZE] = os.path.getsize(file_path)

        send_key = self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME
        send_ack_status = self._utilities.get_msg_ack_nack_status(send_key)
        if send_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value:
            self._utilities.send_command_msg(
                SWUpdateCommands.SW_UPDATE_START.value,
                self._sw_update_status[self._SW_UPDATE_STACK])

        # Re-read the status: it is expected to change once the start
        # command has been sent.
        return (self._utilities.get_msg_ack_nack_status(send_key)
                == CanCommStatus.CAN_COMM_READY.value)

    def _process_binary_file(self, file_path: str) -> None:
        """Send the binary file chunk by chunk, then request verification.

        A chunk of up to ``_SW_UPDATE_FLASH_BUFFER_SIZE`` bytes is read and
        sent only while the update-message status reads ``CAN_COMM_READY``;
        otherwise the loop polls the status again. At end of file the
        ``SW_UPDATE_VERIFY`` command is sent and the loop ends.

        Args:
            file_path: Path of the binary file to send.
        """
        # TODO add a timeout (e.g. 10 minutes that is very long) to make sure we are not stuck
        update_key = self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME
        target = self._sw_update_status[self._SW_UPDATE_STACK]
        bytes_written = 0

        # The context manager closes the file even if a send call raises.
        with open(file_path, 'rb') as binary_file:
            while True:
                update_ack_status = self._utilities.get_msg_ack_nack_status(update_key)
                if update_ack_status != CanCommStatus.CAN_COMM_READY.value:
                    # Not ready for the next chunk yet; poll again.
                    continue

                chunk = binary_file.read(self._SW_UPDATE_FLASH_BUFFER_SIZE)
                if not chunk:
                    # End of file: ask the target to verify what it received.
                    self._utilities.send_command_msg(
                        SWUpdateCommands.SW_UPDATE_VERIFY.value, target)
                    print("Leave file")
                    break

                self._utilities.send_software_update_msg(target, chunk, len(chunk))
                bytes_written += len(chunk)
                # A chunk was read, so the recorded file size is non-zero
                # unless the file changed after its size was taken.
                percent = (bytes_written / self._sw_update_status[self._BINARY_FILE_SIZE]) * 100
                print(f"Progress {percent}%")

    def update_software_packages(self, packages_dir: str, stack_to_update: str = None):
        """Send every ``.bin`` file found in ``packages_dir`` to the target.

        Files are visited in sorted name order so runs are reproducible
        (``os.listdir`` alone returns entries in arbitrary order). A file is
        streamed only when ``_prepare_for_sw_update`` returns True for it.

        Args:
            packages_dir: Directory containing the binary packages.
            stack_to_update: Target name without the ``TARGET_`` prefix.
                Although it defaults to None, a value is required as soon as
                a ``.bin`` file is found.

        Raises:
            ValueError: If a ``.bin`` file is found and ``stack_to_update``
                is None.
        """
        # TODO show progress
        for file_name in sorted(os.listdir(packages_dir)):
            if not file_name.endswith(".bin"):
                continue
            path = os.path.join(packages_dir, file_name)
            if self._prepare_for_sw_update(stack_to_update, path):
                self._process_binary_file(path)