Index: scripts/update_package_script/update_package.py
===================================================================
diff -u -rfb0e5e63dfae201ddd3c41f76bcedf4d778b723c -r4e5c95d8cea4d29a61c3c49e5e4b2b6f76bcddb1
--- scripts/update_package_script/update_package.py (.../update_package.py) (revision fb0e5e63dfae201ddd3c41f76bcedf4d778b723c)
+++ scripts/update_package_script/update_package.py (.../update_package.py) (revision 4e5c95d8cea4d29a61c3c49e5e4b2b6f76bcddb1)
@@ -1,86 +1,21 @@
+
import os
-import re
+from scripts.update_package_script.utilities import Utilities, SWUpdateCommands, CanCommStatus, SWUpdateTargets
-from enum import Enum, unique
-from scripts.update_package_script.utilities import Utilities, SWUpdateCommands, CanCommStatus
-
-# TODO remove the base class
-
-class SWUpdateEnum(Enum):
-
- @classmethod
- def has_value(cls, value):
- return value in cls._value2member_map_
-
-@unique
-class SWUpdateTargets(SWUpdateEnum):
- TARGET_TD = 0
- TARGET_TD_FPGA = 1
- TARGET_DD = 2
- TARGET_DD_FPGA = 3
- TARGET_RO = 4
- TARGET_RO_FPGA = 5
- NUM_OF_TARGETS = 6
-
class SoftwareUpdateScript:
- _DECODED_BYTES_KEY_NAME = 'decoded_bytes'
- _CUR_DATA_INDEX_KEY_NAME = 'cur_data_index'
- _CUR_INSERT_DATA_INDEX_KEY_NAME = 'cur_insert_index'
- _TOTAL_BYTES_TX_KEY_NAME = 'total_bytes_tx'
- _RESIDUAL_BYTES_KEY_NAME = 'residual_bytes'
- _BINARY_SEND_COMPLETE_KEY_NAME = 'is_binary_done'
- _SIG_MSG_SEND_COMPLETE_KEY_NAME = 'is_sig_msg_done'
-
- _XML_REPORT_FILE_TYPE_KEY_NAME = 'filetype'
- _XML_REPORT_BIN_SIZE_KEY_NAME = 'size'
-
- _DECODE_LIST_MAX_SIZE_BYTES = 4096
- _SHIFT_BITS_BY_8 = 8
- _DECODE_COPRIME_1 = 19
- _DECODE_COPRIME_2 = 23
- _DECODE_VALUE = 256
- _DECODE_ADD_VALUE = 211
- _XML_REPORT_END_TAG = ""
- _XML_REPORT_HEADER_SIZE = 2
-
- SW_UPDATE_FLASH_BUFFER_SIZE = 256
+ _SW_UPDATE_FLASH_BUFFER_SIZE = 256
_SW_UPDATE_STACK = 'stack_name'
_BINARY_FILE_SIZE = 'file_size'
_WRITTEN_BYTE_COUNT = 'byte_count'
def __init__(self):
- self._decode_data_status = dict() # TODO remove
- self._xml_report_values = dict() # tODO remove
self._sw_update_status = dict()
self._utilities = Utilities()
def _reset_variables(self):
- # TODO #define for the dictionary keys
- self._decode_data_status['decode_values_found'] = False
- self._decode_data_status['end_tag_found'] = False
- self._decode_data_status['decode_found_index'] = 0
- self._decode_data_status['converted_xml'] = ''
- self._decode_data_status['is_first_line'] = False
- self._decode_data_status['num_of_bytes_sent'] = 0 #TODO remove?
- self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] = list()
- self._decode_data_status[self._DECODED_BYTES_KEY_NAME] = list()
- self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME] = 0
- self._decode_data_status[self._CUR_INSERT_DATA_INDEX_KEY_NAME] = 0
- self._decode_data_status[self._TOTAL_BYTES_TX_KEY_NAME] = 0
- self._decode_data_status[self._BINARY_SEND_COMPLETE_KEY_NAME] = False
- self._decode_data_status[self._SIG_MSG_SEND_COMPLETE_KEY_NAME] = False
-
- self._xml_report_values['name'] = ''
- self._xml_report_values['version'] = ''
- self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] = ''
- self._xml_report_values['security'] = ''
- self._xml_report_values['version'] = ''
- self._xml_report_values[self._XML_REPORT_BIN_SIZE_KEY_NAME] = 0
- self._xml_report_values['raw'] = ''
-
self._sw_update_status[self._SW_UPDATE_STACK] = 0
self._sw_update_status[self._BINARY_FILE_SIZE] = 0
self._sw_update_status[self._WRITTEN_BYTE_COUNT] = 0
@@ -105,203 +40,40 @@
return status
- def _get_decode_variables(self, line_bytes: bytes): # TODO remove
-
- if len(self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME]) != 0:
- line_bytes = self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] + line_bytes
-
- if len(line_bytes) > 2:
- a = int(line_bytes[0])
- b = int(line_bytes[1])
- if a is not None and b is not None:
- code = (a << self._SHIFT_BITS_BY_8) + b
- for s in range(0, 0xFF):
- convert = ((s % self._DECODE_COPRIME_1) << self._SHIFT_BITS_BY_8) + (s % self._DECODE_COPRIME_2)
-
- if code == convert:
- self._decode_data_status['decode_found_index'] = s
- self._decode_data_status['decode_values_found'] = True
- print("Breaking {} {} {} {} {}".format(code, convert, s, a, b))
- break
-
- def _decode_line(self, line_bytes: bytes): # TODO remove
-
- start_index = 0
-
- if len(self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME]) != 0:
- line_bytes = self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] + line_bytes
- self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] = list()
-
- if self._decode_data_status['is_first_line'] is False:
- start_index = self._XML_REPORT_HEADER_SIZE
- self._decode_data_status['is_first_line'] = True
-
- for d in range(start_index, len(line_bytes)):
- if self._decode_data_status['end_tag_found'] is False:
- decoded_value_bytes = (line_bytes[d] - self._decode_data_status[
- 'decode_found_index']) % self._DECODE_VALUE
- self._decode_data_status['decode_found_index'] += self._DECODE_ADD_VALUE
- self._decode_data_status['converted_xml'] += ''.join(chr(decoded_value_bytes)).replace('\n', '')
-
- if self._XML_REPORT_END_TAG in self._decode_data_status['converted_xml']:
-
- converted_xml = self._decode_data_status['converted_xml']
- self._decode_data_status['end_tag_found'] = True
- self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] = \
- re.search('(.*)', converted_xml).group(1)
- binary_size_int = re.search('(.*)', converted_xml).group(1)
- self._xml_report_values[self._XML_REPORT_BIN_SIZE_KEY_NAME] = int(binary_size_int)
-
- print(self._xml_report_values['filetype'], self._xml_report_values['size'],
- len(converted_xml), converted_xml, self._decode_data_status['decode_found_index'], 'H') # TODO remove
- else:
- self._decode_data_status[self._DECODED_BYTES_KEY_NAME].append(line_bytes[d])
- self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME] += 1
-
- def _process_read_line_OBSOLETE(self, line: bytes):
-
- if self._signature_start_in_bytes in line:
- if self._signature_end_in_bytes:
- signature = line[len(self._signature_start_in_bytes):line.find(self._signature_end_in_bytes)]
- self._verify_signature(signature)
- end_of_signature_mark = line.find(self._signature_end_in_bytes) + len(self._signature_end_in_bytes)
- xml_report_start = line[end_of_signature_mark:]
- if (len(line) - end_of_signature_mark) > 2 and self._decode_data_status['decode_values_found'] is False:
- self._get_decode_variables(xml_report_start)
- else:
- print(xml_report_start, 'D')
- self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] = xml_report_start
-
- if self._decode_data_status['decode_values_found'] is True:
- self._decode_line(xml_report_start)
- else:
- # TODO fill up
- pass
- else:
- self._get_decode_variables(line) if self._decode_data_status['decode_values_found'] is False else None
- self._decode_line(line)
-
def _process_binary_file(self, file_path: str):
# TODO add a timeout (e.g. 10 minutes that is very long) to make sure we are not stuck
f = open(file_path, 'rb')
+ bytes_written = 0
while True:
line = None
update_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME)
if update_ack_status == CanCommStatus.CAN_COMM_READY.value:
- line = f.read(self.SW_UPDATE_FLASH_BUFFER_SIZE)
+ line = f.read(self._SW_UPDATE_FLASH_BUFFER_SIZE)
target = self._sw_update_status[self._SW_UPDATE_STACK]
if line != b'':
self._utilities.send_software_update_msg(target, line, len(line))
+ bytes_written += len(line)
+ percent = (bytes_written /self._sw_update_status[self._BINARY_FILE_SIZE]) * 100
+ print("Progress {}%".format(percent))
if line == b'':
self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_VERIFY.value,
self._sw_update_status[self._SW_UPDATE_STACK])
- print("Leave file");
+ print("Leave file")
break
if line == b'': print("Leave file 2"); break
f.close()
-
-
- def _handle_processed_data(self):
-
- send_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME)
- update_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME)
- #print(send_ack_status, update_ack_status)
-
- # TODO change the target right now they do not match because they are written for Leahi
- # TODO for instance, it should be self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] as the second parameter
- target = 'TARGET_TD_FPGA' #'TARGET_TD' # self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME]
- target = SWUpdateTargets[target].value
- #print(self._decode_data_status[self._DECODED_BYTES_KEY_NAME])
- if send_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value:
- if self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] != '':
- self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_START.value, target)
- # TODO change the target right now they do not match because they are written for Leahi
- # CanCommStatus.CAN_COMM_SUCCESSFUL.value: # TODO this should be changed to successful but the ack is not received now yet
- elif send_ack_status == 2 and self._decode_data_status[self._BINARY_SEND_COMPLETE_KEY_NAME] is False:
- if update_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value or \
- update_ack_status == 2: #CanCommStatus.CAN_COMM_SUCCESSFUL.value):
- current_write_index = self._decode_data_status[self._TOTAL_BYTES_TX_KEY_NAME]
- current_insert_index = self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME]
- binary_size = self._xml_report_values[self._XML_REPORT_BIN_SIZE_KEY_NAME]
- var = ''
- data_2_write = ''
- if current_write_index == binary_size:
- self._decode_data_status[self._BINARY_SEND_COMPLETE_KEY_NAME] = True
-
- if current_insert_index - current_write_index >= self.SW_UPDATE_FLASH_BUFFER_SIZE:
- data_2_write = self._decode_data_status[self._DECODED_BYTES_KEY_NAME] \
- [current_write_index: current_write_index + self.SW_UPDATE_FLASH_BUFFER_SIZE]
- self._decode_data_status[self._TOTAL_BYTES_TX_KEY_NAME] += self.SW_UPDATE_FLASH_BUFFER_SIZE
- self._utilities.send_software_update_msg(target, data_2_write, self.SW_UPDATE_FLASH_BUFFER_SIZE)
- var = 'A'
- elif current_insert_index == binary_size and \
- self._decode_data_status[self._BINARY_SEND_COMPLETE_KEY_NAME] is False:
- remaining_bytes = current_insert_index - current_write_index
- data_2_write = self._decode_data_status[self._DECODED_BYTES_KEY_NAME] \
- [current_write_index: current_insert_index]
- data_2_write += [0] * (self.SW_UPDATE_FLASH_BUFFER_SIZE - remaining_bytes)
- self._decode_data_status[self._TOTAL_BYTES_TX_KEY_NAME] += remaining_bytes
- self._utilities.send_software_update_msg(target, data_2_write, remaining_bytes)
- var = 'B'
- print(current_write_index, self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME],
- current_insert_index - current_write_index, var, len(data_2_write))
-
- elif update_ack_status == 2 and self._decode_data_status[self._BINARY_SEND_COMPLETE_KEY_NAME] is True: #CanCommStatus.CAN_COMM_SUCCESSFUL.value: # TODO this should be changed to successful but the ack is not received now yet
- # TODO do we need the electronic signature from xml?
- data_2_write = [0] * self.SW_UPDATE_FLASH_BUFFER_SIZE
- self._utilities.send_software_update_msg(target, data_2_write, self.SW_UPDATE_FLASH_BUFFER_SIZE, signature_msg=True)
-
- if self._utilities.get_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) == 2: #TODO change to successful
- self._decode_data_status[self._SIG_MSG_SEND_COMPLETE_KEY_NAME] = True
-
- def _process_remaining_data(self): # TODO remvoe
-
- while True:
- self._handle_processed_data()
-
- if self._decode_data_status[self._SIG_MSG_SEND_COMPLETE_KEY_NAME] is True:
- break
-
- # TODO change the target right now they do not match because they are written for Leahi
- # TODO for instance, it should be self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] as the second parameter
- target = 'TARGET_TD_FPGA' #'TARGET_TD' # self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME]
- target = SWUpdateTargets[target].value
- self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_VERIFY.value, target)
- # TODO change the target right now they do not match because they are written for Leahi
-
-
def update_software_packages(self, packages_dir: str, stack_to_update: str = None):
- # 1. Verify signature
- # 2. Verify key
- # 3. Read XML file for to find the packages to update
- # 4. Decrypt the content
- # 5. Send the data to bootloader
- # 6. Wait for the bootloader to ack/nack or timeout after 3000 ms
- # 7. Send the entire file
- # 8. Show progress
+ # TODO show progress
for file in os.listdir(packages_dir):
if not file.endswith(".bin"): continue
path = os.path.join(packages_dir, file)
if self._prepare_for_sw_update(stack_to_update, path):
self._process_binary_file(path)
-
- #self._process_remaining_data()
-
-
- # TODO for testing remove
- #print(self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME],
- # self._decode_data_status[self._DECODED_BYTES_KEY_NAME][0],
- # self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-4],
- # self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-3],
- # self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-2],
- # self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-1],
- # len(self._decode_data_status[self._DECODED_BYTES_KEY_NAME]))
- # TODO remove