import os import math import time import threading from scripts.update_package_script.utilities import Utilities, SWUpdateCommands, CanCommStatus, UpdateStacks, UpdateStacksDestinations class SoftwareUpdateScript: """ Software update script The class runs the firmware and FPGA updates """ _SW_UPDATE_PAYLOAD_BYTES = 256 _BROADCAST_MESSAGE_INTERVAL_S = 0.2 _SW_UPDATE_STACK = 'stack_name' _SW_UPDATE_DEST = 'stack_target' _BINARY_FILE_SIZE = 'file_size' _WRITTEN_BYTE_COUNT = 'byte_count' _WRITE_COUNTER = 'write_counter' def __init__(self): """ SoftwareUpdateScript class constructor """ self._sw_update_status = dict() self._thread = None self._thread_event = None self._utilities = Utilities() def _reset_variables(self): """ Privately accessible method to reset the variables that are used in software update. @return none """ self._sw_update_status[self._SW_UPDATE_STACK] = 0 self._sw_update_status[self._SW_UPDATE_DEST] = 0 self._sw_update_status[self._BINARY_FILE_SIZE] = 0 self._sw_update_status[self._WRITTEN_BYTE_COUNT] = 0 self._sw_update_status[self._WRITE_COUNTER] = 0 self._utilities.clear_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) self._utilities.clear_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) def _prepare_for_sw_update(self, stack_to_update: str, destination: str, file_path: str): """ Privately accessible method to prepare for software update. @param stack_to_update: The stack to update its software (TD, DD) @param destination: The update destination (Firmware, FPGA) @param file_path: The file location of the binary file @return True if prepare was successful otherwise, False """ status = False self._reset_variables() # TODO how to read from a manifest file. Right now it is passed directly # TODO if stack to update and stack target are not provided, then just read through the manifest files if stack_to_update is not None and destination is not None: print("I", stack_to_update, int(stack_to_update)) stack_to_update = stack_to_update.upper() destination = destination.upper() self._sw_update_status[self._SW_UPDATE_STACK] = UpdateStacks(int(stack_to_update)).value self._sw_update_status[self._SW_UPDATE_DEST] = UpdateStacksDestinations(int(destination)).value #if stack_to_update in UpdateStacks.__members__: self._sw_update_status[self._SW_UPDATE_STACK] = UpdateStacks(stack_to_update).value #if destination in UpdateStacksDestinations.__members__: self._sw_update_status[self._SW_UPDATE_DEST] = UpdateStacksDestinations(destination).value # TODo manifest #self._sw_update_status[self._SW_UPDATE_STACK] = stack_to_update #self._sw_update_status[self._SW_UPDATE_TARGET] = stack_target self._sw_update_status[self._BINARY_FILE_SIZE] = os.path.getsize(file_path) send_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) # If the communication has not started, send the command start for update if send_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value: self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_START.value, self._sw_update_status[self._SW_UPDATE_STACK], self._sw_update_status[self._SW_UPDATE_DEST]) send_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) # Once the CAN communication was ready it means the command went through successfully, and it is ready for the next command. if send_ack_status == CanCommStatus.CAN_COMM_READY.value: status = True return status def _update_progress_status(self, binary_len: int): """ Privately accessible method to print the status of software update. @param binary_len: The length of the line read from a binary file (e.g. 256 bytes) @return none """ progress_bar_scale = 10 write_counter = self._sw_update_status[self._WRITE_COUNTER] binary_file_size_bytes = self._sw_update_status[self._BINARY_FILE_SIZE] target_stack = self._sw_update_status[self._SW_UPDATE_STACK] target_stack = UpdateStacksDestinations(target_stack).name destination = self._sw_update_status[self._SW_UPDATE_DEST] destination = UpdateStacks(destination).name bytes_written = self._sw_update_status[self._WRITTEN_BYTE_COUNT] # Get number of payloads needed to update the binary but since this is for display only, it is scaled down 10 times num_of_payloads_needed = math.ceil(binary_file_size_bytes / (self._SW_UPDATE_PAYLOAD_BYTES * progress_bar_scale)) # Update the latest number of bytes that were successfully sent to the bootloader bytes_written += binary_len # If the accumulation of the bytes written is a division of the scale (10) then update the progress bar if bytes_written % (self._SW_UPDATE_PAYLOAD_BYTES * progress_bar_scale) == 0: write_counter += 1 # If the length of the bytes written is less than the default bytes to extract and write, it means we are the end of the # update of the binary file, then update the progress bar if binary_len < self._SW_UPDATE_PAYLOAD_BYTES: write_counter += 1 # Calculate the percentage of the progress percent = (bytes_written / self._sw_update_status[self._BINARY_FILE_SIZE]) * 100 # Add more # and subtract from the empty spaces text = '#' * write_counter + ' ' * (num_of_payloads_needed - write_counter) # Print the update, end='\r' so the carriage return is returned to the beginning of the current line # add flush=True so the data is not buffered and is immediately printed print("Updating: {} {} {} {:.1f}%".format(destination, target_stack, text, percent), end='\r', flush=True) self._sw_update_status[self._WRITTEN_BYTE_COUNT] = bytes_written self._sw_update_status[self._WRITE_COUNTER] = write_counter # Once all the bytes of a binary are written, print an empty print to bread from the progress print that was overwriting if bytes_written >= binary_file_size_bytes: print() def _process_binary_file(self, file_path: str): """ Privately accessible method to process a passed binary file. @param file_path: The address of the binary file passed to the method @return none """ # TODO add a timeout (e.g. 10 minutes that is very long) to make sure we are not stuck # Open the file as read the binary f = open(file_path, 'rb') while True: line = None update_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) # Read through the file, once ack status says the CAN bus is ready read another batch of bytes if update_ack_status == CanCommStatus.CAN_COMM_READY.value: # Read the next batch of bytes in the provided number of bytes line = f.read(self._SW_UPDATE_PAYLOAD_BYTES) target = self._sw_update_status[self._SW_UPDATE_STACK] if line != b'': # If the read line is not b'', meaning that it was not empty, send the data to the bootloader self._utilities.send_software_update_msg(target, line, len(line)) self._update_progress_status(len(line)) if line == b'': # Read line was b'' meaning the binary file was read completely, send the verification command to the bootloader self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_VERIFY.value, self._sw_update_status[self._SW_UPDATE_STACK], self._sw_update_status[self._SW_UPDATE_DEST]) break elif update_ack_status == CanCommStatus.CAN_COMM_TIME_OUT.value: print("Timeout") # TODO send again? break if line == b'': print("Leave file 2"); break # TODO remove this line # Close the binary file f.close() def _handle_broadcast_message_thread(self, start_thread: bool): if start_thread: self._thread_event = threading.Event() self._thread = threading.Thread(target=self._utilities.send_update_available_broadcast_message, args=(self._thread_event, self._BROADCAST_MESSAGE_INTERVAL_S)) self._thread.start() else: self._thread_event.set() self._thread.join() ######################## PUBLIC METHOD(S) ############################## def update_software_packages(self, packages_dir: str, stack_to_update: str = None, stack_target: str = None): """ Publicly accessible method to update the software packages (firmware and FPGA) @param packages_dir: The address of the binary file @param stack_to_update: If the user wants to provide an individual stack (TD, DD) @param stack_target: If the user wants to provide an individual target (Firmware, FPGA) @return none """ # Reset firmware # Broadcast I have an update # Wait for the bootloader to come up # Start updating # TODO check if the update folder is empty then send the thread self._handle_broadcast_message_thread(start_thread=True) for file in os.listdir(packages_dir): if not file.endswith(".bin") and not file.endswith(".hex"): continue path = os.path.join(packages_dir, file) if self._prepare_for_sw_update(stack_to_update, stack_target, path): self._process_binary_file(path) # Done with update binary files, stop sending the broadcast message self._handle_broadcast_message_thread(start_thread=False)