###########################################################################
#
#    Copyright (c) 2024-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   institutional_record.py
#
#    @author (last)      Dara Navaei
#    @date   (last)      06-May-2024
#    @author (original)  Dara Navaei
#    @date   (original)  26-Feb-2024
#
############################################################################
import struct
import time
from collections import OrderedDict
from enum import unique
from logging import Logger
from time import sleep

from ..common.msg_defs import MsgIds, MsgFieldPositions
from ..protocols.CAN import DenaliMessage, DenaliChannels
from ..utils.base import AbstractSubSystem, publish, DialinEnum
from ..utils.nv_ops_utils import NVOpsUtils, NVUtilsObserver, NVRecordsHD
from ..utils.conversions import integer_to_bytearray


class HDInstitutionalNVRecords(AbstractSubSystem):
    """
    Hemodialysis Device (HD) Dialin API sub-class for institutional record commands.

    The class requests the institutional record from firmware, reassembles the record from the multi-message
    publication, and sends a (possibly modified) record back to firmware.
    """

    # NOTE(review): the _DEFAULT_* limits below are not referenced by any code that survived in the reviewed copy
    # of this file; presumably they are consumed by _prepare_institutional_record(), whose body is missing there.
    _DEFAULT_MIN_BLOOD_FLOW_MLPM = 100
    _DEFAULT_MAX_BLOOD_FLOW_MLPM = 500
    _DEFAULT_MIN_DIALYSATE_FLOW_MLPM = 100
    _DEFAULT_MAX_DIALYSATE_FLOW_MLPM = 600
    _DEFAULT_MIN_TX_DURATION_MIN = 60
    _DEFAULT_MAX_TX_DURATION_MIN = 480
    _DEFAULT_MIN_STOP_HEP_DISP_BEFORE_TX_END_MIN = 0
    _DEFAULT_MAX_STOP_HEP_DISP_BEFORE_TX_END_MIN = 480
    _DEFAULT_MIN_SALINE_BOLUS_VOLUME_ML = 100
    _DEFAULT_MAX_SALINE_BOLUS_VOLUME_ML = 300
    _DEFAULT_MIN_DIALYSATE_TEMPERATURE_C = 35.0
    _DEFAULT_MAX_DIALYSATE_TEMPERATURE_C = 37.0
    _DEFAULT_MIN_ART_PRESS_LIMIT_WINDOW_MMHG = 120
    _DEFAULT_MAX_ART_PRESS_LIMIT_WINDOW_MMHG = 200
    _DEFAULT_MIN_VEN_PRESS_LIMIT_WINDOW_MMHG = 100
    _DEFAULT_MAX_VEN_PRESS_LIMIT_WINDOW_MMHG = 200
    _DEFAULT_MIN_VEN_ASYM_PRESS_LIMIT_WINDOW_MMHG = 20
    _DEFAULT_MAX_VEN_ASYM_PRESS_LIMIT_WINDOW_MMHG = 35
    _DEFAULT_MIN_UF_VOLUME_L = 0.0
    _DEFAULT_MAX_UF_VOLUME_L = 8.0
    _DEFAULT_MIN_HEPARIN_DISP_RATE_MLPHR = 0.0
    _DEFAULT_MAX_HEPARIN_DISP_RATE_MLPHR = 1.0
    _DEFAULT_MIN_HEPARIN_BOLUS_VOLUME_ML = 0.0
    _DEFAULT_MAX_HEPARIN_BOLUS_VOLUME_ML = 2.0
    _DEFAULT_ENABLE_CHEM_DISINFECT = 0
    _DEFAULT_MIN_RO_REJECTION_RATIO_PCT = 90
    _DEFAULT_MIN_INLET_WATER_COND_ALARM_LIMIT_USPCM = 200.0
    _DEFAULT_MIN_VEN_WIDE_WINDOW_LIMIT_MMHG = 0

    # Number of bytes that precede the record data in each published message
    # (current message number + total messages + data length).
    _RECORD_SPECS_BYTES = 12
    _DEFAULT_TIME_VALUE = 0
    _DEFAULT_CRC_VALUE = 0
    _FIRMWARE_STACK_NAME = 'HD'

    # Maximum number of bytes that firmware allows to be written to EEPROM at once.
    # The padding size is then calculated so the record is a multiple of 16 bytes.
    _EEPROM_MAX_BYTES_TO_WRITE = 16

    # Delay in between each payload transfer
    _PAYLOAD_TRANSFER_DELAY_S = 0.2
    _DIALIN_RECORD_UPDATE_DELAY_S = 0.2

    def __init__(self, can_interface, logger: Logger):
        """
        Builds the default institutional record and registers the handler for the firmware publication.

        @param can_interface: Denali CAN Messenger object (may be None, in which case no handler is registered)
        @param logger: (Logger) the logger used by this class and its NV utilities
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger
        self._current_message = 0
        self._total_messages = 0
        self._received_msg_length = 0
        self._is_getting_institutional_in_progress = False
        self._write_fw_data_to_excel = True
        self._institutional_data = 0
        self._raw_institutional_record = []
        self._utilities = NVOpsUtils(logger=self.logger)
        self.hd_institutional_record = self._prepare_hd_institutional_record()

        if self.can_interface is not None:
            channel_id = DenaliChannels.hd_to_dialin_ch_id
            msg_id = MsgIds.MSG_ID_HD_SEND_INSTITUTIONAL_RECORD.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_hd_institutional_sync)

        # Always defined so observers can read it even when there is no CAN interface
        self.hd_institutional_record_timestamp = 0.0

    def cmd_reset_hd_institutional_record(self) -> bool:
        """
        Handles resetting HD institutional record.

        @return: True if successful, False otherwise
        """
        self.hd_institutional_record = self._prepare_hd_institutional_record()
        self.hd_institutional_record = self._utilities.reset_fw_system_service_record(self.hd_institutional_record)
        status = self.cmd_set_hd_institutional_record(self.hd_institutional_record)

        return status

    def cmd_request_hd_institutional_record(self) -> int:
        """
        Handles getting HD institutional record from firmware.

        Only one request may be outstanding at a time. The in-progress flag is cleared again when firmware does
        not answer or rejects the request, so a failed request does not block every later request.

        @return: 1 upon success, False otherwise
        """
        if self._is_getting_institutional_in_progress is True:
            self.logger.debug("Request cancelled: an existing request is in progress.")
            return False

        self._is_getting_institutional_in_progress = True
        # Clear the list for the next call
        self._raw_institutional_record.clear()
        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_GET_INSTITUTIONAL_RECORD.value)

        self.logger.debug('Getting HD institutional record')

        received_message = self.can_interface.send(message)

        # If there is no content, the request timed out and no record will follow
        if received_message is None:
            self.logger.debug("Timeout!!!!")
            self._is_getting_institutional_in_progress = False
            return False

        # response payload is OK or not OK
        status = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        if not status:
            # Firmware did not accept the request, so allow a new request to be made
            self._is_getting_institutional_in_progress = False
        return status

    def cmd_hd_institutional_record_crc_override(self, crc: int) -> bool:
        """
        Handles setting HD institutional CRC override.

        @param crc: (int) the CRC override value
        @return: True if successful, False otherwise
        """
        # This command does not have a reset, but the corresponding payload structure in firmware contains a reset
        # field, so a zero reset is sent to keep the payload length the same as what firmware expects.
        reset_byte_array = integer_to_bytearray(0)
        crc_value = integer_to_bytearray(crc)
        hd_record = integer_to_bytearray(NVRecordsHD.NVDATAMGMT_INSTITUTIONAL_RECORD.value)
        payload = reset_byte_array + crc_value + hd_record

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_NV_RECORD_CRC_OVERRIDE.value,
                                              payload=payload)

        self.logger.debug("Overriding HD institutional record CRC to: %s", crc)

        # Send message
        received_message = self.can_interface.send(message)

        # If there is content...
        if received_message is not None:
            # response payload is OK or not OK
            return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        else:
            self.logger.error("Timeout!!!!")
            return False

    def _handler_hd_institutional_sync(self, message, timestamp=0.0):
        """
        Handles published HD institutional record messages. The record arrives in several messages; the data of
        each message is appended to the raw record and, once the last message has been received, the raw record is
        processed into the HD institutional record and published to the observers.

        @param message: published HD institutional record data message
        @param timestamp: the timestamp stored with the record once it is complete
        @return: None
        """
        curr = struct.unpack('i', bytearray(
            message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0]
        total = struct.unpack('i', bytearray(
            message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0]
        length = struct.unpack('i', bytearray(
            message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0]

        self._current_message = curr
        self._total_messages = total
        self._received_msg_length = length

        # The institutional record payload ends at the start index + 12 bytes (current message, total messages and
        # data length) + the length of the institutional record data. The rest is the CAN messaging CRC that does
        # not need to be kept.
        end_of_data_index = MsgFieldPositions.START_POS_FIELD_1 + self._RECORD_SPECS_BYTES + self._received_msg_length

        # The full payload of this message, including the 12 bytes of specs
        self._institutional_data = message['message'][MsgFieldPositions.START_POS_FIELD_1:end_of_data_index]

        # Continue collecting until all the institutional record messages are received.
        # Concatenate the data of the messages (without the specs) to each other.
        if self._current_message <= self._total_messages:
            self._raw_institutional_record += (message['message'][MsgFieldPositions.START_POS_FIELD_1 +
                                                                  self._RECORD_SPECS_BYTES:end_of_data_index])
            if self._current_message == self._total_messages:
                # Done with receiving the messages
                self._is_getting_institutional_in_progress = False
                # All the messages have been received, so process the raw data into the record
                self._utilities.process_received_record_from_fw(self.hd_institutional_record,
                                                                self._raw_institutional_record)
                self.hd_institutional_record_timestamp = timestamp
                self._handler_received_complete_hd_institutional_record()

    @publish(["hd_institutional_record_timestamp", "hd_institutional_record"])
    def _handler_received_complete_hd_institutional_record(self):
        """
        Publishes the received institutional record

        @return: None
        """
        self.logger.debug("Received a complete hd institutional record.")

    def cmd_set_hd_institutional_record(self, hd_institutional_record: OrderedDict) -> bool:
        """
        Handles updating the HD institutional record and sends it to FW.

        The record is sent packet by packet. The transfer is aborted as soon as firmware rejects a packet. A
        missing acknowledgement is logged and the transfer continues, but the call only reports success when the
        final packet was acknowledged with an OK status.

        @param hd_institutional_record: (OrderedDict) the hd institutional record to be sent
        @return: True upon success, False otherwise
        """
        transfer_status = 1
        last_packet_acknowledged = False
        self.logger.debug('Setting HD institutional record')

        record_packets = self._utilities.prepare_record_to_send_to_fw(hd_institutional_record)

        for packet in record_packets:
            # Sleep to let the firmware receive and process the data
            time.sleep(self._PAYLOAD_TRANSFER_DELAY_S)

            # Convert the list packet to a bytearray
            payload = b''.join(packet)

            message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                                  message_id=MsgIds.MSG_ID_HD_SET_INSTITUTIONAL_RECORD.value,
                                                  payload=payload)

            received_message = self.can_interface.send(message)

            # If there is no content...
            if received_message is None:
                self.logger.warning("HD ACK not received!")
                last_packet_acknowledged = False
                continue

            last_packet_acknowledged = True
            # Index 6 is where the status byte of the response is read (kept as in the original code)
            transfer_status = received_message['message'][6]

            # Stop right away when firmware rejects a packet rather than sending further packets
            if transfer_status == 0:
                self.logger.debug("Sending HD institutional record failed")
                return False

        if last_packet_acknowledged and transfer_status == 1:
            self.logger.debug("HD institutional record was set successfully!")
            return True
        else:
            self.logger.debug("HD institutional record was not set. Please make sure the ranges are Ok.")
            return False

    def _prepare_hd_institutional_record(self):
        """
        Handles assembling the sub dictionaries of each group to make the main HD institutional record.

        NOTE(review): the reviewed copy of this file is truncated in the middle of this method. Everything after
        the beginning of the CRC entry is missing, so the method raises instead of returning an incomplete record.

        @return: (OrderedDict) an assembled hd institutional record
        """
        result = OrderedDict()
        groups_byte_size = 0
        # create a list of the functions of the sub dictionaries
        functions = [self._prepare_institutional_record()]

        for function in functions:
            # Update the groups bytes size so far to be used to padding later
            groups_byte_size += function[1]
            # Update the institutional record
            result.update(function[0])

        # Build the CRC of the main institutional_record record
        # NOTE(review): SOURCE TRUNCATED HERE. The original statement starts with
        #     record_crc = OrderedDict({'crc': ['
        # and the text from that point up to the "-> bool:" of the next visible method signature is missing.
        # Presumably the lost part uses _DEFAULT_CRC_VALUE, _DEFAULT_TIME_VALUE and _EEPROM_MAX_BYTES_TO_WRITE
        # (padding) and returns the assembled record. TODO: restore the remainder from version control; it is
        # deliberately not guessed here because the record layout is sent to firmware.
        raise NotImplementedError("The remainder of _prepare_hd_institutional_record is missing from this copy "
                                  "of the file and must be restored from version control.")

    def _prepare_institutional_record(self):
        """
        Handles building the institutional record group.

        NOTE(review): this method is called by _prepare_hd_institutional_record, which uses element [0] of the
        result as a dictionary to merge into the record and element [1] as the byte size of the group. The body
        of the method is missing from the reviewed copy of this file and is deliberately not reconstructed by
        guesswork. TODO: restore it from version control.

        @return: presumably a tuple of (record dictionary, byte size of the group) - confirm against the original
        """
        raise NotImplementedError("_prepare_institutional_record is missing from this copy of the file and must "
                                  "be restored from version control.")

    # NOTE(review): the "def" line of the following method is missing from the reviewed copy of this file; only
    # its "-> bool:" return annotation and its body survived. The method name is a best guess that follows the
    # naming of the other commands in this class, and the parameter is taken from the docstring and the body.
    # TODO: confirm the name against version control.
    def cmd_set_hd_institutional_excel_to_fw(self, report_address: str) -> bool:
        """
        Handles setting the institutional record data that is in an excel report to the firmware.

        @param report_address: (str) the address in which its data must be written from excel
        @return: True upon success, False otherwise
        """
        # Attach the observer before requesting the record so a quick firmware response cannot be missed
        observer = NVUtilsObserver("hd_institutional_record")
        # Attach the observer to the list
        self.attach(observer)

        # Request the HD institutional record; the observer is called back when the record is read back
        if not self.cmd_request_hd_institutional_record():
            # Without this check the loop below would wait forever for a record that is never published.
            # NOTE(review): this also returns when another request is already in progress - confirm acceptable.
            self.logger.error("HD institutional record request failed; the excel record was not sent to firmware.")
            return False

        while not observer.received:
            sleep(0.1)

        self._utilities.write_excel_record_to_fw_record(self.hd_institutional_record, report_address,
                                                        self._utilities.INSTITUTIONAL_RECORD_TAB_NAME)

        ret = self.cmd_set_hd_institutional_record(self.hd_institutional_record)
        return ret

    def cmd_get_hd_institutional_record(self, report_address: str = None):
        """
        Publicly accessible function to request the HD institutional record and write the record to excel.

        @param report_address: the address that the report needs to be written to. The default is None so it picks
        an address and writes the excel report.
        @return: none
        """
        # Create the excel report
        self._utilities.prepare_excel_report(self._FIRMWARE_STACK_NAME, self._utilities.INSTITUTIONAL_RECORD_TAB_NAME,
                                             report_address, protect_sheet=True)

        # Create an object of the observer class to observe the dictionary
        observer = NVUtilsObserver("hd_institutional_record")
        # Attach the observer to the list
        self.attach(observer)

        # Request the latest institutional record from firmware
        if not self.cmd_request_hd_institutional_record():
            # Without this check the loop below would wait forever for a record that is never published.
            # NOTE(review): this also returns when another request is already in progress - confirm acceptable.
            self.logger.error("HD institutional record request failed; the excel report was not updated.")
            return

        # Wait until data has been received from firmware
        while not observer.received:
            sleep(0.1)

        # Write the record received from firmware to excel
        self._utilities.write_fw_record_to_excel(self.hd_institutional_record)