########################################################################### # # Copyright (c) 2021-2023 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file pretreatment.py # # @author (last) Dara Navaei # @date (last) 18-Jun-2022 # @author (original) Quang Nguyen # @date (original) 02-Mar-2021 # ############################################################################ import struct from logging import Logger from enum import unique from ..common import (PreTreatmentSubModes, PreTreatmentSampleWaterStates, PreTreatmentConsumableSelfTestStates, PreTreatmentRecircStates, PreTreatmentNoCartSelfTestStates, PreTreatmentDrySelfTestsStates, PreTreatmentPrimeStates) from ..common.msg_defs import MsgIds, MsgFieldPositions from ..protocols.CAN import DenaliChannels from ..utils.base import AbstractSubSystem, publish, DialinEnum @unique class PreTreatmentRsrvrState(DialinEnum): PRE_TREATMENT_RESERVOIR_MGMT_START_STATE = 0 PRE_TREATMENT_RESERVOIR_MGMT_DRAIN_CMD_STATE = 1 PRE_TREATMENT_RESERVOIR_MGMT_DRAIN_CMD_RESP_STATE = 2 PRE_TREATMENT_RESERVOIR_MGMT_START_FILL_STATE = 3 PRE_TREATMENT_RESERVOIR_MGMT_FILL_CMD_RESP_STATE = 4 PRE_TREATMENT_RESERVOIR_MGMT_FILL_COMPLETE_STATE = 5 PRE_TREATMENT_RESERVOIR_MGMT_WAIT_FOR_RESERVOIR_SWITCH_STATE = 6 PRE_TREATMENT_RESERVOIR_MGMT_COMPLETE_STATE = 7 NUM_OF_PRE_TREATMENT_RESERVOIR_MGMT_STATES = 8 class HDPreTreatment(AbstractSubSystem): """ Hemodialysis Delivery (HD) Dialin API sub-class for pretreatment related commands. """ def __init__(self, can_interface, logger: Logger): """ HDPreTreatment constructor """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: self.can_interface.register_receiving_publication_function(DenaliChannels.hd_sync_broadcast_ch_id, MsgIds.MSG_ID_PRE_TREATMENT_STATE.value, self._handler_pre_treatment_state_sync) msg_id = MsgIds.MSG_ID_HD_NO_CART_SELF_TEST_PROGRESS.value self.can_interface.register_receiving_publication_function(DenaliChannels.hd_sync_broadcast_ch_id, msg_id, self._handler_no_cart_self_test_progress_sync) self.can_interface.register_receiving_publication_function(DenaliChannels.hd_sync_broadcast_ch_id, MsgIds.MSG_ID_HD_DRY_SELF_TEST_PROGRESS.value, self._handler_dry_self_test_progress_sync) self.can_interface.register_receiving_publication_function(DenaliChannels.hd_sync_broadcast_ch_id, MsgIds.MSG_ID_HD_PRIMING_STATUS_DATA.value, self._handler_prime_progress_sync) self.pre_treatment_submode = 0 self.pre_treatment_sample_water_state = 0 self.pre_treatment_consumable_self_test_state = 0 self.pre_treatment_no_cart_self_test_state = 0 self.pre_treatment_installation_state = 0 self.pre_treatment_dry_self_test_state = 0 self.pre_treatment_prime_state = 0 self.pre_treatment_recirc_state = 0 self.pre_treatment_patient_connection_state = 0 self.pre_treatment_wet_self_test_state = 0 self.pre_treatment_reservoir_state = 0 self.no_cart_self_test_timeout = 0 self.no_cart_self_test_time_countdown = 0 self.dry_self_test_timeout = 0 self.dry_self_test_time_countdown = 0 self.prime_timeout = 0 self.prime_time_countdown = 0 def get_pre_treatment_submode(self): """ Gets the pre-treatment state @return: The pre-treatment state """ return self.pre_treatment_submode def get_pre_treatment_sample_water_state(self): """ Gets the pre-treatment sample water state @return: The pre-treatment sample water state """ return self.pre_treatment_sample_water_state def get_pre_treatment_consumable_self_test_state(self): """ Gets the pre-treatment consumable self-test state @return: The pre-treatment consumable self-test state """ return self.pre_treatment_consumable_self_test_state def get_pre_treatment_no_cart_self_test_state(self): """ Gets the pre-treatment no cartridge self-test state @return: The pre-treatment no cartridge self-test state """ return self.pre_treatment_no_cart_self_test_state def get_pre_treatment_installation_state(self): """ Gets the pre-treatment installation state @return: The pre-treatment installation state """ return self.pre_treatment_installation_state def get_pre_treatment_dry_self_test_state(self): """ Gets the pre-treatment dry self-test state @return: The pre-treatment dry self-test state """ return self.pre_treatment_dry_self_test_state def get_pre_treatment_prime_state(self): """ Gets the pre-treatment primt state @return: The pre-treatment prime state """ return self.pre_treatment_prime_state def get_pre_treatment_recirc_state(self): """ Gets the pre-treatment re-circulate state @return: The pre-treatment re-circulate state """ return self.pre_treatment_recirc_state def get_pre_treatment_patient_connection_state(self): """ Gets the pre-treatment patient connection state @return: The pre-treatment patient connection state """ return self.pre_treatment_patient_connection_state def get_no_cart_self_test_timeout(self): """ Gets the pre-treatment no cartridge self-test timeout @return: The pre-treatment no cartridge self-test timeout """ return self.no_cart_self_test_timeout def get_no_cart_self_test_time_countdown(self): """ Gets the pre-treatment no cartridge self-test time countdown @return: The pre-treatment no cartridge self-test time countdown """ return self.no_cart_self_test_time_countdown def get_dry_self_test_timeout(self): """ Gets the pre-treatment dry self-test timeout @return: The pre-treatment dry self-test timeout """ return self.dry_self_test_timeout def get_dry_self_test_time_countdown(self): """ Gets the pre-treatment dry self-test time countdown @return: The pre-treatment dry self-test time countdown """ return self.dry_self_test_time_countdown def get_prime_timeout(self): """ Gets the pre-treatment prime timeout @return: The pre-treatment prime timeout """ return self.prime_timeout def get_prime_time_countdown(self): """ Gets the pre-treatment prime time countdown @return: The pre-treatment prime time countdown """ return self.prime_time_countdown def get_wet_self_test_state(self): """ Gets the pre-treatment wet self test state @return: The pre-treatment wet self test state """ return self.pre_treatment_wet_self_test_state @publish([ "pre_treatment_submode", "pre_treatment_sample_water_state", "pre_treatment_consumable_self_test_state", "pre_treatment_no_cart_self_test_state", "pre_treatment_installation_state", "pre_treatment_dry_self_test_state", "pre_treatment_prime_state", "pre_treatment_recirc_state", "pre_treatment_patient_connection_state", "pre_treatment_wet_self_test_state", "pre_treatment_reservoir_state" ]) def _handler_pre_treatment_state_sync(self, message): """ Handles published pre-treatment state data messages. @param message: published pre-treatment state data message @return: none """ self.pre_treatment_submode = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.pre_treatment_sample_water_state = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] self.pre_treatment_consumable_self_test_state = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] self.pre_treatment_no_cart_self_test_state = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] self.pre_treatment_installation_state = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5]))[0] self.pre_treatment_dry_self_test_state = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_6:MsgFieldPositions.END_POS_FIELD_6]))[0] self.pre_treatment_prime_state = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_7:MsgFieldPositions.END_POS_FIELD_7]))[0] self.pre_treatment_recirc_state = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_8:MsgFieldPositions.END_POS_FIELD_8]))[0] self.pre_treatment_patient_connection_state = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_9:MsgFieldPositions.END_POS_FIELD_9]))[0] self.pre_treatment_wet_self_test_state = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_10:MsgFieldPositions.END_POS_FIELD_10]))[0] # Yes, I know we should do the new way, but we we need a task to cover all these in one shot! self.pre_treatment_reservoir_state = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_11:MsgFieldPositions.END_POS_FIELD_11]))[0] @publish([ "no_cart_self_test_timeout", "no_cart_self_test_time_countdown" ]) def _handler_no_cart_self_test_progress_sync(self, message): """ Handles published no cartridge self-test progress data messages. @param message: published no cartridge self-test progress data message @return: None """ self.no_cart_self_test_timeout = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.no_cart_self_test_time_countdown = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] @publish([ "dry_self_test_timeout", "dry_self_test_time_countdown" ]) def _handler_dry_self_test_progress_sync(self, message): """ Handles published dry self-test progress data messages. @param message: published dry self-test progress data message @return: None """ self.dry_self_test_timeout = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.dry_self_test_time_countdown = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] @publish([ "prime_timeout", "prime_time_countdown" ]) def _handler_prime_progress_sync(self, message): """ Handles published prime progress data messages. @param message: published prime progress data message @return: None """ self.prime_timeout = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.prime_time_countdown = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0]