Index: scripts/update_package_script/update_package.py =================================================================== diff -u -r914d77b4c5d45bf1b8b7dd460d4d07a16e119471 -r4e199a369c055fdad3866cebe7ef78b81035b4ba --- scripts/update_package_script/update_package.py (.../update_package.py) (revision 914d77b4c5d45bf1b8b7dd460d4d07a16e119471) +++ scripts/update_package_script/update_package.py (.../update_package.py) (revision 4e199a369c055fdad3866cebe7ef78b81035b4ba) @@ -1,11 +1,21 @@ import os import can import re + +from sphinx.util.osutil import getcwd + from scripts.base.base import Base class SoftwareUpdateScript(Base): + _DECODED_BYTES_KEY_NAME = 'decoded_bytes' + _CUR_DATA_INDEX_KEY_NAME = 'cur_data_index' + _CUR_INSERT_DATA_INDEX_KEY_NAME = 'cur_insert_index' + _TOTAL_BYTES_TX_KEY_NAME = 'total_bytes_tx' + _RESIDUAL_BYTES_KEY_NAME = 'residual_bytes' + + _DECODE_LIST_MAX_SIZE_BYTES = 2048 _SHIFT_BITS_BY_8 = 8 _DECODE_COPRIME_1 = 19 _DECODE_COPRIME_2 = 23 @@ -14,30 +24,40 @@ _XML_REPORT_END_TAG = "" _XML_REPORT_HEADER_SIZE = 2 + _NUM_OF_BYTES_PER_CAN_FRAME = 8 + _CAN_INTERFACE = "can0" + def __init__(self): super().__init__() self._signature_start_in_bytes = bytes(self.SIGNATURE_START, 'utf-8') self._signature_end_in_bytes = bytes(self.SIGNATURE_END, 'utf-8') - self._xml_report_status = dict() - #self._update + self._decode_data_status = dict() + self._xml_report_values = dict() - def _clear_variables(self): + def _reset_variables(self): - self._xml_report_status['decode_values_found'] = False - self._xml_report_status['end_tag_found'] = False - self._xml_report_status['decode_found_index'] = 0 - self._xml_report_status['converted_xml'] = '' - self._xml_report_status['decoded_line'] = list() - self._xml_report_status['is_first_line'] = False - self._xml_report_status['name'] = '' - self._xml_report_status['version'] = '' - self._xml_report_status['filetype'] = '' - self._xml_report_status['security'] = '' - self._xml_report_status['version'] = '' - self._xml_report_status['size'] = '' - self._xml_report_status['raw'] = '' + # TODO #define for the dictionary keys + self._decode_data_status['decode_values_found'] = False + self._decode_data_status['end_tag_found'] = False + self._decode_data_status['decode_found_index'] = 0 + self._decode_data_status['converted_xml'] = '' + self._decode_data_status['is_first_line'] = False + self._decode_data_status['num_of_bytes_sent'] = 0 + self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] = list() + self._decode_data_status[self._DECODED_BYTES_KEY_NAME] = list() + self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME] = 0 + self._decode_data_status[self._CUR_INSERT_DATA_INDEX_KEY_NAME] = 0 + self._decode_data_status[self._TOTAL_BYTES_TX_KEY_NAME] = 0 + self._xml_report_values['name'] = '' + self._xml_report_values['version'] = '' + self._xml_report_values['filetype'] = '' + self._xml_report_values['security'] = '' + self._xml_report_values['version'] = '' + self._xml_report_values['size'] = '' + self._xml_report_values['raw'] = '' + def _verify_signature(self, signature: bytes): # TODO fill up pass @@ -46,56 +66,56 @@ pass def _get_decode_variables(self, line_bytes: bytes): - a = int(line_bytes[0]) - b = int(line_bytes[1]) - code = (a << self._SHIFT_BITS_BY_8) + b - for s in range(0, 0xFF): - convert = ((s % self._DECODE_COPRIME_1) << self._SHIFT_BITS_BY_8) + (s % self._DECODE_COPRIME_2) + if len(self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME]) != 0: + line_bytes = self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] + line_bytes - if code == convert: - self._xml_report_status['decode_found_index'] = s - self._xml_report_status['decode_values_found'] = True - print("Breaking {} {} {} {}".format(code, convert, s, self._xml_report_status['decode_found_index'])) - break + if len(line_bytes) > 2: + a = int(line_bytes[0]) + b = int(line_bytes[1]) + if a is not None and b is not None: + code = (a << self._SHIFT_BITS_BY_8) + b + for s in range(0, 0xFF): + convert = ((s % self._DECODE_COPRIME_1) << self._SHIFT_BITS_BY_8) + (s % self._DECODE_COPRIME_2) + if code == convert: + self._decode_data_status['decode_found_index'] = s + self._decode_data_status['decode_values_found'] = True + print("Breaking {} {} {} {} {}".format(code, convert, s, a, b)) + break + def _decode_line(self, line_bytes: bytes): start_index = 0 - decoded_line_in_characters = '' - if self._xml_report_status['is_first_line'] is False: + if len(self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME]) != 0: + line_bytes = self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] + line_bytes + self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] = list() + + if self._decode_data_status['is_first_line'] is False: start_index = self._XML_REPORT_HEADER_SIZE - self._xml_report_status['is_first_line'] = True + self._decode_data_status['is_first_line'] = True for d in range(start_index, len(line_bytes)): - decoded_value_bytes = (line_bytes[d] - self._xml_report_status['decode_found_index']) % self._DECODE_VALUE - self._xml_report_status['decode_found_index'] += self._DECODE_ADD_VALUE - if self._xml_report_status['end_tag_found'] is False: - decoded_line_in_characters += ''.join(chr(decoded_value_bytes)).replace('\n', '') - else: - self._xml_report_status['decoded_line'] += [hex(i) for i in decoded_value_bytes] + if self._decode_data_status['end_tag_found'] is False: + decoded_value_bytes = (line_bytes[d] - self._decode_data_status[ + 'decode_found_index']) % self._DECODE_VALUE + self._decode_data_status['decode_found_index'] += self._DECODE_ADD_VALUE + self._decode_data_status['converted_xml'] += ''.join(chr(decoded_value_bytes)).replace('\n', '') - if self._xml_report_status['end_tag_found'] is False: - self._xml_report_status['converted_xml'] += decoded_line_in_characters + if self._XML_REPORT_END_TAG in self._decode_data_status['converted_xml']: - if self._XML_REPORT_END_TAG in self._xml_report_status['converted_xml']: - converted_xml = self._xml_report_status['converted_xml'] - self._xml_report_status['end_tag_found'] = True - self._xml_report_status['filetype'] = re.search('(.*)', converted_xml).group(1) - self._xml_report_status['size'] = re.search('(.*)', converted_xml).group(1) - # TODO fill up the reset of items if needed + converted_xml = self._decode_data_status['converted_xml'] + self._decode_data_status['end_tag_found'] = True + self._xml_report_values['filetype'] = re.search('(.*)', converted_xml).group(1) + self._xml_report_values['size'] = re.search('(.*)', converted_xml).group(1) + print(self._xml_report_values['filetype'], self._xml_report_values['size'], + len(converted_xml), converted_xml, self._decode_data_status['decode_found_index'], 'H') + else: + self._decode_data_status[self._DECODED_BYTES_KEY_NAME].append(hex(line_bytes[d])) + self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME] += 1 - if len(line_bytes) > len(decoded_line_in_characters): - temp = line_bytes[len(decoded_line_in_characters):] - self._xml_report_status['decoded_line'] += [hex(i) for i in temp] - print(self._xml_report_status['decoded_line']) - - def _send_update_data_to_bootloader(self): - - pass - def _process_read_line(self, line: bytes): if self._signature_start_in_bytes in line: @@ -104,21 +124,24 @@ self._verify_signature(signature) end_of_signature_mark = line.find(self._signature_end_in_bytes) + len(self._signature_end_in_bytes) xml_report_start = line[end_of_signature_mark:] - - if len(line) > end_of_signature_mark and self._xml_report_status['decode_values_found'] is False: + if (len(line) - end_of_signature_mark) > 2 and self._decode_data_status['decode_values_found'] is False: self._get_decode_variables(xml_report_start) + else: + print(xml_report_start, 'D') + self._decode_data_status[self._RESIDUAL_BYTES_KEY_NAME] = xml_report_start - if self._xml_report_status['decode_values_found'] is True: + if self._decode_data_status['decode_values_found'] is True: + print(xml_report_start, len(xml_report_start), 'K') self._decode_line(xml_report_start) else: # TODO fill up pass - - elif self._xml_report_status['end_tag_found'] is False: + else: + self._get_decode_variables(line) if self._decode_data_status['decode_values_found'] is False else None self._decode_line(line) - else: - self._send_update_data_to_bootloader(line) + def _send_update_data_to_bootloader(self): + pass def update_software_packages(self, packages_dir: str, stack_to_update: str = None): # 1. Verify signature @@ -134,8 +157,14 @@ if file.endswith(".bin"): with open(os.path.join(packages_dir, file), 'rb') as f: # Get ready for the next binary file - self._clear_variables() + self._reset_variables() for line in f: self._process_read_line(line) self._send_update_data_to_bootloader() - f.close() \ No newline at end of file + f.close() + + print(self._decode_data_status[self._CUR_DATA_INDEX_KEY_NAME], + self._decode_data_status[self._DECODED_BYTES_KEY_NAME][0], + self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-3], + self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-2], + self._decode_data_status[self._DECODED_BYTES_KEY_NAME][-1])