Index: scripts/update_package_script/update_package.py =================================================================== diff -u -r4e5c95d8cea4d29a61c3c49e5e4b2b6f76bcddb1 -radea121e63d4eabb225c26bc1efd6741c59e0df3 --- scripts/update_package_script/update_package.py (.../update_package.py) (revision 4e5c95d8cea4d29a61c3c49e5e4b2b6f76bcddb1) +++ scripts/update_package_script/update_package.py (.../update_package.py) (revision adea121e63d4eabb225c26bc1efd6741c59e0df3) @@ -1,30 +1,51 @@ import os +import math from scripts.update_package_script.utilities import Utilities, SWUpdateCommands, CanCommStatus, SWUpdateTargets class SoftwareUpdateScript: + """ + Software update script - _SW_UPDATE_FLASH_BUFFER_SIZE = 256 + The class runs the firmware and FPGA updates + """ + _SW_UPDATE_PAYLOAD_BYTES = 256 _SW_UPDATE_STACK = 'stack_name' _BINARY_FILE_SIZE = 'file_size' _WRITTEN_BYTE_COUNT = 'byte_count' + _WRITE_COUNTER = 'write_counter' def __init__(self): + """ + SoftwareUpdateScript class constructor + """ self._sw_update_status = dict() self._utilities = Utilities() def _reset_variables(self): + """ + Privately accessible method to reset the variables that are used in software update. + @return none + """ self._sw_update_status[self._SW_UPDATE_STACK] = 0 self._sw_update_status[self._BINARY_FILE_SIZE] = 0 self._sw_update_status[self._WRITTEN_BYTE_COUNT] = 0 + self._sw_update_status[self._WRITE_COUNTER] = 0 self._utilities.clear_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) self._utilities.clear_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) def _prepare_for_sw_update(self, stack_to_update: str, file_path: str): + """ + Privately accessible method to prepare for software update. + @param stack_to_update: The stack to update its software (TD, DD) + @param file_path: The file location of the binary file + + @return True if prepare was successful otherwise, False + """ status = False self._reset_variables() # TODO how to read from a manifest file. Right now it is passed directly @@ -33,47 +54,104 @@ self._sw_update_status[self._BINARY_FILE_SIZE] = os.path.getsize(file_path) send_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) + # If the communication has not started, send the command start for update if send_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value: self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_START.value, self._sw_update_status[self._SW_UPDATE_STACK]) - if self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) == CanCommStatus.CAN_COMM_READY.value: status = True + send_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) + # Once the CAN communication was ready it means the command went through successfully, and it is ready for the next command. + if send_ack_status == CanCommStatus.CAN_COMM_READY.value: status = True return status + def _update_progress_status(self, binary_len: int): + """ + Privately accessible method to print the status of software update. + + @param binary_len: The length of the line read from a binary file (e.g. 256 bytes) + + @return none + """ + progress_bar_scale = 10 + write_counter = self._sw_update_status[self._WRITE_COUNTER] + binary_file_size_bytes = self._sw_update_status[self._BINARY_FILE_SIZE] + target_stack = self._sw_update_status[self._SW_UPDATE_STACK] + target_stack =SWUpdateTargets(target_stack).name + bytes_written = self._sw_update_status[self._WRITTEN_BYTE_COUNT] + # Get number of payloads needed to update the binary but since this is for display only, it is scaled down 10 times + num_of_payloads_needed = math.ceil(binary_file_size_bytes / (self._SW_UPDATE_PAYLOAD_BYTES * progress_bar_scale)) + # Update the latest number of bytes that were successfully sent to the bootloader + bytes_written += binary_len + # If the accumulation of the bytes written is a division of the scale (10) then update the progress bar + if bytes_written % (self._SW_UPDATE_PAYLOAD_BYTES * progress_bar_scale) == 0: write_counter += 1 + # If the length of the bytes written is less than the default bytes to extract and write, it means we are the end of the + # update of the binary file, then update the progress bar + if binary_len < self._SW_UPDATE_PAYLOAD_BYTES: write_counter += 1 + # Calculate the percentage of the progress + percent = (bytes_written / self._sw_update_status[self._BINARY_FILE_SIZE]) * 100 + # Add more # and subtract from the empty spaces + text = '#' * write_counter + ' ' * (num_of_payloads_needed - write_counter) + # Print the update, end='\r' so the carriage return is returned to the beginning of the current line + # add flush=True so the data is not buffered and is immediately printed + print("Updating: {} {} {:.1f}%".format(target_stack, text, percent), end='\r', flush=True) + self._sw_update_status[self._WRITTEN_BYTE_COUNT] = bytes_written + self._sw_update_status[self._WRITE_COUNTER] = write_counter + # Once all the bytes of a binary are written, print and empty print to bread from the progress print that was overwriting + if bytes_written >= binary_file_size_bytes: print() + def _process_binary_file(self, file_path: str): + """ + Privately accessible method to process a passed binary file. + @param file_path: The address of the binary file passed to the method + + @return none + """ # TODO add a timeout (e.g. 10 minutes that is very long) to make sure we are not stuck + # Open the file as read the binary f = open(file_path, 'rb') - bytes_written = 0 while True: line = None update_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) - + # Read through the file, once ack status says the CAN bus is ready read another batch of bytes if update_ack_status == CanCommStatus.CAN_COMM_READY.value: - line = f.read(self._SW_UPDATE_FLASH_BUFFER_SIZE) + # Read the next batch of bytes in the provided number of bytes + line = f.read(self._SW_UPDATE_PAYLOAD_BYTES) target = self._sw_update_status[self._SW_UPDATE_STACK] if line != b'': + # If the read line is not b'', meaning that it was not empty, send the data to the bootloader self._utilities.send_software_update_msg(target, line, len(line)) - bytes_written += len(line) - percent = (bytes_written /self._sw_update_status[self._BINARY_FILE_SIZE]) * 100 - print("Progress {}%".format(percent)) + self._update_progress_status(len(line)) if line == b'': + # Read line was b'' meaning the binary file was read completely, send the verification command to the bootloader self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_VERIFY.value, self._sw_update_status[self._SW_UPDATE_STACK]) print("Leave file") break - if line == b'': print("Leave file 2"); break - + if line == b'': print("Leave file 2"); break # TODO remove thi line + # Close the binary file f.close() def update_software_packages(self, packages_dir: str, stack_to_update: str = None): - # TODO show progress + """ + Publicly accessible method to update the software packages (firmware and FPGA) + @param packages_dir: The address of the binary file + @param stack_to_update: If the user wants to provide an individual stack (e.g. dd) + + @return none + """ + + # Reset firmware + # Broadcast I have an update + # Wait for the bootloader to come up + # Start updating for file in os.listdir(packages_dir): if not file.endswith(".bin"): continue path = os.path.join(packages_dir, file) if self._prepare_for_sw_update(stack_to_update, path): self._process_binary_file(path) +