import os
import re
from enum import Enum, unique
from scripts.update_package_script.utilities import Utilities, SWUpdateCommands, CanCommStatus
# TODO remove the base class
class SWUpdateEnum(Enum):
    """Base class for the software-update enums; adds a value-membership test."""

    @classmethod
    def has_value(cls, value):
        """Return True if ``value`` is the value of one of this enum's members.

        :param value: the raw value to look for (not an enum member).
        :return: True when a member with that value exists, otherwise False.
        """
        try:
            return value in cls._value2member_map_
        except TypeError:
            # An unhashable value (e.g. a list) cannot be used for a dict
            # lookup; compare it against every member value instead of
            # letting the TypeError escape from a simple membership query.
            return any(value == member.value for member in cls)
@unique
class SWUpdateTargets(SWUpdateEnum):
    """Numeric identifiers of the targets a software update can address.

    Members are looked up by name (``SWUpdateTargets['TARGET_TD']``) and their
    ``.value`` is what gets passed on to the messaging layer. ``@unique``
    rejects any two members sharing the same value.
    """

    # Each stack is followed by its FPGA counterpart.
    TARGET_TD = 0
    TARGET_TD_FPGA = 1

    TARGET_DD = 2
    TARGET_DD_FPGA = 3

    TARGET_RO = 4
    TARGET_RO_FPGA = 5

    # Equals the number of target members defined above.
    NUM_OF_TARGETS = 6
class SoftwareUpdateScript:
    """Send software-update binaries to a target through ``Utilities``.

    The public entry point is :meth:`update_software_packages`. For every
    ``.bin`` file in a directory it sends a ``SW_UPDATE_START`` command and
    then streams the file in ``SW_UPDATE_FLASH_BUFFER_SIZE``-byte chunks.

    The XML-report decoding helpers (``_get_decode_variables``,
    ``_decode_line``, ``_process_read_line_OBSOLETE``,
    ``_handle_processed_data`` and ``_process_remaining_data``) are not called
    by the entry point and are marked for removal by their TODO comments.
    """

    # Keys of ``_decode_data_status``.
    _DECODED_BYTES_KEY_NAME = 'decoded_bytes'
    _CUR_DATA_INDEX_KEY_NAME = 'cur_data_index'
    _CUR_INSERT_DATA_INDEX_KEY_NAME = 'cur_insert_index'
    _TOTAL_BYTES_TX_KEY_NAME = 'total_bytes_tx'
    _RESIDUAL_BYTES_KEY_NAME = 'residual_bytes'
    _BINARY_SEND_COMPLETE_KEY_NAME = 'is_binary_done'
    _SIG_MSG_SEND_COMPLETE_KEY_NAME = 'is_sig_msg_done'

    # Keys of ``_xml_report_values``.
    _XML_REPORT_FILE_TYPE_KEY_NAME = 'filetype'
    _XML_REPORT_BIN_SIZE_KEY_NAME = 'size'

    _DECODE_LIST_MAX_SIZE_BYTES = 4096
    _SHIFT_BITS_BY_8 = 8
    # Moduli used to rebuild the two-byte code in _get_decode_variables.
    _DECODE_COPRIME_1 = 19
    _DECODE_COPRIME_2 = 23
    # Modulus and per-byte increment used by _decode_line.
    _DECODE_VALUE = 256
    _DECODE_ADD_VALUE = 211
    # NOTE(review): an empty string is contained in every string, so the
    # end-tag test in _decode_line succeeds on the first decoded byte. The
    # intended tag text looks like it was lost -- confirm against the report
    # format before relying on the decoder.
    _XML_REPORT_END_TAG = ""
    _XML_REPORT_HEADER_SIZE = 2

    # Number of bytes read from the binary and sent per update message.
    SW_UPDATE_FLASH_BUFFER_SIZE = 256

    # Keys of ``_sw_update_status``.
    _SW_UPDATE_STACK = 'stack_name'
    _BINARY_FILE_SIZE = 'file_size'
    _WRITTEN_BYTE_COUNT = 'byte_count'

    def __init__(self):
        """Create empty status dictionaries and the ``Utilities`` helper."""
        self._decode_data_status = {}  # TODO remove
        self._xml_report_values = {}  # TODO remove
        self._sw_update_status = {}
        self._utilities = Utilities()

    def _reset_variables(self):
        """Reset every status dictionary and clear both ACK/NACK statuses."""
        # TODO #define for the dictionary keys
        self._decode_data_status.update({
            'decode_values_found': False,
            'end_tag_found': False,
            'decode_found_index': 0,
            'converted_xml': '',
            'is_first_line': False,
            'num_of_bytes_sent': 0,  # TODO remove?
            self._RESIDUAL_BYTES_KEY_NAME: [],
            self._DECODED_BYTES_KEY_NAME: [],
            self._CUR_DATA_INDEX_KEY_NAME: 0,
            self._CUR_INSERT_DATA_INDEX_KEY_NAME: 0,
            self._TOTAL_BYTES_TX_KEY_NAME: 0,
            self._BINARY_SEND_COMPLETE_KEY_NAME: False,
            self._SIG_MSG_SEND_COMPLETE_KEY_NAME: False,
        })
        self._xml_report_values.update({
            'name': '',
            'version': '',
            self._XML_REPORT_FILE_TYPE_KEY_NAME: '',
            'security': '',
            self._XML_REPORT_BIN_SIZE_KEY_NAME: 0,
            'raw': '',
        })
        self._sw_update_status.update({
            self._SW_UPDATE_STACK: 0,
            self._BINARY_FILE_SIZE: 0,
            self._WRITTEN_BYTE_COUNT: 0,
        })
        self._utilities.clear_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME)
        self._utilities.clear_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME)

    def _prepare_for_sw_update(self, stack_to_update: str, file_path: str):
        """Reset state, record target and file size, and send the start command.

        :param stack_to_update: target name without the ``TARGET_`` prefix,
            case-insensitive (e.g. ``'td'`` or ``'dd_fpga'``).
        :param file_path: path of the binary that is about to be sent.
        :return: always True for now (see the TODOs below).
        :raises ValueError: if ``stack_to_update`` is None.
        :raises KeyError: if the name is not a member of ``SWUpdateTargets``.
        :raises OSError: if the size of ``file_path`` cannot be read.
        """
        if stack_to_update is None:
            # The public entry point defaults this argument to None; fail with
            # a clear message instead of an AttributeError on None.upper().
            raise ValueError("stack_to_update must name a target, e.g. 'td' or 'td_fpga'")

        status = True
        self._reset_variables()

        # TODO how to read from a manifest file. Right now it is passed directly
        target_name = 'TARGET_' + stack_to_update.upper()
        self._sw_update_status[self._SW_UPDATE_STACK] = SWUpdateTargets[target_name].value
        self._sw_update_status[self._BINARY_FILE_SIZE] = os.path.getsize(file_path)

        send_ack_status = self._utilities.get_msg_ack_nack_status(
            self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME)
        if send_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value:
            self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_START.value,
                                             self._sw_update_status[self._SW_UPDATE_STACK])
        # TODO create a wait for response back
        # TODO check for status
        return status

    def _get_decode_variables(self, line_bytes: bytes):  # TODO remove
        """Find the decode start index encoded in the first two bytes.

        Any stored residual bytes are prepended first. The first two bytes are
        combined into a 16-bit code (first byte high); the method then looks
        for the smallest ``s`` in ``range(0, 0xFF)`` for which
        ``((s % 19) << 8) + (s % 23)`` equals that code and stores it under
        ``'decode_found_index'``. Nothing happens unless more than two bytes
        are available.

        :param line_bytes: raw bytes read from the package.
        """
        status = self._decode_data_status
        residual = status[self._RESIDUAL_BYTES_KEY_NAME]
        if len(residual) != 0:
            line_bytes = residual + line_bytes

        if len(line_bytes) <= 2:
            return

        a = int(line_bytes[0])
        b = int(line_bytes[1])
        code = (a << self._SHIFT_BITS_BY_8) + b
        for s in range(0, 0xFF):
            convert = ((s % self._DECODE_COPRIME_1) << self._SHIFT_BITS_BY_8) + (s % self._DECODE_COPRIME_2)
            if code == convert:
                status['decode_found_index'] = s
                status['decode_values_found'] = True
                print("Breaking {} {} {} {} {}".format(code, convert, s, a, b))
                break

    def _decode_line(self, line_bytes: bytes):  # TODO remove
        """Decode the XML report bytes, then collect the bytes that follow it.

        Until the end tag is seen, each byte is decoded as
        ``(byte - decode_found_index) % 256`` and appended (minus newlines) to
        ``'converted_xml'``, with the index advanced by 211 per byte. Once the
        end tag is found, the file type and size are extracted from the XML
        and every later byte is appended unchanged to the decoded-bytes list.

        :param line_bytes: raw bytes read from the package.
        :raises ValueError: if the extracted size is not an integer literal.
        """
        status = self._decode_data_status
        start_index = 0

        if len(status[self._RESIDUAL_BYTES_KEY_NAME]) != 0:
            line_bytes = status[self._RESIDUAL_BYTES_KEY_NAME] + line_bytes
            status[self._RESIDUAL_BYTES_KEY_NAME] = []

        # Despite its name, 'is_first_line' is False until the first call has
        # been handled; only that first call skips the header bytes.
        if status['is_first_line'] is False:
            start_index = self._XML_REPORT_HEADER_SIZE
            status['is_first_line'] = True

        for d in range(start_index, len(line_bytes)):
            if status['end_tag_found'] is False:
                decoded_value_bytes = (line_bytes[d] - status['decode_found_index']) % self._DECODE_VALUE
                status['decode_found_index'] += self._DECODE_ADD_VALUE
                status['converted_xml'] += chr(decoded_value_bytes).replace('\n', '')

                if self._XML_REPORT_END_TAG in status['converted_xml']:
                    converted_xml = status['converted_xml']
                    status['end_tag_found'] = True
                    # NOTE(review): both patterns are the bare group '(.*)',
                    # so each captures the whole text; the surrounding tag
                    # names appear to be missing -- confirm the intended
                    # patterns.
                    self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] = \
                        re.search('(.*)', converted_xml).group(1)
                    binary_size_int = re.search('(.*)', converted_xml).group(1)
                    self._xml_report_values[self._XML_REPORT_BIN_SIZE_KEY_NAME] = int(binary_size_int)
                    print(self._xml_report_values['filetype'], self._xml_report_values['size'],
                          len(converted_xml), converted_xml, status['decode_found_index'], 'H')  # TODO remove
            else:
                status[self._DECODED_BYTES_KEY_NAME].append(line_bytes[d])
                status[self._CUR_DATA_INDEX_KEY_NAME] += 1

    def _process_read_line_OBSOLETE(self, line: bytes):
        """Obsolete line handler: strip the signature, then decode the rest.

        NOTE(review): ``_signature_start_in_bytes``, ``_signature_end_in_bytes``
        and ``_verify_signature`` are not defined anywhere in this class, so
        calling this method raises AttributeError unless they are supplied
        elsewhere.

        :param line: raw bytes read from the package.
        """
        status = self._decode_data_status

        if self._signature_start_in_bytes in line:
            if self._signature_end_in_bytes:
                signature_end = line.find(self._signature_end_in_bytes)
                signature = line[len(self._signature_start_in_bytes):signature_end]
                self._verify_signature(signature)

                end_of_signature_mark = signature_end + len(self._signature_end_in_bytes)
                xml_report_start = line[end_of_signature_mark:]

                if (len(line) - end_of_signature_mark) > 2 and status['decode_values_found'] is False:
                    self._get_decode_variables(xml_report_start)
                else:
                    # Too few bytes to decode yet: keep them for the next call.
                    print(xml_report_start, 'D')
                    status[self._RESIDUAL_BYTES_KEY_NAME] = xml_report_start

                if status['decode_values_found'] is True:
                    self._decode_line(xml_report_start)
            else:
                # TODO fill up
                pass
        else:
            if status['decode_values_found'] is False:
                self._get_decode_variables(line)
            self._decode_line(line)

    def _process_binary_file(self, file_path: str):
        """Stream ``file_path`` to the current target in fixed-size chunks.

        A chunk is read and sent only while the update ACK status equals
        ``CAN_COMM_READY``; otherwise the loop polls the status again. The
        loop ends after the read that returns no data, and that empty chunk is
        still passed to ``send_software_update_msg`` before the loop exits.
        The size argument of every message is the full buffer size, even when
        the last chunk is shorter.

        :param file_path: path of the binary file to send.
        :raises OSError: if the file cannot be opened or read.
        """
        # TODO add a timeout (e.g. 10 minutes that is very long) to make sure we are not stuck
        chunk_size = self.SW_UPDATE_FLASH_BUFFER_SIZE

        # The context manager closes the file even if sending raises.
        with open(file_path, 'rb') as f:
            while True:
                update_ack_status = self._utilities.get_msg_ack_nack_status(
                    self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME)
                if update_ack_status != CanCommStatus.CAN_COMM_READY.value:
                    continue

                line = f.read(chunk_size)
                target = self._sw_update_status[self._SW_UPDATE_STACK]
                self._utilities.send_software_update_msg(target, line, chunk_size)
                if line == b'':
                    break

    def _handle_processed_data(self):
        """Run one step of the legacy start / send-binary / send-signature flow.

        Depending on the two ACK statuses this either sends the start command,
        sends the next buffer of decoded bytes (zero-padded when it is the
        last, partial one), or sends the all-zero signature message.
        """
        status = self._decode_data_status
        buffer_size = self.SW_UPDATE_FLASH_BUFFER_SIZE

        send_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.SEND_MSG_ACK_STATUS_KEY_NAME)
        update_ack_status = self._utilities.get_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME)
        #print(send_ack_status, update_ack_status)

        # TODO change the target right now they do not match because they are written for Leahi
        # TODO for instance, it should be self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] as the second parameter
        target = 'TARGET_TD_FPGA'  #'TARGET_TD' # self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME]
        target = SWUpdateTargets[target].value
        #print(self._decode_data_status[self._DECODED_BYTES_KEY_NAME])

        if send_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value:
            # Start only once a file type is known.
            if self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] != '':
                self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_START.value, target)

        # TODO change the target right now they do not match because they are written for Leahi
        # CanCommStatus.CAN_COMM_SUCCESSFUL.value: # TODO this should be changed to successful but the ack is not received now yet
        elif send_ack_status == 2 and status[self._BINARY_SEND_COMPLETE_KEY_NAME] is False:
            if update_ack_status == CanCommStatus.CAN_COMM_NOT_STARTED.value or \
                    update_ack_status == 2:  #CanCommStatus.CAN_COMM_SUCCESSFUL.value):
                current_write_index = status[self._TOTAL_BYTES_TX_KEY_NAME]
                current_insert_index = status[self._CUR_DATA_INDEX_KEY_NAME]
                binary_size = self._xml_report_values[self._XML_REPORT_BIN_SIZE_KEY_NAME]
                var = ''
                data_2_write = ''

                if current_write_index == binary_size:
                    status[self._BINARY_SEND_COMPLETE_KEY_NAME] = True

                if current_insert_index - current_write_index >= buffer_size:
                    # A full buffer of decoded bytes is available.
                    data_2_write = status[self._DECODED_BYTES_KEY_NAME][
                        current_write_index: current_write_index + buffer_size]
                    status[self._TOTAL_BYTES_TX_KEY_NAME] += buffer_size
                    self._utilities.send_software_update_msg(target, data_2_write, buffer_size)
                    var = 'A'
                elif current_insert_index == binary_size and \
                        status[self._BINARY_SEND_COMPLETE_KEY_NAME] is False:
                    # Last, partial buffer: pad with zeros up to the buffer
                    # size but report only the real byte count.
                    remaining_bytes = current_insert_index - current_write_index
                    data_2_write = status[self._DECODED_BYTES_KEY_NAME][
                        current_write_index: current_insert_index]
                    data_2_write += [0] * (buffer_size - remaining_bytes)
                    status[self._TOTAL_BYTES_TX_KEY_NAME] += remaining_bytes
                    self._utilities.send_software_update_msg(target, data_2_write, remaining_bytes)
                    var = 'B'

                print(current_write_index, status[self._CUR_DATA_INDEX_KEY_NAME],
                      current_insert_index - current_write_index, var, len(data_2_write))

        elif update_ack_status == 2 and status[self._BINARY_SEND_COMPLETE_KEY_NAME] is True:  #CanCommStatus.CAN_COMM_SUCCESSFUL.value: # TODO this should be changed to successful but the ack is not received now yet
            # TODO do we need the electronic signature from xml?
            data_2_write = [0] * buffer_size
            self._utilities.send_software_update_msg(target, data_2_write, buffer_size, signature_msg=True)
            if self._utilities.get_msg_ack_nack_status(self._utilities.UPDATE_MSG_ACK_STATUS_KEY_NAME) == 2:  #TODO change to successful
                status[self._SIG_MSG_SEND_COMPLETE_KEY_NAME] = True

    def _process_remaining_data(self):  # TODO remove
        """Repeat ``_handle_processed_data`` until the signature message is
        flagged as sent, then send the ``SW_UPDATE_VERIFY`` command."""
        while True:
            self._handle_processed_data()
            if self._decode_data_status[self._SIG_MSG_SEND_COMPLETE_KEY_NAME] is True:
                break

        # TODO change the target right now they do not match because they are written for Leahi
        # TODO for instance, it should be self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME] as the second parameter
        target = 'TARGET_TD_FPGA'  #'TARGET_TD' # self._xml_report_values[self._XML_REPORT_FILE_TYPE_KEY_NAME]
        target = SWUpdateTargets[target].value
        self._utilities.send_command_msg(SWUpdateCommands.SW_UPDATE_VERIFY.value, target)
        # TODO change the target right now they do not match because they are written for Leahi

    def update_software_packages(self, packages_dir: str, stack_to_update: str = None):
        """Send every ``.bin`` file found in ``packages_dir`` to one target.

        :param packages_dir: directory that is scanned (non-recursively) for
            files whose name ends with ``.bin``.
        :param stack_to_update: target name without the ``TARGET_`` prefix,
            e.g. ``'td'``. It must be given whenever the directory contains a
            ``.bin`` file.
        :raises ValueError: if a ``.bin`` file is found and ``stack_to_update``
            is None.
        :raises KeyError: if ``stack_to_update`` is not a known target.
        """
        # Planned flow; only the "send" part is implemented below:
        # 1. Verify signature
        # 2. Verify key
        # 3. Read the XML file to find the packages to update
        # 4. Decrypt the content
        # 5. Send the data to bootloader
        # 6. Wait for the bootloader to ack/nack or timeout after 3000 ms
        # 7. Send the entire file
        # 8. Show progress
        for file in os.listdir(packages_dir):
            if not file.endswith(".bin"):
                continue

            path = os.path.join(packages_dir, file)
            if self._prepare_for_sw_update(stack_to_update, path):
                self._process_binary_file(path)

            #self._process_remaining_data()
            # TODO for testing remove
            #print(self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME],
            #      self._decode_data_status[self._DECODED_BYTES_KEY_NAME][0],
            #      self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-4],
            #      self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-3],
            #      self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-2],
            #      self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-1],
            #      len(self._decode_data_status[self._DECODED_BYTES_KEY_NAME]))
            # TODO remove