###########################################################################
#
#    Copyright (c) 2020-2025 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   ui_proxy.py
#
#    @author (last)      Zoltan Miskolci
#    @date   (last)      05-May-2026
#    @author (original)  Peter Lucia
#    @date   (original)  02-Apr-2020
#
############################################################################

# Module imports
from logging import Logger

# Project imports
from leahi_dialin.common import td_enum_repository
from leahi_dialin.common.constants import ACCEPTED
from leahi_dialin.common.disp_defs_proxy import disp_enum_repository
from leahi_dialin.common.generic_defs import DataTypes
from leahi_dialin.common.msg_defs import MsgIds, RequestRejectReasons
from leahi_dialin.common.override_templates import cmd_generic_override
from leahi_dialin.protocols.CAN import DenaliCanMessenger, DenaliChannels
from leahi_dialin.utils.abstract_classes import AbstractSubSystem
from leahi_dialin.utils.conversions import integer_to_bytearray, unsigned_byte_to_bytearray, float_to_bytearray, \
    short_to_bytearray


class UIProxy(AbstractSubSystem):
    """
    Treatment Delivery (TD) Dialin API sub-class for UI proxy ( injection ) related commands.

    The class plays two roles:
      - it registers handlers on the TD-to-UI channel and stores the fields of the
        received TD responses in instance attributes, and
      - it offers cmd_send_ui_* methods that build a payload and hand it to
        cmd_generic_override for sending on a UI channel.
    """

    SW_COMPATIBILITY_REV = 0

    def __init__(self, can_interface: DenaliCanMessenger, logger: Logger):
        """
        UIProxy constructor

        :param can_interface: the Denali CAN interface object (handlers are only registered when it is not None)
        :param logger: the logger object passed on to every command that is sent
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        if self.can_interface is not None:
            # (message id, handler) pairs, all received on the TD-to-UI channel.
            # Registration order is the same as it has always been.
            subscriptions = (
                (MsgIds.MSG_ID_TD_RESP_TREATMENT_PARAMS_TO_VALIDATE,
                 self._handler_tx_parameters_val_resp_sync),
                (MsgIds.MSG_ID_TD_RESP_ULTRAFILTRATION_VOLUME_TO_VALIDATE,
                 self._handler_uf_val_resp_sync),
                (MsgIds.MSG_ID_TD_RESP_INITIATE_TREATMENT_WORKFLOW,
                 self._handler_initiate_treatment_resp_sync),
                (MsgIds.MSG_ID_TD_UF_PAUSE_RESUME_RESPONSE,
                 self._handler_uf_pause_resp_sync),
            )
            for msg_id, handler in subscriptions:
                self.can_interface.register_receiving_publication_function(
                    channel_id=DenaliChannels.td_to_ui_ch_id,
                    message_id=msg_id.value,
                    function=handler)

        # MSG_ID_TD_RESP_TREATMENT_PARAMS_TO_VALIDATE
        self.tx_params_treatment_modality = td_enum_repository.TDTreatmentModalityTypes.HD.value
        self.tx_params_hdf_dilution = td_enum_repository.TDTreatmentHDFDilution.HDF_NOT_APPLICABLE.value
        self.tx_params_blood_flow_rate = 0
        self.tx_params_dialysate_flow_rate = 0
        self.tx_params_tx_duration = 0
        self.tx_params_heparin_duration = 0
        self.tx_params_heparin_type = disp_enum_repository.HeparinTypes.HEPARIN_TYPE_FRESENIUS_CENTRISOL.value
        self.tx_params_dry_bicarb_cart_size = 0
        self.tx_params_sodium = 0
        self.tx_params_bicarb_concentrate = disp_enum_repository.BicarbTypes.BICARB_08_677753_0.value
        self.tx_params_dialyzer_type = disp_enum_repository.DialyzerTypes.DIALYZER_TYPE_BBRAUN_PRO_13H.value
        self.tx_params_fluid_bolus_volume = 0
        self.tx_params_bp_interval = 0
        self.tx_params_rinseback_volume = 0
        # NOTE(review): the Hepatitis B status default is taken from a heparin-state enum;
        # this looks like a copy-paste slip - confirm which enum is intended before changing it.
        self.tx_params_hepatitis_b_status = td_enum_repository.TDTreaHeparinStates.HEPARIN_STATE_OFF.value
        self.tx_params_acid_concentrate = disp_enum_repository.AcidTypes.ACID_08_1251_1.value
        self.tx_params_subst_fluid_volume = 0.0
        self.tx_params_heparin_bolus_volume = 0.0
        self.tx_params_heparin_delivery_rate = 0.0
        self.tx_params_dialysate_temperature = 0.0
        self.tx_params_uf_pre_weight = 0.0
        self.tx_params_uf_est_target_weight = 0.0
        self.tx_params_uf_volume = 0.0
        self.tx_params_validate_timestamp = 0

        # MSG_ID_TD_RESP_ULTRAFILTRATION_VOLUME_TO_VALIDATE
        self.uf_volume_acceptance = 0
        self.uf_volume_rr = 0
        self.uf_volume_ml = 0.0
        self.uf_volume_timestamp = 0

        # MSG_ID_TD_RESP_INITIATE_TREATMENT_WORKFLOW
        self.initiate_tx_acceptance = 0
        self.initiate_tx_rr = 0
        self.initiate_tx_timestamp = 0

        # MSG_ID_TD_UF_PAUSE_RESUME_RESPONSE
        self.uf_pause_resume_acceptance = 0
        self.uf_pause_resume_rr = 0
        self.uf_pause_resume_timestamp = 0

    def _handler_tx_parameters_val_resp_sync(self, message: dict, timestamp: float = 0.0) -> None:
        """
        Handles treatment parameters validation response from the TD.
        The decoded fields are stored in the tx_params_* attributes and the
        reception time in tx_params_validate_timestamp.

        :param message: the published TD treatment parameters validation response message.
        :param timestamp: timestamp in epoch time of the received message
        :return: none
        """
        # Decoder entries are (attribute name, data type), listed in the order
        # in which they are handed to process_into_vars.
        msg_list = [
            ('self.tx_params_treatment_modality', DataTypes.U32),
            ('self.tx_params_hdf_dilution', DataTypes.U32),
            ('self.tx_params_blood_flow_rate', DataTypes.U32),
            ('self.tx_params_dialysate_flow_rate', DataTypes.U32),
            ('self.tx_params_tx_duration', DataTypes.U32),
            ('self.tx_params_heparin_duration', DataTypes.U32),
            ('self.tx_params_heparin_type', DataTypes.U32),
            ('self.tx_params_dry_bicarb_cart_size', DataTypes.U32),
            ('self.tx_params_sodium', DataTypes.U32),
            ('self.tx_params_bicarb_concentrate', DataTypes.U32),
            ('self.tx_params_dialyzer_type', DataTypes.U32),
            ('self.tx_params_fluid_bolus_volume', DataTypes.U32),
            ('self.tx_params_bp_interval', DataTypes.U32),
            ('self.tx_params_rinseback_volume', DataTypes.U32),
            ('self.tx_params_hepatitis_b_status', DataTypes.U32),
            ('self.tx_params_acid_concentrate', DataTypes.U32),
            ('self.tx_params_subst_fluid_volume', DataTypes.F32),
            ('self.tx_params_heparin_bolus_volume', DataTypes.F32),
            ('self.tx_params_heparin_delivery_rate', DataTypes.F32),
            ('self.tx_params_dialysate_temperature', DataTypes.F32),
            ('self.tx_params_uf_pre_weight', DataTypes.F32),
            ('self.tx_params_uf_est_target_weight', DataTypes.F32),
            ('self.tx_params_uf_volume', DataTypes.F32),
        ]

        self.process_into_vars(decoder_list=msg_list, message=message)
        self.tx_params_validate_timestamp = timestamp

    def _handler_uf_val_resp_sync(self, message: dict, timestamp: float = 0.0) -> None:
        """
        Handles ultrafiltration volume validation response from the TD.
        The decoded fields are stored in uf_volume_acceptance, uf_volume_rr and
        uf_volume_ml, and the reception time in uf_volume_timestamp.

        :param message: the published TD ultrafiltration volume validation response message.
        :param timestamp: timestamp in epoch time of the received message
        :return: none
        """
        msg_list = [
            ('self.uf_volume_acceptance', DataTypes.U32),
            ('self.uf_volume_rr', DataTypes.U32),
            ('self.uf_volume_ml', DataTypes.F32),
        ]

        self.process_into_vars(decoder_list=msg_list, message=message)
        self.uf_volume_timestamp = timestamp

    def _handler_initiate_treatment_resp_sync(self, message: dict, timestamp: float = 0.0) -> None:
        """
        Handles initiate treatment response from the TD.
        The decoded fields are stored in initiate_tx_acceptance and initiate_tx_rr,
        and the reception time in initiate_tx_timestamp.

        :param message: the published TD initiate treatment response message.
        :param timestamp: timestamp in epoch time of the received message
        :return: none
        """
        msg_list = [
            ('self.initiate_tx_acceptance', DataTypes.U32),
            ('self.initiate_tx_rr', DataTypes.U32),
        ]

        self.process_into_vars(decoder_list=msg_list, message=message)
        self.initiate_tx_timestamp = timestamp

    def _handler_uf_pause_resp_sync(self, message: dict, timestamp: float = 0.0) -> None:
        """
        Handles ultrafiltration pause/resume response from the TD.
        The decoded fields are stored in uf_pause_resume_acceptance and
        uf_pause_resume_rr, and the reception time in uf_pause_resume_timestamp.

        :param message: the published TD ultrafiltration pause/resume response message.
        :param timestamp: timestamp in epoch time of the received message
        :return: none
        """
        msg_list = [
            ('self.uf_pause_resume_acceptance', DataTypes.U32),
            ('self.uf_pause_resume_rr', DataTypes.U32),
        ]

        self.process_into_vars(decoder_list=msg_list, message=message)
        self.uf_pause_resume_timestamp = timestamp

    def cmd_send_ui_treatment_param_data(self,
                                         final_confirmation: int,
                                         treatment_modality: int,
                                         hdf_dilution: int,
                                         blood_flow_rate: int,
                                         dialysate_flow_rate: int,
                                         treatment_duration: int,
                                         heparin_duration: int,
                                         heparin_type: int,
                                         dry_bicarb_cart_size: int,
                                         sodium: int,
                                         bicarb_concentrate: int,
                                         dialyzer_type: int,
                                         fluid_bolus_volume: int,
                                         bp_interval: int,
                                         rinseback_volume: int,
                                         hepatitis_b_status: int,
                                         acid_concentrate: int,
                                         subst_fluid_volume: float,
                                         heparin_bolus_volume: float,
                                         heparin_delivery_rate: float,
                                         dialysate_temp: float,
                                         uf_pre_weight: float,
                                         uf_est_target_weight: float,
                                         uf_volume: float) -> None:
        """
        Constructs and sends a UI set treatment parameters message.
        The payload holds the integer parameters followed by the float parameters,
        in the order of the arguments below.

        :param final_confirmation: (int) 0 - if only validating parameters, 1 - if validating and moving the state forward
        :param treatment_modality: (int) type of treatment: 0-HD, 1-HDF
        :param hdf_dilution: (int) HDF dilution option (TDTreatmentHDFDilution)
        :param blood_flow_rate: (int) blood flow rate (in mL/min)
        :param dialysate_flow_rate: (int) dialysate flow rate (in mL/min)
        :param treatment_duration: (int) treatment duration (in min)
        :param heparin_duration: (int) heparin delivery duration (in min)
        :param heparin_type: (int) heparin Type (HeparinTypes)
        :param dry_bicarb_cart_size: (int) dry bicarb cart size option
        :param sodium: (int) sodium (in mEq/L)
        :param bicarb_concentrate: (int) bicarbonate concentrate type (BicarbTypes)
        :param dialyzer_type: (int) dialyzer type (DialyzerTypes)
        :param fluid_bolus_volume: (int) fluid bolus volume (in mL)
        :param bp_interval: (int) blood pressure measurement interval (in min)
        :param rinseback_volume: (int) rinse back volume (in mL)
        :param hepatitis_b_status: (int) Hepatitis B status option (TDTreatmentHepatitisB)
        :param acid_concentrate: (int) acid concentrate type (AcidTypes)
        :param subst_fluid_volume: (float) substitution fluid volume (in L)
        :param heparin_bolus_volume: (float) heparin bolus volume (in mL)
        :param heparin_delivery_rate: (float) heparin delivery rate (in mL/hr)
        :param dialysate_temp: (float) dialysate temperature (in deg C)
        :param uf_pre_weight: (float) patient pre weight prior to treatment (in Kilogram)
        :param uf_est_target_weight: (float) patient estimated target weight after the treatment (in Kilogram)
        :param uf_volume: (float) ultrafiltration volume (in L)
        :return: None
        """
        # Integer fields that follow final_confirmation, in wire order.
        remaining_int_fields = (
            treatment_modality,
            hdf_dilution,
            blood_flow_rate,
            dialysate_flow_rate,
            treatment_duration,
            heparin_duration,
            heparin_type,
            dry_bicarb_cart_size,
            sodium,
            bicarb_concentrate,
            dialyzer_type,
            fluid_bolus_volume,
            bp_interval,
            rinseback_volume,
            hepatitis_b_status,
            acid_concentrate,
        )
        # Float fields, appended after all integer fields, in wire order.
        float_fields = (
            subst_fluid_volume,
            heparin_bolus_volume,
            heparin_delivery_rate,
            dialysate_temp,
            uf_pre_weight,
            uf_est_target_weight,
            uf_volume,
        )

        # Start from the first converted field so the payload keeps whatever
        # type the conversion helper returns.
        payload = integer_to_bytearray(final_confirmation)
        for int_value in remaining_int_fields:
            payload += integer_to_bytearray(int_value)
        for float_value in float_fields:
            payload += float_to_bytearray(float_value)

        # NOTE(review): 'Sneding' in entity_name is a typo for 'Sending'; the text is left
        # untouched here in case log consumers match on it.
        cmd_generic_override(payload=payload,
                             reset=None,
                             channel_id=DenaliChannels.ui_to_td_ch_id,
                             msg_id=MsgIds.MSG_ID_UI_TREATMENT_PARAMS_TO_VALIDATE,
                             entity_name='Sneding UI Treatment Parameter to TD',
                             override_text='N/A',
                             logger=self.logger,
                             can_interface=self.can_interface)

    def cmd_send_ui_version_request(self) -> None:
        """
        Constructs and sends a UI version request broadcast.
        Can be used to retrieve all sub system version responses.
        The major, minor, micro and build fields are sent as 0; the compatibility
        field carries SW_COMPATIBILITY_REV.

        :return: none
        """
        major = unsigned_byte_to_bytearray(0)
        minor = unsigned_byte_to_bytearray(0)
        micro = unsigned_byte_to_bytearray(0)
        build = short_to_bytearray(0)
        compatibility = integer_to_bytearray(self.SW_COMPATIBILITY_REV)

        payload = major + minor + micro + build + compatibility

        cmd_generic_override(payload=payload,
                             reset=None,
                             channel_id=DenaliChannels.ui_sync_broadcast_ch_id,
                             msg_id=MsgIds.MSG_ID_FW_VERSIONS_REQUEST,
                             entity_name='UI Version Request',
                             override_text='N/A',
                             logger=self.logger,
                             can_interface=self.can_interface)

    def cmd_send_ui_initiate_treatment(self, accepted: int = ACCEPTED) -> None:
        """
        Constructs and sends a UI initiate treatment message.

        :param accepted: (int) the acceptance value
        :return: none
        """
        payload = integer_to_bytearray(accepted)

        cmd_generic_override(payload=payload,
                             reset=None,
                             channel_id=DenaliChannels.ui_to_td_ch_id,
                             msg_id=MsgIds.MSG_ID_UI_INITIATE_TREATMENT_WORKFLOW,
                             entity_name='UI Initiate Treatment Workflow',
                             override_text='N/A',
                             logger=self.logger,
                             can_interface=self.can_interface)

    def cmd_send_ui_confirmed_treatment_parameters(self, accepted: int = ACCEPTED) -> None:
        """
        Constructs and sends a UI confirmation of treatment parameters message.

        :param accepted: (int) the acceptance value
        :return: none
        """
        payload = integer_to_bytearray(accepted)

        cmd_generic_override(payload=payload,
                             reset=None,
                             channel_id=DenaliChannels.ui_to_td_ch_id,
                             msg_id=MsgIds.MSG_ID_UI_TREATMENT_PARAMS_CONFIRMED,
                             entity_name='UI Confirm Treatment Parameters',
                             override_text='N/A',
                             logger=self.logger,
                             can_interface=self.can_interface)

    def cmd_send_ui_pause_resume_uf(self,
                                    accepted: int = ACCEPTED,
                                    rejection_reason: int = RequestRejectReasons.REQUEST_REJECT_REASON_NONE.value
                                    ) -> None:
        """
        Constructs and sends a UI pause/resume ultrafiltration message.
        The payload is the acceptance value followed by the rejection reason.

        :param accepted: (int) the acceptance value
        :param rejection_reason: (int) the rejection reason value
        :return: none
        """
        payload = integer_to_bytearray(accepted) + integer_to_bytearray(rejection_reason)

        cmd_generic_override(payload=payload,
                             reset=None,
                             channel_id=DenaliChannels.ui_to_td_ch_id,
                             msg_id=MsgIds.MSG_ID_UI_UF_PAUSE_RESUME_REQUEST,
                             entity_name='UI Pause or Resume UF Request',
                             override_text='N/A',
                             logger=self.logger,
                             can_interface=self.can_interface)

    def cmd_send_ui_validate_uf_volume(self, uf_volume: float = 0.0) -> None:
        """
        Constructs and sends a UI validate ultrafiltration message.

        :param uf_volume: (float) the uf volume between 0.0 - 8.0
        :return: none
        """
        payload = float_to_bytearray(uf_volume)

        cmd_generic_override(payload=payload,
                             reset=None,
                             channel_id=DenaliChannels.ui_to_td_ch_id,
                             msg_id=MsgIds.MSG_ID_UI_ULTRAFILTRATION_VOLUME_TO_VALIDATE,
                             entity_name='Validate UI Ultrafiltration Volume',
                             override_text='N/A',
                             logger=self.logger,
                             can_interface=self.can_interface)

    def cmd_send_ui_alarm_list_request(self) -> None:
        """
        Constructs and sends a UI request for the active alarm list.
        The request carries no payload.

        :return: none
        """
        cmd_generic_override(payload=None,
                             reset=None,
                             channel_id=DenaliChannels.ui_to_td_ch_id,
                             msg_id=MsgIds.MSG_ID_UI_ACTIVE_ALARMS_LIST_REQUEST,
                             entity_name='UI Active Alarm List Request',
                             override_text='N/A',
                             logger=self.logger,
                             can_interface=self.can_interface)

    def cmd_send_ui_silence_alarm(self, silence: int = 1) -> None:
        """
        Constructs and sends a UI request to silence the alarm sound.

        :param silence: (int) 1 to silence, 0 to de-silence
        :return: none
        """
        # The flag is an integer, so it is packed with the integer helper like every
        # other integer field sent by this class. It used to go through the float
        # helper, which would not put the plain integer value on the wire.
        # NOTE(review): confirm against the firmware definition of
        # MSG_ID_USER_ALARM_SILENCE_REQUEST that the field is an integer.
        payload = integer_to_bytearray(silence)

        cmd_generic_override(payload=payload,
                             reset=None,
                             channel_id=DenaliChannels.ui_to_td_ch_id,
                             msg_id=MsgIds.MSG_ID_USER_ALARM_SILENCE_REQUEST,
                             entity_name='UI Alarm Silence Request',
                             override_text='N/A',
                             logger=self.logger,
                             can_interface=self.can_interface)