import os import math import time import threading import subprocess from scripts.update_package_script.utilities import Utilities, SWUpdateCommands, CanCommStatus, UpdateStacks, UpdateStacksDestinations class SoftwareUpdateScript: """ Software update script The class runs the firmware and FPGA updates """ _TD_SW_UPDATE_PAYLOAD_BYTES = 256 _DD_SW_UPDATE_PAYLOAD_BYTES = 512 _BROADCAST_MESSAGE_INTERVAL_S = 0.1 _WAIT_FOR_BOOTLOADER_AFTER_RESET_S = 1.5 _SW_UPDATE_STACK = 'stack_name' _SW_UPDATE_DEST = 'stack_target' _BINARY_FILE_SIZE = 'file_size' _WRITTEN_BYTE_COUNT = 'byte_count' _WRITE_COUNTER = 'write_counter' _CUR_UPDATE_ELAPSED_TIME_S = 'elapsed_time_s' _BINARY_LEN_BYTES = 'binary_len_bytes' _STACK_BYTE_SIZE_TO_READ = 'stack_bytes_to_read' _MCS_DATA_LINE_LEN = 43 _MCS_START_CODE_DEC = ':' _MCS_NUM_OF_BYTES_DEC = '10' _MCS_NUM_OF_BYTES_PER_LINE = 16 _MCS_DATA_DATA_TYPE_DEC = '00' _MCS_LINE_CRC_LEN = 2 _MCS_START_CODE_START_INDEX = 0 _MCS_START_CODE_END_INDEX = 1 _MCS_NUM_OF_BYTES_START_INDEX = 1 _MCS_NUM_OF_BYTES_END_INDEX = 3 _MCS_DATA_TYPE_START_INDEX = 7 _MCS_DATA_TYPE_END_INDEX = 9 _MCS_UPDATE_DATA_START_INDEX = 9 _MCS_SUM_MODULO_256 = 256 def __init__(self): """ SoftwareUpdateScript class constructor """ self._sw_update_status = dict() self._thread = None self._thread_event = None self._utilities = Utilities() def _reset_variables(self): """ Privately accessible method to reset the variables that are used in software update. @return none """ self._sw_update_status[self._SW_UPDATE_STACK] = 0 self._sw_update_status[self._SW_UPDATE_DEST] = 0 self._sw_update_status[self._BINARY_FILE_SIZE] = 0 self._sw_update_status[self._WRITTEN_BYTE_COUNT] = 0 self._sw_update_status[self._WRITE_COUNTER] = 0 self._sw_update_status[self._CUR_UPDATE_ELAPSED_TIME_S] = 0 self._sw_update_status[self._BINARY_LEN_BYTES] = 0 self._sw_update_status[self._STACK_BYTE_SIZE_TO_READ] = self._TD_SW_UPDATE_PAYLOAD_BYTES # Default to TD self._utilities.clear_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) self._utilities.clear_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) def _prepare_for_sw_update(self, stack_to_update: str, destination: str, file_path: str): """ Privately accessible method to prepare for software update. @param stack_to_update: The stack to update its software (TD, DD) @param destination: The update destination (Firmware, FPGA) @param file_path: The file location of the binary file @return True if prepare was successful otherwise, False """ status = False self._reset_variables() # Create the CAN listener list from the Leahi messages self._utilities.clear_can_listener() # TODO how to read from a manifest file. Right now it is passed directly # TODO if stack to update and stack target are not provided, then just read through the manifest files if stack_to_update is not None and destination is not None: stack_to_update = stack_to_update.upper() destination = destination.upper() self._sw_update_status[self._SW_UPDATE_STACK] = UpdateStacks(int(stack_to_update)).value self._sw_update_status[self._SW_UPDATE_DEST] = UpdateStacksDestinations(int(destination)).value #if stack_to_update in UpdateStacks.__members__: self._sw_update_status[self._SW_UPDATE_STACK] = UpdateStacks(stack_to_update).value #if destination in UpdateStacksDestinations.__members__: self._sw_update_status[self._SW_UPDATE_DEST] = UpdateStacksDestinations(destination).value self._sw_update_status[self._CUR_UPDATE_ELAPSED_TIME_S] = time.time() # TODo manifest #self._sw_update_status[self._SW_UPDATE_STACK] = stack_to_update #self._sw_update_status[self._SW_UPDATE_TARGET] = stack_target if self._sw_update_status[self._SW_UPDATE_STACK] == UpdateStacks.STACK_DD.value: \ self._sw_update_status[self._STACK_BYTE_SIZE_TO_READ] = self._DD_SW_UPDATE_PAYLOAD_BYTES # If the file is a .bin the entire file has to be written, if it is an .mcs then we should look into only the data portion self._sw_update_status[self._BINARY_FILE_SIZE] = os.path.getsize(file_path) if file_path.endswith(".mcs"): num_of_lines_with_data = 0 # Command: grep -c ":10" Leahi_103488_ctlbrd_hyd_L018_update.mcs # Look for the lines with :10 at the beginning of them and count the number of lines. cmd = ['grep', '-c', '^' + self._MCS_START_CODE_DEC + self._MCS_NUM_OF_BYTES_DEC, file_path] process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate() cmd_response = stdout.decode('ascii', errors='ignore').splitlines() # Get the number of lines with the character and multiply it by 16 bytes to get the number of the bytes that # have to be written. This is essential for the progress status. Otherwise, the .mcs file has more data than # the amount of data that is going to be written to the FPGA. if len(cmd_response) > 0: num_of_lines_with_data = int(cmd_response[0]) self._sw_update_status[self._BINARY_FILE_SIZE] = num_of_lines_with_data * self._MCS_NUM_OF_BYTES_PER_LINE send_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) # If the communication has not started, send the command start for update if send_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value: self._utilities.send_command_msg(SWUpdateCommands.UPDATE_CMD_START.value, self._sw_update_status[self._SW_UPDATE_STACK], self._sw_update_status[self._SW_UPDATE_DEST]) send_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME) # Once the CAN communication was ready it means the command went through successfully, and it is ready for the next command. if send_ack_status == CanCommStatus.CAN_COMM_READY.value: status = True return status def _update_progress_status(self, clear_print: bool = False): """ Privately accessible method to print the status of software update. @param clear_print: boolean flag to indicate whether to send an empty print to break the continuous progress print line @return none """ if clear_print: return # If clear no print, return progress_bar_scale = 10 binary_len = self._sw_update_status[self._BINARY_LEN_BYTES] write_counter = self._sw_update_status[self._WRITE_COUNTER] binary_file_size_bytes = self._sw_update_status[self._BINARY_FILE_SIZE] target_stack = self._sw_update_status[self._SW_UPDATE_STACK] target_stack = UpdateStacks(target_stack).name destination = self._sw_update_status[self._SW_UPDATE_DEST] destination = UpdateStacksDestinations(destination).name bytes_written = self._sw_update_status[self._WRITTEN_BYTE_COUNT] stack_target_bytes_2_read = self._sw_update_status[self._STACK_BYTE_SIZE_TO_READ] # If the file size is greater than 1 MB, then set the scale to bigger than default so the progress fits in the screen if binary_file_size_bytes >= 1000000: progress_bar_scale = 50 elapsed_time = time.time() - self._sw_update_status[self._CUR_UPDATE_ELAPSED_TIME_S] # Get number of payloads needed to update the binary but since this is for display only, it is scaled down num_of_payloads_needed = math.ceil(binary_file_size_bytes / (stack_target_bytes_2_read * progress_bar_scale)) # Update the latest number of bytes that were successfully sent to the bootloader bytes_written += binary_len # If the accumulation of the bytes written is a division of the scale (for display) then update the progress bar if bytes_written % (stack_target_bytes_2_read * progress_bar_scale) == 0: write_counter += 1 # If the length of the bytes written is less than the default bytes to extract and write, it means we are the end of the # update of the binary file, then update the progress bar if binary_len < stack_target_bytes_2_read: write_counter += 1 # Calculate the percentage of the progress percent = (bytes_written / self._sw_update_status[self._BINARY_FILE_SIZE]) * 100 # Add more # and subtract from the empty spaces text = '#' * write_counter + ' ' * (num_of_payloads_needed - write_counter) # Print the update, end='\r' so the carriage return is returned to the beginning of the current line # add flush=True so the data is not buffered and is immediately printed print("Updating: {} {} {} {:.1f}% Elapsed Time: {:.1f} s".format(target_stack, destination, text, percent, elapsed_time), end='\r', flush=True) self._sw_update_status[self._WRITTEN_BYTE_COUNT] = bytes_written self._sw_update_status[self._WRITE_COUNTER] = write_counter # Once all the bytes of a binary are written, print an empty print to bread from the progress print that was overwriting if bytes_written >= binary_file_size_bytes: print() def _process_binary_update_file(self, file_path: str): """ Privately accessible method to process a passed binary file. @param file_path: The address of the binary file passed to the method @return none """ # Open the file as read the binary f = open(file_path, 'rb') while True: update_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) # Read through the file, once ack status says the CAN bus is ready read another batch of bytes if update_ack_status == CanCommStatus.CAN_COMM_READY.value: # Read the next batch of bytes in the provided number of bytes line = f.read(self._sw_update_status[self._STACK_BYTE_SIZE_TO_READ]) target = self._sw_update_status[self._SW_UPDATE_STACK] self._sw_update_status[self._BINARY_LEN_BYTES] = len(line) if line != b'': # If the read line is not b'', meaning that it was not empty, send the data to the bootloader self._utilities.send_software_update_msg(target, line, len(line)) self._update_progress_status() if line == b'': break # No more binary bytes left exit elif update_ack_status == CanCommStatus.CAN_COMM_TIME_OUT.value: print() print("Timeout") # TODO send again? break # Close the binary file f.close() def _process_mcs_update_file(self, file_path: str): """ Privately accessible method to process a passed Memory Configuration File (MCS) file. @param file_path: The address of the mcs file passed to the method @return none """ f = open(file_path, 'r') # Read the .mcs file but not as binary since the there are more values than the bytes to be read (e.g. length, CRC, address) payload_bytes = bytes() for line_number, line in enumerate(f, start=1): summation = 0 line = line.strip() # Read the line: e.g. :10000000FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF00 -> :[Length][Address][Record Type][Data][Checksum] # 1. ":" is the start of the line # 2. length are the next 2 hex characters (e.g. 10 means 16 bytes of data) # 3. address is the next 4 hex characters. It is not used now. # 4. record type is the next 2 hex characters. 00 data record, 01 end of file, 04 extended linear address # 5. data is the next defined bytes in item 2. # 6. checksum is the last 2 hex characters. It is not used now. # The lines below checks the read line to make sure it is a data line if len(line) != self._MCS_DATA_LINE_LEN: continue start_code = line[self._MCS_START_CODE_START_INDEX:self._MCS_START_CODE_END_INDEX] if self._MCS_START_CODE_DEC not in start_code: continue num_of_bytes = line[self._MCS_NUM_OF_BYTES_START_INDEX:self._MCS_NUM_OF_BYTES_END_INDEX] if self._MCS_NUM_OF_BYTES_DEC != num_of_bytes: continue data_type = line[self._MCS_DATA_TYPE_START_INDEX:self._MCS_DATA_TYPE_END_INDEX] if self._MCS_DATA_DATA_TYPE_DEC != data_type: continue # Get the last two characters of the line as CRC line_crc = line[len(line) - self._MCS_LINE_CRC_LEN:] line_crc = line_crc[0] + line_crc[1] line_crc = int(line_crc, 16) data_line_without_crc = line[self._MCS_START_CODE_END_INDEX:len(line) - self._MCS_LINE_CRC_LEN] for c in range(0, len(data_line_without_crc)): # Loop through the line and sum all bytes (length, address, type, and data) if c % 2 == 0: continue int_value = data_line_without_crc[c - 1] + data_line_without_crc[c] int_value = int(int_value, 16) summation += int_value # Get the 8-bit modulo. The CRC is the 2's complement remainder = summation % self._MCS_SUM_MODULO_256 # Calculate the 2's complement and then & with 0xFF to truncate it to a 1-byte value. (e.g. 256 & 0xFF = 0) crc = ((remainder ^ 0xFF) + 1) & 0xFF if crc != line_crc: self._utilities.file_handle.write("Line Num: {}, Line: {}, Line CRC: {}, Calc CRC: {}\r".format(line_number, line, line_crc, crc)) # TODO if the CRC failed, we should fail the update update_data = line[self._MCS_UPDATE_DATA_START_INDEX:len(line) - self._MCS_LINE_CRC_LEN] for b in range(0, len(update_data)): # Loop through the list of the data extracted data every two characters are concatenated to become one byte of data # For example: 00000000300020010000001430014004 -> 00 00 00 00 30 00 20 01 00 00 00 14 30 01 40 04 # So if the index of the value is odd, the value and the value of the previous index are combined and converted to integer # The concatenated integer is the packed into a byte if b % 2 == 0: continue conc_bytes = update_data[b - 1] + update_data[b] conc_int = int(conc_bytes, 16) payload_bytes += self._utilities.convert_data_to_bytes('