import os import math from scripts.update_package_script.utilities import Utilities, SWUpdateCommands, CanCommStatus, SWUpdateTargets class SoftwareUpdateScript: """ Software update script The class runs the firmware and FPGA updates """ _SW_UPDATE_PAYLOAD_BYTES = 256 _SW_UPDATE_STACK = 'stack_name' _BINARY_FILE_SIZE = 'file_size' _WRITTEN_BYTE_COUNT = 'byte_count' _WRITE_COUNTER = 'write_counter' def __init__(self): """ SoftwareUpdateScript class constructor """ self._sw_update_status = dict() self._utilities = Utilities() def _reset_variables(self): """ Privately accessible method to reset the variables that are used in software update. @return none """ self._sw_update_status[self._SW_UPDATE_STACK] = 0 self._sw_update_status[self._BINARY_FILE_SIZE] = 0 self._sw_update_status[self._WRITTEN_BYTE_COUNT] = 0 self._sw_update_status[self._WRITE_COUNTER] = 0 self._utilities.clear_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) self._utilities.clear_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) def _prepare_for_sw_update(self, stack_to_update: str, file_path: str): """ Privately accessible method to prepare for software update. @param stack_to_update: The stack to update its software (TD, DD) @param file_path: The file location of the binary file @return True if prepare was successful otherwise, False """ status = False self._reset_variables() # TODO how to read from a manifest file. Right now it is passed directly stack_to_update = 'TARGET_' + stack_to_update.upper() self._sw_update_status[self._SW_UPDATE_STACK] = SWUpdateTargets[stack_to_update].value self._sw_update_status[self._BINARY_FILE_SIZE] = os.path.getsize(file_path) send_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) # If the communication has not started, send the command start for update if send_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value: self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_START.value, self._sw_update_status[self._SW_UPDATE_STACK]) send_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) # Once the CAN communication was ready it means the command went through successfully, and it is ready for the next command. if send_ack_status == CanCommStatus.CAN_COMM_READY.value: status = True return status def _update_progress_status(self, binary_len: int): """ Privately accessible method to print the status of software update. @param binary_len: The length of the line read from a binary file (e.g. 256 bytes) @return none """ progress_bar_scale = 10 write_counter = self._sw_update_status[self._WRITE_COUNTER] binary_file_size_bytes = self._sw_update_status[self._BINARY_FILE_SIZE] target_stack = self._sw_update_status[self._SW_UPDATE_STACK] target_stack =SWUpdateTargets(target_stack).name bytes_written = self._sw_update_status[self._WRITTEN_BYTE_COUNT] # Get number of payloads needed to update the binary but since this is for display only, it is scaled down 10 times num_of_payloads_needed = math.ceil(binary_file_size_bytes / (self._SW_UPDATE_PAYLOAD_BYTES * progress_bar_scale)) # Update the latest number of bytes that were successfully sent to the bootloader bytes_written += binary_len # If the accumulation of the bytes written is a division of the scale (10) then update the progress bar if bytes_written % (self._SW_UPDATE_PAYLOAD_BYTES * progress_bar_scale) == 0: write_counter += 1 # If the length of the bytes written is less than the default bytes to extract and write, it means we are the end of the # update of the binary file, then update the progress bar if binary_len < self._SW_UPDATE_PAYLOAD_BYTES: write_counter += 1 # Calculate the percentage of the progress percent = (bytes_written / self._sw_update_status[self._BINARY_FILE_SIZE]) * 100 # Add more # and subtract from the empty spaces text = '#' * write_counter + ' ' * (num_of_payloads_needed - write_counter) # Print the update, end='\r' so the carriage return is returned to the beginning of the current line # add flush=True so the data is not buffered and is immediately printed print("Updating: {} {} {:.1f}%".format(target_stack, text, percent), end='\r', flush=True) self._sw_update_status[self._WRITTEN_BYTE_COUNT] = bytes_written self._sw_update_status[self._WRITE_COUNTER] = write_counter # Once all the bytes of a binary are written, print and empty print to bread from the progress print that was overwriting if bytes_written >= binary_file_size_bytes: print() def _process_binary_file(self, file_path: str): """ Privately accessible method to process a passed binary file. @param file_path: The address of the binary file passed to the method @return none """ # TODO add a timeout (e.g. 10 minutes that is very long) to make sure we are not stuck # Open the file as read the binary f = open(file_path, 'rb') while True: line = None update_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) # Read through the file, once ack status says the CAN bus is ready read another batch of bytes if update_ack_status == CanCommStatus.CAN_COMM_READY.value: # Read the next batch of bytes in the provided number of bytes line = f.read(self._SW_UPDATE_PAYLOAD_BYTES) target = self._sw_update_status[self._SW_UPDATE_STACK] if line != b'': # If the read line is not b'', meaning that it was not empty, send the data to the bootloader self._utilities.send_software_update_msg(target, line, len(line)) self._update_progress_status(len(line)) if line == b'': # Read line was b'' meaning the binary file was read completely, send the verification command to the bootloader self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_VERIFY.value, self._sw_update_status[self._SW_UPDATE_STACK]) print("Leave file") break if line == b'': print("Leave file 2"); break # TODO remove thi line # Close the binary file f.close() def update_software_packages(self, packages_dir: str, stack_to_update: str = None): """ Publicly accessible method to update the software packages (firmware and FPGA) @param packages_dir: The address of the binary file @param stack_to_update: If the user wants to provide an individual stack (e.g. dd) @return none """ # Reset firmware # Broadcast I have an update # Wait for the bootloader to come up # Start updating for file in os.listdir(packages_dir): if not file.endswith(".bin"): continue path = os.path.join(packages_dir, file) if self._prepare_for_sw_update(stack_to_update, path): self._process_binary_file(path)