###########################################################################
#
#    Copyright (c) 2022-2023 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   sw_configs.py
#
#    @author (last)      Dara Navaei
#    @date   (last)      03-Nov-2022
#    @author (original)  Dara Navaei
#    @date   (original)  01-Mar-2022
#
############################################################################
import struct
import time
from collections import OrderedDict
from enum import unique
from logging import Logger
from time import sleep

from ..common.msg_defs import MsgIds, MsgFieldPositions
from ..protocols.CAN import DenaliMessage, DenaliChannels
from ..utils.base import AbstractSubSystem, DialinEnum, publish
from ..utils.nv_ops_utils import NVOpsUtils, NVUtilsObserver


@unique
class HDSWConfigs(DialinEnum):
    """
    Identifiers of the HD software configurations. The integer values are unique and contiguous (0..43).
    """
    # NOTE: There is no NUM_OF member. The members of this class are looped over (HDSWConfigs.__members__) to build
    # the software configuration record dictionary automatically, so a NUM_OF member would end up in the record too.
    SW_CONFIG_DISABLE_ALARM_AUDIO = 0
    SW_CONFIG_DISABLE_AIR_TRAP_LEVELING_ALARM = 1
    SW_CONFIG_DISABLE_ACK_ERRORS = 2
    SW_CONFIG_ENABLE_WORN_OUT_CARTRIDGE = 3
    SW_CONFIG_DISABLE_MOTOR_CURRNT_CHECKS = 4
    SW_CONFIG_DISABLE_PUMP_FLOW_CHECKS = 5
    SW_CONFIG_DISABLE_PUMP_DIRECTION_CHECKS = 6
    SW_CONFIG_DISABLE_PUMP_SPEED_CHECKS = 7
    SW_CONFIG_DISABLE_SYRINGE_PUMP = 8
    SW_CONFIG_ENABLE_SYRINGE_PUMP_CMDS = 9
    SW_CONFIG_DISABLE_SYRINGE_PUMP_ALARMS = 10
    SW_CONFIG_DISABLE_PRESSURE_CHECKS = 11
    SW_CONFIG_DISABLE_ARTERIAL_PRESSURE_CHECK = 12
    SW_CONFIG_DISABLE_VENOUS_PRESSURE_CHECK = 13
    SW_CONFIG_DISABLE_DIALYSATE_TEMP_CHECK = 14
    SW_CONFIG_DISABLE_CAL_CHECK = 15
    SW_CONFIG_ENABLE_ALARM_VOLUME_DEFAULT_LOW = 16
    SW_CONFIG_DISABLE_ILLEGAL_AIR_TRAP_ALARM = 17
    SW_CONFIG_DISABLE_SELF_TESTS_AIR_BUBBLE_CHECK = 18
    SW_CONFIG_DISABLE_OCCLUSION_SELF_TEST = 19
    SW_CONFIG_DISABLE_BLOOD_LEAK_SELF_TEST = 20
    SW_CONFIG_DISABLE_BLOOD_LEAK_ALARM = 21
    SW_CONFIG_DISABLE_UI_INTERACTION = 22
    SW_CONFIG_DISABLE_SAMPLE_WATER = 23
    SW_CONFIG_DISABLE_CONSUMABLES_TESTS = 24
    SW_CONFIG_DISABLE_DRY_SELF_TESTS = 25
    SW_CONFIG_DISABLE_PRIMING = 26
    SW_CONFIG_DISABLE_WET_SELF_TEST = 27
    SW_CONFIG_ENABLE_WET_SELF_TEST_WIDER_VOLUME_TOL = 28
    SW_CONFIG_DISABLE_ULTRAFILTRATION_ALARMS = 29
    SW_CONFIG_DISABLE_BUBBLE_ALARMS = 30
    SW_CONFIG_DISABLE_ACCELEROMETERS = 31
    SW_CONFIG_DISABLE_RESERVOIRS_ALARMS = 32
    SW_CONFIG_DISABLE_CARTRIDGE_REMOVAL_STEP = 33
    SW_CONFIG_DISABLE_PUMPS_FLOW_LIMITS = 34
    SW_CONFIG_DISABLE_UI_COMM_ALARMS = 35
    SW_CONFIG_DISABLE_VOLTAGES_ALARMS = 36
    SW_CONFIG_ENABLE_1_MIN_TREATMENT = 37
    SW_CONFIG_ENABLE_BLOOD_PUMP_OPEN_LOOP = 38
    SW_CONFIG_ENABLE_DIALYSATE_INLET_PUMP_OPEN_LOOP = 39
    SW_CONFIG_DISABLE_SWITCHES_MONITOR = 40
    SW_CONFIG_ENABLE_VBA_SPECIAL_POSITION_C = 41
    SW_CONFIG_DISABLE_SERVICE_AND_DISINFECT_CHECK = 42
    SW_CONFIG_DISABLE_AIR_PUMP = 43


class HDSoftwareConfigs(AbstractSubSystem):
    """
    @brief Hemodialysis Device (HD) Dialin API sub-class for HD software configurations related commands.
    """

    _DEFAULT_SW_CONFIG_STATUS = 0
    _DEFAULT_CRC_VALUE = 0
    # Bytes at the start of every received record message that hold the current message number, the total number of
    # messages and the payload length. They are skipped when the record data is collected.
    _RECORD_SPECS_BYTES = 12

    # Maximum allowed bytes to be written to RTC RAM
    _RTC_RAM_MAX_BYTES_TO_WRITE = 64

    # Delay (seconds) applied before each record packet is sent to firmware
    _PAYLOAD_TRANSFER_DELAY_S = 0.2
    _FIRMWARE_STACK_NAME = 'HD'

    def __init__(self, can_interface, logger: Logger):
        """
        Creates the blank HD software configuration record and, when a CAN interface is provided, registers the
        handler of the HD software configuration record messages.

        @param can_interface: Denali CAN Messenger object
        @param logger: (Logger) logger used for the debug messages; it is also handed to NVOpsUtils
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger
        self._current_message = 0
        self._total_messages = 0
        self._received_msg_length = 0
        self._sw_config_data = 0
        self._is_getting_sw_config_in_progress = False
        self._raw_sw_config_record = []
        self._utilities = NVOpsUtils(logger=self.logger)

        self.hd_sw_config_record = self._prepare_hd_sw_configs_record()

        if self.can_interface is not None:
            channel_id = DenaliChannels.hd_to_dialin_ch_id
            msg_id = MsgIds.MSG_ID_HD_SEND_SW_CONFIG_RECORD.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_hd_sw_config_sync)

    def cmd_reset_hd_sw_config_record(self) -> bool:
        """
        Handles resetting HD software configuration record.

        @return: True if successful, False otherwise
        """
        # Replace the current record with a freshly built default software configuration dictionary
        self.hd_sw_config_record = self._prepare_hd_sw_configs_record()
        # Send the default record to firmware
        status = self._cmd_set_hd_sw_config_record()

        return status

    def _cmd_request_hd_sw_config_record(self) -> int:
        """
        Handles getting HD software config record from firmware.

        Only one request can be in progress at a time. The in-progress flag is released by
        _handler_hd_sw_config_sync once the last record message has been received, or right here when firmware
        does not respond to the request.

        @return: the first payload byte of the firmware response (1 upon success), False if the request timed out
                 or another request is already in progress
        """
        if self._is_getting_sw_config_in_progress:
            self.logger.debug("Request cancelled: an existing request is in progress.")
            return False

        self._is_getting_sw_config_in_progress = True
        # Clear the list for the next call
        self._raw_sw_config_record.clear()
        # Run the firmware commands to get the record
        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_GET_SW_CONFIG_RECORD.value)

        self.logger.debug('Getting HD software configuration record')
        received_message = self.can_interface.send(message)

        if received_message is None:
            # No response means no record messages will arrive to release the flag, so release it here. Otherwise
            # every later request would be rejected as "in progress".
            self._is_getting_sw_config_in_progress = False
            self.logger.debug("Timeout!!!!")
            return False

        self.logger.debug("Received FW ACK after requesting HD software configuration record.")
        # response payload is OK or not OK
        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def _handler_hd_sw_config_sync(self, message):
        """
        Handles published HD software configuration record messages. HD software configuration records are
        captured for processing and updating the HD software configuration record.

        @param message: published HD software configuration record data message
        @return: None
        """
        curr = struct.unpack('i', bytearray(
            message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0]
        total = struct.unpack('i', bytearray(
            message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0]
        length = struct.unpack('i', bytearray(
            message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0]

        self._current_message = curr
        self._total_messages = total
        self._received_msg_length = length
        # The end of the software configuration record payload is the start index + 12 bytes for the current
        # message number, the total messages and the length fields + the length of the record data itself. The rest
        # is the CAN messaging CRC that is not needed to be kept
        end_of_data_index = MsgFieldPositions.START_POS_FIELD_1 + self._RECORD_SPECS_BYTES + self._received_msg_length

        # Keep the last received chunk including its specs (i.e. current message number)
        self._sw_config_data = message['message'][MsgFieldPositions.START_POS_FIELD_1:end_of_data_index]

        # Continue getting the software configuration record messages until all of them are received.
        # Concatenate the record data (without the specs) of the messages to each other
        if self._current_message <= self._total_messages:
            self._raw_sw_config_record += (message['message'][MsgFieldPositions.START_POS_FIELD_1 +
                                                              self._RECORD_SPECS_BYTES:end_of_data_index])
            if self._current_message == self._total_messages:
                # Done with receiving the messages
                self._is_getting_sw_config_in_progress = False
                # If all the messages have been received, call another function to process the raw data
                self._utilities.process_received_record_from_fw(self.hd_sw_config_record, self._raw_sw_config_record)
                self._handler_received_complete_hd_sw_config_record()

    @publish(["hd_sw_config_record"])
    def _handler_received_complete_hd_sw_config_record(self):
        """
        Publishes the received software configuration record

        @return: None
        """
        self.logger.debug("Received a complete HD software configuration record.")

    def cmd_update_hd_sw_config_record(self, excel_report_path: str):
        """
        Handles preparing the HD software configuration from the provided excel report and, when the report could
        be read, sending the updated record to firmware.

        @param excel_report_path: (str) the directory in which the excel report of the software configuration is
        located
        @return: none
        """
        # Pass the software configuration record dictionary to be updated with the excel document
        status = self._utilities.get_sw_configs_from_excel(self.hd_sw_config_record, excel_report_path,
                                                           self._utilities.NON_VOLATILE_RECORD_NAME)

        # The excel document was successfully read initiate a write command
        if status:
            self._cmd_set_hd_sw_config_record()
        else:
            self.logger.debug('Could not find the software configurations file')

    def _cmd_set_hd_sw_config_record(self) -> bool:
        """
        Handles updating the HD software configuration record and sends it to FW.

        The record is sent packet by packet. Sending stops at the first packet that gets no response; the content
        of the responses is not inspected.

        @return: True upon success, False otherwise
        """
        record_packets = self._utilities.prepare_record_to_send_to_fw(self.hd_sw_config_record)

        self.logger.debug('Setting HD sw config record')

        for packet in record_packets:
            # Sleep to let the firmware receive and process the data
            time.sleep(self._PAYLOAD_TRANSFER_DELAY_S)

            # Convert the list packet to a bytearray
            payload = b''.join(packet)
            message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                                  message_id=MsgIds.MSG_ID_HD_SET_SW_CONFIG_RECORD.value,
                                                  payload=payload)
            received_message = self.can_interface.send(message)

            # If there is no content...
            if received_message is None:
                self.logger.debug("Timeout!!!!")
                return False

        self.logger.debug("Finished sending HD software configuration record.")
        return True

    # NOTE(review): SOURCE is damaged from this point on. The text between "['" in
    # _prepare_hd_sw_configs_record() and "tuple:" in the signature of the method that follows it is missing, and
    # the file is cut off inside that second method. Neither method can be rebuilt from what is visible, so they
    # must be restored from version control (__init__ and cmd_reset_hd_sw_config_record call
    # _prepare_hd_sw_configs_record). The surviving text is kept below, unmodified, for reference:
    #
    #   def _prepare_hd_sw_configs_record(self) -> OrderedDict: """ Handles assembling the sub dictionaries of each
    #   group to make a blank HD software configuration record. @return: (OrderedDict) the assembled dg software
    #   configuration record """ record = OrderedDict() groups_byte_size = 0 # create a list of the functions of
    #   the sub dictionaries functions = [self._prepare_sw_configs_record()] for function in functions: # Update
    #   the groups bytes size so far to be used for padding later groups_byte_size += function[1] # Update the
    #   calibration record record.update(function[0]) # Build the CRC of the main calibration_record record
    #   record_crc = OrderedDict({'crc': [' tuple: """ Handles creating the software configuration record
    #   dictionary.
@return: software configuration record dictionary and the byte size of this group """ groups_byte_size = 0 name = 'sw_configs' # Create an ordered dictionary sw_configs = OrderedDict({name: {}}) # Loop through the members of the HDSWConfigs enum class for config in HDSWConfigs.__members__: # Insert the enum name into the dictionary with the default software config. Each config is one byte sw_configs[name].update({config: ['