###########################################################################
#
#    Copyright (c) 2020-2025 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file ui_proxy.py
#
#    @author (last)      Micahel Garthwaite
#    @date   (last)      18-Aug-2023
#    @author (original)  Peter Lucia
#    @date   (original)  02-Apr-2020
#
############################################################################
import struct
from logging import Logger

from leahi_dialin.td.modules.constants import ACCEPTED, REJECTED
from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions, RequestRejectReasons
from leahi_dialin.protocols.CAN import DenaliMessage, DenaliCanMessenger, DenaliChannels
from leahi_dialin.utils.base import AbstractSubSystem, publish
from leahi_dialin.utils.conversions import integer_to_bytearray, unsigned_byte_to_bytearray, float_to_bytearray, \
    short_to_bytearray


class UIProxy(AbstractSubSystem):
    """
    Treatment Delivery (TD) Dialin API sub-class for UI proxy ( injection ) related commands.

    The class sends messages on the UI channels as if it were the UI, and
    stores the fields of the TD responses it receives as instance attributes.
    """

    SW_COMPATIBILITY_REV = 0

    def __init__(self, can_interface: DenaliCanMessenger, logger: Logger):
        """
        UIProxy constructor

        :param can_interface: the Denali CAN interface object; when None, no
            response handlers are registered
        :param logger: logger used for debug output when messages are sent
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        if self.can_interface is not None:
            # Every response handled here arrives on the TD-to-UI channel.
            channel_id = DenaliChannels.td_to_ui_ch_id
            subscriptions = (
                (MsgIds.MSG_ID_TD_RESP_TREATMENT_PARAMS_TO_VALIDATE,
                 self._handler_tx_parameters_val_resp_sync),
                (MsgIds.MSG_ID_TD_RESP_ULTRAFILTRATION_VOLUME_TO_VALIDATE,
                 self._handler_uf_val_resp_sync),
                (MsgIds.MSG_ID_TD_RESP_INITIATE_TREATMENT_WORKFLOW,
                 self._handler_initiate_treatment_resp_sync),
                (MsgIds.MSG_ID_TD_UF_PAUSE_RESUME_RESPONSE,
                 self._handler_uf_pause_resp_sync),
            )
            for msg_id, handler in subscriptions:
                self.can_interface.register_receiving_publication_function(channel_id,
                                                                           msg_id.value,
                                                                           handler)

        # MSG_ID_TD_RESP_TREATMENT_PARAMS_TO_VALIDATE
        self.tx_params_acceptance = 0
        self.tx_params_blood_flow_rr = 0
        self.tx_params_dialysate_flow_rr = 0
        self.tx_params_tx_duration_rr = 0
        self.tx_params_saline_bolus_vol_rr = 0
        self.tx_params_acid_rr = 0
        self.tx_params_bicarb_rr = 0
        self.tx_params_dialyzer_type_rr = 0
        self.tx_params_bp_meas_interval_rr = 0
        self.tx_params_rinseback_flow_rate_rr = 0
        self.tx_params_art_pres_limit_window_rr = 0
        self.tx_params_ven_pres_limit_window_rr = 0
        self.tx_params_ven_pres_limit_asymmetric_rr = 0
        self.tx_params_tmp_pres_limit_window_rr = 0
        self.tx_params_dialysate_temperature_rr = 0
        self.tx_params_uf_volume_rr = 0
        self.tx_params_validate_timestamp = 0

        # MSG_ID_TD_RESP_ULTRAFILTRATION_VOLUME_TO_VALIDATE
        self.uf_volume_acceptance = 0
        self.uf_volume_rr = 0
        self.uf_volume_ml = 0.0
        self.uf_volume_timestamp = 0

        # MSG_ID_TD_RESP_INITIATE_TREATMENT_WORKFLOW
        self.initiate_tx_acceptance = 0
        self.initiate_tx_rr = 0
        self.initiate_tx_timestamp = 0

        # MSG_ID_TD_UF_PAUSE_RESUME_RESPONSE
        self.uf_pause_resume_acceptance = 0
        self.uf_pause_resume_rr = 0
        self.uf_pause_resume_timestamp = 0

    @staticmethod
    def _unpack_field(message: dict, field_number: int, fmt: str = 'i'):
        """
        Extracts one payload field from a received message.

        The slice bounds are read from MsgFieldPositions.START_POS_FIELD_<n>
        and MsgFieldPositions.END_POS_FIELD_<n>.

        :param message: the received message; its 'message' entry is sliced
        :param field_number: (int) 1-based number of the payload field
        :param fmt: (str) struct format of the field ('i' or 'f' in this class)
        :return: the unpacked value
        """
        start = getattr(MsgFieldPositions, f"START_POS_FIELD_{field_number}")
        end = getattr(MsgFieldPositions, f"END_POS_FIELD_{field_number}")
        # NOTE(review): a struct format without a prefix uses the host's native
        # byte order - confirm that this matches the TD wire format on every
        # host this API is expected to run on.
        return struct.unpack(fmt, bytearray(message['message'][start:end]))[0]

    @staticmethod
    def _to_int(value):
        """
        Returns the underlying value of an enum member, or the argument itself.

        :param value: an enum member (anything exposing ``.value``) or a plain number
        :return: ``value.value`` when that attribute exists, otherwise ``value``
        """
        return getattr(value, "value", value)

    def _send_message(self, channel_id, message_id, payload, log_text: str) -> None:
        """
        Builds a Denali message, logs a debug line and sends the message.

        :param channel_id: the CAN channel the message is built for
        :param message_id: the numeric message ID
        :param payload: the already serialized payload
        :param log_text: (str) text written to the debug log before sending
        :return: None
        """
        message = DenaliMessage.build_message(channel_id=channel_id,
                                              message_id=message_id,
                                              payload=payload)
        self.logger.debug(log_text)
        # The second positional argument (0) is what every sender in this class passes.
        self.can_interface.send(message, 0)

    def _handler_tx_parameters_val_resp_sync(self, message: dict, timestamp=0.0) -> None:
        """
        Handles treatment parameters validation response from the TD.
        Treatment parameters acceptance and rejection are captured.

        :param message: the published TD treatment parameters validation response message.
        :param timestamp: timestamp in epoch time of the received message
        :return: none
        """
        field = self._unpack_field

        self.tx_params_acceptance = field(message, 1)
        self.tx_params_blood_flow_rr = field(message, 2)
        self.tx_params_dialysate_flow_rr = field(message, 3)
        self.tx_params_tx_duration_rr = field(message, 4)
        self.tx_params_saline_bolus_vol_rr = field(message, 5)
        self.tx_params_acid_rr = field(message, 6)
        self.tx_params_bicarb_rr = field(message, 7)
        self.tx_params_dialyzer_type_rr = field(message, 8)
        self.tx_params_bp_meas_interval_rr = field(message, 9)
        self.tx_params_rinseback_flow_rate_rr = field(message, 10)
        self.tx_params_art_pres_limit_window_rr = field(message, 11)
        self.tx_params_ven_pres_limit_window_rr = field(message, 12)
        self.tx_params_ven_pres_limit_asymmetric_rr = field(message, 13)
        self.tx_params_tmp_pres_limit_window_rr = field(message, 14)
        self.tx_params_dialysate_temperature_rr = field(message, 15)
        self.tx_params_uf_volume_rr = field(message, 16)
        self.tx_params_validate_timestamp = timestamp

    def _handler_uf_val_resp_sync(self, message: dict, timestamp=0.0) -> None:
        """
        Handles ultrafiltration volume validation response from the TD.
        Ultrafiltration volume acceptance and rejection are captured.

        :param message: the published TD ultrafiltration volume validation response message.
        :param timestamp: timestamp in epoch time of the received message
        :return: none
        """
        self.uf_volume_acceptance = self._unpack_field(message, 1)
        self.uf_volume_rr = self._unpack_field(message, 2)
        # The third field is decoded as a float, unlike the integer fields before it.
        self.uf_volume_ml = self._unpack_field(message, 3, 'f')
        self.uf_volume_timestamp = timestamp

    def _handler_initiate_treatment_resp_sync(self, message: dict, timestamp=0.0) -> None:
        """
        Handles initiate treatment response from the TD.
        Treatment acceptance and rejection are captured.

        :param message: the published TD initiate treatment response message.
        :param timestamp: timestamp in epoch time of the received message
        :return: none
        """
        self.initiate_tx_acceptance = self._unpack_field(message, 1)
        self.initiate_tx_rr = self._unpack_field(message, 2)
        self.initiate_tx_timestamp = timestamp

    def _handler_uf_pause_resp_sync(self, message: dict, timestamp=0.0) -> None:
        """
        Handles ultrafiltration pause/resume response from the TD.
        Ultrafiltration pause/resume acceptance and rejection are captured.

        :param message: the published TD ultrafiltration pause/resume response message.
        :param timestamp: timestamp in epoch time of the received message
        :return: none
        """
        self.uf_pause_resume_acceptance = self._unpack_field(message, 1)
        self.uf_pause_resume_rr = self._unpack_field(message, 2)
        self.uf_pause_resume_timestamp = timestamp

    def cmd_send_ui_treatment_param_data(self,
                                         blood_flow_rate: int,
                                         dialysate_flow_rate: int,
                                         treatment_duration: int,
                                         saline_bolus_volume: int,
                                         heparin_stop_time: int,
                                         heparin_type: int,
                                         acid_concentrate: int,
                                         bicarb_concentrate: int,
                                         dialyzer_type: int,
                                         bp_interval: int,
                                         rinseback_flow_rate: int,
                                         rinseback_volume: int,
                                         arterial_pressure_limit: int,
                                         venous_pressure_limit: int,
                                         venous_pressure_asymmetric: int,
                                         tmp_pressure_limit: int,
                                         dialysate_temp: float,
                                         heparin_dispense_rate: float,
                                         heparin_bolus_volume: float
                                         ) -> None:
        """
        Constructs and sends a UI set treatment parameters message.

        The payload is the sixteen integer parameters followed by the three
        float parameters, serialized in the order of the signature.

        :param blood_flow_rate: (int) blood flow rate (in mL/min)
        :param dialysate_flow_rate: (int) dialysate flow rate (in mL/min)
        :param treatment_duration: (int) treatment duration (in min)
        :param saline_bolus_volume: (int) saline bolus volume (in mL)
        :param heparin_stop_time: (int) heparin stop time (in minutes)
        :param heparin_type: (int) heparin Type
        :param acid_concentrate: (int) acid concentrate type
        :param bicarb_concentrate: (int) bicarbonate concentrate type
        :param dialyzer_type: (int) dialyzer type
        :param bp_interval: (int) blood pressure measurement interval (in min)
        :param rinseback_flow_rate: (int) rinse back flow rate (in mL/min)
        :param rinseback_volume: (int) rinse back volume (in mL)
        :param arterial_pressure_limit: (int) arterial pressure alarm limit window (in mmHg)
        :param venous_pressure_limit: (int) venous pressure alarm limit window (in mmHg)
        :param venous_pressure_asymmetric: (int) venous pressure alarm limit asymmetric (in mmHg)
        :param tmp_pressure_limit: (int) transmembrane pressure alarm limit window (in mmHg)
        :param dialysate_temp: (float) dialysate temperature (in deg C)
        :param heparin_dispense_rate: (float) heparin dispense rate (in mL/hr)
        :param heparin_bolus_volume: (float) heparin bolus volume (in mL)
        :return: None
        """
        bld = integer_to_bytearray(blood_flow_rate)
        dia = integer_to_bytearray(dialysate_flow_rate)
        dur = integer_to_bytearray(treatment_duration)
        sal = integer_to_bytearray(saline_bolus_volume)
        hst = integer_to_bytearray(heparin_stop_time)
        het = integer_to_bytearray(heparin_type)
        acc = integer_to_bytearray(acid_concentrate)
        bic = integer_to_bytearray(bicarb_concentrate)
        dzr = integer_to_bytearray(dialyzer_type)
        bpi = integer_to_bytearray(bp_interval)
        rbf = integer_to_bytearray(rinseback_flow_rate)
        rbv = integer_to_bytearray(rinseback_volume)
        apw = integer_to_bytearray(arterial_pressure_limit)
        vpw = integer_to_bytearray(venous_pressure_limit)
        vpa = integer_to_bytearray(venous_pressure_asymmetric)
        tmp = integer_to_bytearray(tmp_pressure_limit)
        dtp = float_to_bytearray(dialysate_temp)
        hdr = float_to_bytearray(heparin_dispense_rate)
        hbv = float_to_bytearray(heparin_bolus_volume)

        payload = (bld + dia + dur + sal + hst + het + acc + bic + dzr + bpi
                   + rbf + rbv + apw + vpw + vpa + tmp + dtp + hdr + hbv)

        self._send_message(DenaliChannels.ui_to_td_ch_id,
                           MsgIds.MSG_ID_UI_TREATMENT_PARAMS_TO_VALIDATE.value,
                           payload,
                           "Sending treatment parameters to TD.")

    def cmd_send_ui_version_request(self) -> None:
        """
        Constructs and sends a UI version request broadcast.
        Can be used to retrieve all sub system version responses.

        The major, minor, micro and build fields are sent as 0; the
        compatibility field carries SW_COMPATIBILITY_REV.

        :return: none
        """
        major = unsigned_byte_to_bytearray(0)
        minor = unsigned_byte_to_bytearray(0)
        micro = unsigned_byte_to_bytearray(0)
        build = short_to_bytearray(0)
        compatibility = integer_to_bytearray(self.SW_COMPATIBILITY_REV)

        payload = major + minor + micro + build + compatibility

        self._send_message(DenaliChannels.ui_sync_broadcast_ch_id,
                           MsgIds.MSG_ID_FW_VERSIONS_REQUEST.value,
                           payload,
                           "Sending UI Version Request.")

    def cmd_send_ui_initiate_treatment(self,
                                       accepted: int = ACCEPTED,
                                       rejection_reason: int = RequestRejectReasons.REQUEST_REJECT_REASON_NONE
                                       ) -> None:
        """
        Constructs and sends a UI initiate treatment message.

        :param accepted: (int) the acceptance value
        :param rejection_reason: (int) the rejection reason value; a
            RequestRejectReasons member is accepted as well
        :return: none
        """
        acc = unsigned_byte_to_bytearray(accepted)
        # The default is an enum member rather than a plain int; unwrap it so
        # the byte packer always receives a number.
        rjr = unsigned_byte_to_bytearray(self._to_int(rejection_reason))

        payload = acc + rjr

        self._send_message(DenaliChannels.ui_to_td_ch_id,
                           MsgIds.MSG_ID_UI_INITIATE_TREATMENT_WORKFLOW.value,
                           payload,
                           "Sending UI Initiate Treatment Workflow.")

    def cmd_send_ui_confirmed_treatment_parameters(self,
                                                   accepted: int = ACCEPTED
                                                   ) -> None:
        """
        Constructs and sends a UI confirmation of treatment parameters message.

        :param accepted: (int) the acceptance value
        :return: none
        """
        payload = unsigned_byte_to_bytearray(accepted)

        self._send_message(DenaliChannels.ui_to_td_ch_id,
                           MsgIds.MSG_ID_UI_TREATMENT_PARAMS_CONFIRMED.value,
                           payload,
                           "Sending UI Confirm Treatment Parameters.")

    def cmd_send_ui_pause_resume_uf(self,
                                    accepted: int = ACCEPTED,
                                    rejection_reason: int = RequestRejectReasons.REQUEST_REJECT_REASON_NONE
                                    ) -> None:
        """
        Constructs and sends a UI pause/resume ultrafiltration message.

        :param accepted: (int) the acceptance value
        :param rejection_reason: (int) the rejection reason value; a
            RequestRejectReasons member is accepted as well
        :return: none
        """
        acc = unsigned_byte_to_bytearray(accepted)
        # The default is an enum member rather than a plain int; unwrap it so
        # the byte packer always receives a number.
        rjr = unsigned_byte_to_bytearray(self._to_int(rejection_reason))

        payload = acc + rjr

        self._send_message(DenaliChannels.ui_to_td_ch_id,
                           MsgIds.MSG_ID_UI_UF_PAUSE_RESUME_REQUEST.value,
                           payload,
                           "Sending UI Pause Resume UF request.")

    def cmd_send_ui_validate_uf_volume(self,
                                       uf_volume: float = 0.0
                                       ) -> None:
        """
        Constructs and sends a UI validate ultrafiltration message.

        :param uf_volume: (float) the uf volume between 0.0 - 8.0
        :return: none
        """
        payload = float_to_bytearray(uf_volume)

        self._send_message(DenaliChannels.ui_to_td_ch_id,
                           MsgIds.MSG_ID_UI_ULTRAFILTRATION_VOLUME_TO_VALIDATE.value,
                           payload,
                           "Sending UI Ultrafiltration volume to validate.")