import os
import re

import can

from scripts.base.base import Base


class SoftwareUpdateScript(Base):
    """Read ``.bin`` update packages, decode their XML report and forward data.

    Each package is read line by line.  The line containing the signature
    markers also carries a two-byte header from which the decode seed is
    derived; subsequent bytes are decoded with a rolling offset until the
    XML report end tag is seen, after which lines are handed to the
    bootloader sender.

    ``SIGNATURE_START`` and ``SIGNATURE_END`` are not defined in this class;
    presumably they are provided by ``Base`` -- TODO confirm.
    """

    _SHIFT_BITS_BY_8 = 8
    _DECODE_COPRIME_1 = 19
    _DECODE_COPRIME_2 = 23
    _DECODE_VALUE = 256
    _DECODE_ADD_VALUE = 211
    # NOTE(review): the end tag is an empty string, so the
    # ``_XML_REPORT_END_TAG in converted_xml`` test in ``_decode_line`` is
    # always true.  This looks like a tag literal that lost its markup --
    # confirm the intended value.
    _XML_REPORT_END_TAG = ""
    _XML_REPORT_HEADER_SIZE = 2

    def __init__(self):
        """Cache the signature markers as bytes and initialise the status."""
        super().__init__()
        self._signature_start_in_bytes = bytes(self.SIGNATURE_START, 'utf-8')
        self._signature_end_in_bytes = bytes(self.SIGNATURE_END, 'utf-8')
        self._xml_report_status = dict()
        # Populate every status key up front so that the processing methods
        # never hit a KeyError when called before update_software_packages().
        self._clear_variables()

    def _clear_variables(self):
        """Reset the per-file decode state before a new package is read."""
        self._xml_report_status.update({
            'decode_values_found': False,
            'end_tag_found': False,
            'decode_found_index': 0,
            'converted_xml': '',
            'decoded_line': list(),
            # Despite the name, this flag is False until the first line has
            # been handled and True afterwards (see _decode_line).
            'is_first_line': False,
            'name': '',
            'version': '',
            'filetype': '',
            'security': '',
            'size': '',
            'raw': '',
        })

    def _verify_signature(self, signature: bytes):
        """Verify the package signature (not implemented yet)."""
        # TODO fill up
        pass

    def _verify_key(self):
        """Verify the package key (not implemented yet)."""
        pass

    def _get_decode_variables(self, line_bytes: bytes):
        """Derive the decode seed from the first two bytes of ``line_bytes``.

        The two bytes are combined into a 16-bit value (first byte is the
        high byte).  The seed is the value ``s`` for which
        ``((s % 19) << 8) + (s % 23)`` equals that value.  On success
        ``decode_found_index`` and ``decode_values_found`` are updated; if
        no candidate matches, or fewer than two bytes are available, the
        status is left untouched.
        """
        if len(line_bytes) < self._XML_REPORT_HEADER_SIZE:
            # Not enough data for the two-byte header; nothing to derive.
            return

        code = (line_bytes[0] << self._SHIFT_BITS_BY_8) + line_bytes[1]

        # NOTE(review): range(0, 0xFF) never tries s == 255 -- confirm
        # whether the upper bound is intentional.
        for s in range(0, 0xFF):
            convert = (((s % self._DECODE_COPRIME_1) << self._SHIFT_BITS_BY_8)
                       + (s % self._DECODE_COPRIME_2))
            if code == convert:
                self._xml_report_status['decode_found_index'] = s
                self._xml_report_status['decode_values_found'] = True
                print("Breaking {} {} {} {}".format(
                    code, convert, s,
                    self._xml_report_status['decode_found_index']))
                break

    def _decode_line(self, line_bytes: bytes):
        """Decode one line of package data using the rolling offset.

        Every byte has the current ``decode_found_index`` subtracted
        (modulo 256), after which the index advances by 211.  Until the XML
        end tag has been found, decoded bytes are accumulated as characters
        (newlines dropped) in ``converted_xml``; afterwards they are stored
        as hex strings in ``decoded_line``.
        """
        status = self._xml_report_status

        start_index = 0
        if status['is_first_line'] is False:
            # The very first decoded line starts with the two header bytes
            # already consumed by _get_decode_variables(); skip them.
            start_index = self._XML_REPORT_HEADER_SIZE
            status['is_first_line'] = True

        # 'end_tag_found' only changes after the loop, so read it once.
        collecting_xml = status['end_tag_found'] is False
        decoded_characters = []

        for raw_byte in line_bytes[start_index:]:
            decoded_value = ((raw_byte - status['decode_found_index'])
                             % self._DECODE_VALUE)
            status['decode_found_index'] += self._DECODE_ADD_VALUE

            if collecting_xml:
                character = chr(decoded_value)
                if character != '\n':
                    decoded_characters.append(character)
            else:
                # decoded_value is a single int, so append its hex form
                # directly (iterating over it would raise TypeError).
                status['decoded_line'].append(hex(decoded_value))

        decoded_line_in_characters = ''.join(decoded_characters)

        if collecting_xml:
            status['converted_xml'] += decoded_line_in_characters

            if self._XML_REPORT_END_TAG in status['converted_xml']:
                converted_xml = status['converted_xml']
                status['end_tag_found'] = True
                # NOTE(review): both patterns are the bare group '(.*)' and
                # therefore capture the same text; they look like tag-wrapped
                # patterns that lost their markup -- confirm.
                status['filetype'] = re.search('(.*)', converted_xml).group(1)
                status['size'] = re.search('(.*)', converted_xml).group(1)
                # TODO fill up the rest of items if needed

                # NOTE(review): nesting reconstructed from flattened source;
                # the bytes past the decoded character count are stored raw
                # (not decoded) -- confirm this is intended.
                if len(line_bytes) > len(decoded_line_in_characters):
                    temp = line_bytes[len(decoded_line_in_characters):]
                    status['decoded_line'] += [hex(i) for i in temp]
                    print(status['decoded_line'])

    def _send_update_data_to_bootloader(self, data: bytes = None):
        """Send update data to the bootloader (not implemented yet).

        :param data: optional chunk of package data; callers pass the
            current line, or nothing once a whole file has been read.
        """
        pass

    def _process_read_line(self, line: bytes):
        """Route one raw line of a package file to the right handler.

        * A line containing the signature start marker has its signature
          extracted and verified; whatever follows the end marker is used
          to derive the decode seed and is then decoded.
        * Any other line is decoded while the XML end tag is still pending.
        * Once the end tag has been seen, lines go to the bootloader sender.
        """
        status = self._xml_report_status
        signature_start = self._signature_start_in_bytes
        signature_end = self._signature_end_in_bytes

        if signature_start in line:
            signature_begin = line.find(signature_start) + len(signature_start)
            signature_finish = (line.find(signature_end, signature_begin)
                                if signature_end else -1)
            if signature_finish < 0:
                # End marker missing (or not configured): nothing usable.
                # TODO fill up
                return

            self._verify_signature(line[signature_begin:signature_finish])

            xml_report_start = line[signature_finish + len(signature_end):]
            if xml_report_start and status['decode_values_found'] is False:
                self._get_decode_variables(xml_report_start)
            if status['decode_values_found'] is True:
                self._decode_line(xml_report_start)
        elif status['end_tag_found'] is False:
            self._decode_line(line)
        else:
            self._send_update_data_to_bootloader(line)

    def update_software_packages(self, packages_dir: str,
                                 stack_to_update: str = None):
        """Process every ``.bin`` file found directly in ``packages_dir``.

        :param packages_dir: directory that is scanned for ``.bin`` files.
        :param stack_to_update: currently unused.
        """
        # 1. Verify signature
        # 2. Verify key
        # 3. Read XML file to find the packages to update
        # 4. Decrypt the content
        # 5. Send the data to bootloader
        # 6. Wait for the bootloader to ack/nack or timeout after 3000 ms
        # 7. Send the entire file
        # 8. Show progress
        for file in os.listdir(packages_dir):
            if not file.endswith(".bin"):
                continue

            # The context manager closes the file; no explicit close needed.
            with open(os.path.join(packages_dir, file), 'rb') as f:
                # Get ready for the next binary file
                self._clear_variables()
                for line in f:
                    self._process_read_line(line)
                # NOTE(review): placement reconstructed from flattened
                # source -- called once per file after all lines are read.
                self._send_update_data_to_bootloader()