########################################################################### # # Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file system_record.py # # @author (last) Dara Navaei # @date (last) 28-Oct-2021 # @author (original) Dara Navaei # @date (original) 10-Feb-2021 # ############################################################################ import struct import time from collections import OrderedDict from enum import unique from logging import Logger from ..common.msg_defs import MsgIds, MsgFieldPositions from ..protocols.CAN import DenaliMessage, DenaliChannels from ..utils.base import AbstractSubSystem, DialinEnum, publish from ..utils.nv_ops_utils import NVOpsUtils @unique class MFGLocation(DialinEnum): MFG_LOCATION_FACTORY = 0 class DGSystemNVRecord(AbstractSubSystem): """ Hemodialysis Device (HD) Dialin API sub-class for system record commands. """ _RECORD_SPECS_BYTES = 12 _DEFAULT_MFG_LOCATION = MFGLocation.MFG_LOCATION_FACTORY.value _MAX_PN_BYTES = 10 _MAX_SN_BYTES = 15 _DEFAULT_TIME_VALUE = 0 _DEFAULT_CRC_VALUE = 0 # Maximum allowed bytes that are allowed to be written to EEPROM in firmware # The padding size then is calculated to be divisions of 16 _EEPROM_MAX_BYTES_TO_WRITE = 16 # Delay in between each payload transfer _PAYLOAD_TRANSFER_DELAY_S = 0.2 _DIALIN_RECORD_UPDATE_DELAY_S = 0.2 def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali CAN Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger self.current_message = 0 self.total_messages = 0 self.received_msg_length = 0 self._is_getting_sys_in_progress = False self.sys_data = 0 self._raw_system_record = [] self._utilities = NVOpsUtils(logger=self.logger) # System main record self.dg_system_record = self._prepare_dg_system_record() if self.can_interface is not None: channel_id = DenaliChannels.dg_to_dialin_ch_id msg_id = MsgIds.MSG_ID_DG_SEND_SYSTEM_RECORD.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_dg_system_sync) def cmd_reset_dg_system_record(self) -> bool: """ Handles resetting DG system record. @return: True if successful, False otherwise """ self.dg_system_record = self._prepare_dg_system_record() self.dg_system_record = self._utilities.reset_fw_system_service_record(self.dg_system_record) status = self.cmd_set_dg_system_record(self.dg_system_record) return status def get_dg_system_record(self) -> dict: """ Handles getting DG system record per user's request. NOTE: In order to get the latest system record, use cmd_request_dg_system_record first to fetch the system record from the firmware. @return: DG system record dictionary """ return self.dg_system_record['system_record'] def cmd_request_dg_system_record(self) -> int: """ Handles getting DG system record from firmware. @return: 1 upon success, False otherwise """ if self._is_getting_sys_in_progress is not True: # Receiving the system record is in progress self._is_getting_sys_in_progress = True # Clear the list for the next call self._raw_system_record.clear() message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_GET_SYSTEM_RECORD.value) self.logger.debug('Getting DG system record') received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False self.logger.debug("Request cancelled: an existing request is in progress.") return False def _handler_dg_system_sync(self, message): """ Handles published DG system record messages. HD system records are captured for processing and updating the DG system record. @param message: published DG system record data message @return: None """ curr = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] total = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] length = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] self.current_message = curr self.total_messages = total self.received_msg_length = length # The end of calibration_record record payload is from the start index + 12 bytes for the current message +total # messages + the length of calibration_record. The rest is the CAN messaging CRC that is not needed # to be kept end_of_data_index = MsgFieldPositions.START_POS_FIELD_1 + self._RECORD_SPECS_BYTES + self.received_msg_length # Get the data only and not specs of it (i.e current message number) self.sys_data = message['message'][MsgFieldPositions.START_POS_FIELD_1:end_of_data_index] # Continue getting calibration_record records until the all the calibration_record messages are received. # Concatenate the calibration_record records to each other if self.current_message <= self.total_messages: self._raw_system_record += (message['message'][MsgFieldPositions.START_POS_FIELD_1 + self._RECORD_SPECS_BYTES:end_of_data_index]) if self.current_message == self.total_messages: # Done with receiving the messages self._is_getting_sys_in_progress = False # If all the messages have been received, call another function to process the raw data self._utilities.process_received_record_from_fw(self.dg_system_record, self._raw_system_record) self._handler_received_complete_dg_system_record() @publish(["dg_system_record"]) def _handler_received_complete_dg_system_record(self): """ Publishes the received system record @return: None """ self.logger.debug("Received a complete dg system record.") def cmd_set_dg_system_record(self, dg_system_record: OrderedDict) -> bool: """ Handles updating the DG system and sends it to FW. @param dg_system_record: (OrderedDict) the dg system record to be sent @return: True upon success, False otherwise """ record_packets = self._utilities.prepare_record_to_send_to_fw(dg_system_record) self.logger.debug('Setting DG system record') # Update all the data packets with the last message count since is the number of messages that firmware # should receive for packet in record_packets: # Sleep to let the firmware receive and process the data time.sleep(self._PAYLOAD_TRANSFER_DELAY_S) # Convert the list packet to a bytearray payload = b''.join(packet) message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_SET_SYSTEM_RECORD.value, payload=payload) received_message = self.can_interface.send(message) # If there is no content... if received_message is None: self.logger.debug("Timeout!!!!") return False self.logger.debug("Finished sending DG service record.") return True def _prepare_dg_system_record(self): """ Handles assembling the sub dictionaries of each group to make the main DG system record. @return: (OrderedDict) the assembled dg system record """ result = OrderedDict() groups_byte_size = 0 # create a list of the functions of the sub dictionaries functions = [self._prepare_system_record()] for function in functions: # Update the groups bytes size so far to be use to padding later groups_byte_size += function[1] # Update the calibration record result.update(function[0]) # Build the CRC of the main calibration_record record record_crc = OrderedDict({'crc': ['