###########################################################################
#
#    Copyright (c) 2021-2023 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   calibration_record.py
#
#    @author (last)      Micahel Garthwaite
#    @date   (last)      07-Mar-2023
#    @author (original)  Dara Navaei
#    @date   (original)  12-Feb-2021
#
############################################################################
import struct
import time
from collections import OrderedDict
from logging import Logger
from time import sleep

from ..common.msg_defs import MsgIds, MsgFieldPositions
from ..protocols.CAN import DenaliMessage, DenaliChannels
from ..utils.base import AbstractSubSystem, publish
from ..utils.nv_ops_utils import NVOpsUtils, NVUtilsObserver, NVRecords
from ..utils.conversions import integer_to_bytearray


class DGCalibrationNVRecord(AbstractSubSystem):
    """
    Dialysate Generator (DG) Dialin API sub-class for calibration commands.
    """

    # Byte offsets used when slicing a received calibration record message
    _RECORD_START_INDEX = 6
    _RECORD_SPECS_BYTES = 12

    # Default values used when building the calibration record groups
    _DEFAULT_HIGH_ORDER_GAIN_VALUE = 0
    _DEFAULT_GAIN_VALUE = 1
    _DEFAULT_OFFSET_VALUE = 0
    _DEFAULT_RATIO_VALUE = 1
    _DEFAULT_VOLUME_VALUE = 0
    _DEFAULT_ACID_MIX_RATIO = (2.35618 / 100)
    _DEFAULT_ACID_BOTTLE_VOL_ML = 3430.0
    _DEFAULT_ACID_COND_US_PER_CM = 11645.05
    _DEFAULT_ACID_BOTTLE_TEMP_C = 23.5
    _DEFAULT_BICARB_MIX_RATIO = (4.06812 / 100)
    _DEFAULT_BICARB_BOTTLE_VOL_ML = 3780.0
    _DEFAULT_BICARB_COND_US_PER_CM = 13734.88
    _DEFAULT_BICARB_BOTTLE_TEMP_C = 23.5
    _DEFAULT_CALIBRATION_VALUE = 1
    _DEFAULT_TIME_VALUE = 0
    _DEFAULT_CRC_VALUE = 0
    _DEFAULT_FLUSH_LINES_VOLUME = 0.01
    _DEFAULT_ULTRAFILTER_TAU_C_PER_MIN = -4.565
    _DEFAULT_RESERVOIR_TAU_C_PER_MIN = -0.512
    _DEFAULT_ULTRAFILTER_VOLUME_ML = 700

    # Maximum allowed bytes that are allowed to be written to EEPROM in firmware
    # The padding size then is calculated to be divisions of 16
    _EEPROM_MAX_BYTES_TO_WRITE = 16

    # Delay in between each payload transfer
    _PAYLOAD_TRANSFER_DELAY_S = 0.2
    _DIALIN_RECORD_UPDATE_DELAY_S = 0.2

    _FIRMWARE_STACK_NAME = 'DG'

    def __init__(self, can_interface, logger: Logger):
        """
        Builds the default DG calibration record and, when a CAN interface is provided, registers the handler
        that receives the calibration record messages published by the DG.

        @param can_interface: Denali CAN Messenger object (may be None, in which case no handler is registered)
        @param logger: (Logger) the logger used by this class and by its NV utilities
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger
        self._current_message = 0
        self._total_messages = 0
        self._is_getting_cal_in_progress = False
        self._cal_data = 0
        self._raw_cal_record = []
        self._utilities = NVOpsUtils(logger=self.logger)
        # Initialized before the handler is registered so the attribute exists as soon as the handler can run
        self.dg_calibration_record_timestamp = 0.0
        # DG Calibration_record main record
        self.dg_calibration_record = self._prepare_dg_calibration_record()

        if self.can_interface is not None:
            channel_id = DenaliChannels.dg_to_dialin_ch_id
            msg_id = MsgIds.MSG_ID_DG_SEND_CALIBRATION_RECORD.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_dg_calibration_sync)

    def cmd_reset_dg_calibration_record(self) -> bool:
        """
        Handles resetting DG calibration record.
        The record is rebuilt from the defaults, passed through the NV utilities reset function and then sent
        to the firmware.

        @return: True if successful, False otherwise
        """
        self.dg_calibration_record = self._prepare_dg_calibration_record()
        self.dg_calibration_record = self._utilities.reset_fw_record(self.dg_calibration_record)
        status = self.cmd_set_dg_calibration_record(self.dg_calibration_record)

        return status

    def cmd_get_dg_calibration_record_report(self, report_destination: str = None):
        """
        Handles getting DG calibration_record record from firmware and writing it to excel.
        NOTE: this call blocks until a complete calibration record has been published to the observer.

        @param report_destination: (str) the destination that the report should be written to

        @return: none
        """
        # Prepare the excel report
        self._utilities.prepare_excel_report(self._FIRMWARE_STACK_NAME, self._utilities.CAL_RECORD_TAB_NAME,
                                             report_destination, protect_sheet=True)

        observer = NVUtilsObserver("dg_calibration_record")
        # Attach the observer to the list
        self.attach(observer)

        # Request the DG calibration record and set and observer class to callback when the calibration record is read
        # back
        self.cmd_request_dg_calibration_record()

        while not observer.received:
            sleep(0.1)

        # Pass the DG calibration record to the function to write the excel
        self._utilities.write_fw_record_to_excel(self.dg_calibration_record)

    def cmd_request_dg_calibration_record(self) -> bool:
        """
        Handles getting DG calibration_record record from firmware.
        Only one request may be in progress at a time. The in-progress flag is cleared either by the sync handler
        once the last record message has been received, or here when the request times out.

        @return: the first payload byte of the firmware response (OK or not OK) if a response was received,
                 False if the request timed out or another request is already in progress
        """
        self.logger.debug("Requesting a dg calibration record...")

        if self._is_getting_cal_in_progress:
            self.logger.warning("Request cancelled: an existing request is in progress.")
            return False

        self._is_getting_cal_in_progress = True
        # Clear the raw record so the messages of this request are not appended to a previous one
        self._raw_cal_record.clear()
        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                              message_id=MsgIds.MSG_ID_DG_GET_CALIBRATION_RECORD.value)

        received_message = self.can_interface.send(message, time_out=5)

        # If there is no content...
        if received_message is None:
            self.logger.debug("Timeout!!!!")
            # Release the in-progress flag, otherwise every later request would be rejected
            self._is_getting_cal_in_progress = False
            return False

        # response payload is OK or not OK
        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def cmd_dg_calibration_record_crc_override(self, crc: int) -> bool:
        """
        Handles setting DG calibration_record CRC override.

        @param crc: (int) the CRC override value

        @return: the first payload byte of the firmware response (OK or not OK) if a response was received,
                 False if the request timed out
        """
        # This command does not have a reset but since the corresponding payload structure in firmware requires a reset
        # so the payload length is the same when it is received in the firmware.
        reset_byte_array = integer_to_bytearray(0)
        crc_value = integer_to_bytearray(crc)
        dg_record = integer_to_bytearray(NVRecords.NVDATAMGMT_CALIBRATION_RECORD.value)
        payload = reset_byte_array + crc_value + dg_record

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                              message_id=MsgIds.MSG_ID_DG_NV_RECORD_CRC_OVERRIDE.value,
                                              payload=payload)

        self.logger.debug("Overriding DG calibration record CRC to: " + str(crc))

        # Send message
        received_message = self.can_interface.send(message)

        # If there is content...
        if received_message is not None:
            # response payload is OK or not OK
            return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        else:
            self.logger.error("Timeout!!!!")
            return False

    def _handler_dg_calibration_sync(self, message, timestamp=0.0):
        """
        Handles published DG calibration_record record messages. DG calibration records are captured for
        processing and updating the DG calibration_record record.

        @param message: published DG calibration_record record data message
        @param timestamp: the timestamp stored as the record timestamp once the last message has been received

        @return: None
        """
        self.logger.debug("DG calibration sync handler...")

        raw_message = message['message']

        # The first three fields are: current message number, total number of messages and the data length
        curr = struct.unpack('i', bytearray(
            raw_message[MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0]
        total = struct.unpack('i', bytearray(
            raw_message[MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0]
        length = struct.unpack('i', bytearray(
            raw_message[MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0]

        self._current_message = curr
        self._total_messages = total

        # The end of calibration_record record payload is from the start index + 12 bytes for the current message +total
        # messages + the length of calibration_record. The rest is the CAN messaging CRC that is not needed
        # to be kept
        end_of_data_index = self._RECORD_START_INDEX + self._RECORD_SPECS_BYTES + length

        # Get the calibration_record data only
        self._cal_data = raw_message[self._RECORD_START_INDEX:end_of_data_index]

        # Continue getting calibration_record records until the all the calibration_record messages are received.
        # Concatenate the calibration_record records to each other.
        if self._current_message <= self._total_messages:
            self._raw_cal_record += (raw_message[self._RECORD_START_INDEX +
                                                 self._RECORD_SPECS_BYTES:end_of_data_index])

            if self._current_message == self._total_messages:
                # The last message has been received, so a new request may be started
                self._is_getting_cal_in_progress = False
                # If all the messages have been received, call another function to process the raw data
                self._utilities.process_received_record_from_fw(self.dg_calibration_record, self._raw_cal_record)
                self.dg_calibration_record_timestamp = timestamp
                self._handler_received_complete_dg_calibration_record()

    @publish(["dg_calibration_record_timestamp", "dg_calibration_record"])
    def _handler_received_complete_dg_calibration_record(self):
        """
        Publishes the received calibration record

        @return: None
        """
        self.logger.debug("Received a complete dg calibration record.")

    def cmd_set_dg_calibration_excel_to_fw(self, report_address: str):
        """
        Handles setting the calibration data that is in an excel report to the firmware.
        The current record is first read back from the firmware, then updated from the excel report and finally
        sent to the firmware.
        NOTE: this call blocks until a complete calibration record has been published to the observer.

        @param report_address: (str) the address in which its data must be written from excel

        @return: none
        """
        observer = NVUtilsObserver("dg_calibration_record")
        # Attach the observer to the list before sending the request, so a record that is published while the
        # request is still being sent is not missed
        self.attach(observer)

        # Request the DG calibration record and set and observer class to callback when the calibration record is read
        # back
        self.cmd_request_dg_calibration_record()

        while not observer.received:
            sleep(0.1)

        self._utilities.write_excel_record_to_fw_record(self.dg_calibration_record, report_address,
                                                        self._utilities.CAL_RECORD_TAB_NAME)

        self.cmd_set_dg_calibration_record(self.dg_calibration_record)

    def cmd_set_dg_calibration_record(self, previous_record: OrderedDict) -> bool:
        """
        Handles updating the DG calibration record with the newest calibration_record data of a hardware and
        sends it to FW.

        @param previous_record: (OrderedDict) the dg calibration record to be sent

        @return: True upon success, False otherwise
        """
        # Stays 1 when there are no packets to send
        transfer_status = 1

        # Pass the new changes as well as the previous calibration record
        record_packets = self._utilities.prepare_record_to_send_to_fw(previous_record)

        self.logger.debug('Setting DG calibration started')

        # Update all the data packets with the last message count since is the number of messages that firmware
        # should receive
        for packet in record_packets:
            # Sleep to let the firmware receive and process the data
            time.sleep(self._PAYLOAD_TRANSFER_DELAY_S)

            # Convert the list packet to a bytearray
            payload = b''.join(packet)

            message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                                  message_id=MsgIds.MSG_ID_DG_SET_CALIBRATION_RECORD.value,
                                                  payload=payload)

            received_message = self.can_interface.send(message)

            # If there is no content...
            # This must be checked before the response is indexed, a timeout yields None
            if received_message is None:
                self.logger.debug("Timeout!!!!")
                return False

            # NOTE(review): index 6 is presumably DenaliMessage.PAYLOAD_START_INDEX - confirm
            transfer_status = received_message['message'][6]

            if transfer_status == 0:
                self.logger.debug("Sending DG calibration record failed")
                return False

        if transfer_status == 1:
            self.logger.debug("Finished sending DG calibration record.")
            return True

        # Any other status of the last packet is not a confirmed success
        return False

    def _prepare_dg_calibration_record(self):
        """
        Handles assembling the sub dictionaries of each hardware group to make a DG calibration record.

        @return: (OrderedDict) the assembled record
        """
        result = OrderedDict()
        groups_byte_size = 0
        # Call the other functions to get the dictionaries of each hardware group. All the dictionaries are
        # ordered dictionaries to maintain the order in which they are inserted. The results are a tuple, the first
        # element is the dictionary that was built and the second element is the byte size of the dictionary.
records_with_sizes = [self._prepare_pressure_sensors_cal_record(), self._prepare_flow_sensors_cal_record(), self._prepare_load_cells_record(), self._prepare_temperature_sensors_record(), self._prepare_conductivity_sensors_record(), self._prepare_pumps_record(), self._prepare_volume_record(), self._prepare_acid_concentrates_record(), self._prepare_bicarb_concentrates_record(), self._prepare_filters_record(), self._prepare_fans_record(), self._prepare_accelerometer_sensor_record(), self._prepare_heating_constants_record()] for record, byte_size in records_with_sizes: # Update the groups bytes size so far to be use to padding later groups_byte_size += byte_size # Update the calibration record result.update(record) # Build the CRC of the main calibration_record record record_crc = OrderedDict({'crc': ['