#!/bin/python3

# Adjusting the dialin folder for the new leahi_dialin.
from typing import Any

from leahi_dialin.ui.utils import singleton_threadsafe
from leahi_dialin.utils import conversions
from leahi_dialin.protocols import CAN
from leahi_dialin.common import msg_ids


class _NullLogger:
    """Logger stand-in handed to the CAN messenger; discards every record.

    Only ``error`` and ``info`` are provided. Both accept any arguments so a
    call with extra formatting arguments does not raise.
    """

    def error(self, *args, **kwargs):
        pass

    def info(self, *args, **kwargs):
        pass


@singleton_threadsafe
class TD_Messaging():
    """Builds and sends TD messages over the CAN interface.

    Each public ``td_*`` method packs its arguments into a payload with the
    ``conversions`` helpers, wraps it with ``CAN.DenaliMessage.build_message``
    and sends it through ``self.can_interface``. All senders raise
    ``ValueError`` when ``self.can_enabled`` is false.
    """

    def __init__(self):
        self.can_enabled: bool = False
        self.can_Channel: str = "can0"

        self.can_interface = CAN.DenaliCanMessenger(
            can_interface=self.can_Channel,
            logger=_NullLogger())
        self.can_interface.start()
        if self.can_interface is not None:
            self.can_enabled = True

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _require_can_enabled(self) -> None:
        """Raise ValueError unless the CAN interface has been enabled."""
        if not self.can_enabled:
            raise ValueError("CAN Interface is not enabled")

    def _send(self, channel_id, message_id, payload) -> None:
        """Build a Denali message from the given parts and send it."""
        message = CAN.DenaliMessage.build_message(
            channel_id=channel_id,
            message_id=message_id,
            payload=payload)
        self.can_interface.send(message, 0)

    def _broadcast(self, message_id, payload) -> None:
        """Send a message on the TD sync broadcast channel."""
        self._send(CAN.DenaliChannels.td_sync_broadcast_ch_id,
                   message_id, payload)

    def _send_to_ui(self, message_id, payload) -> None:
        """Send a message on the TD-to-UI channel."""
        self._send(CAN.DenaliChannels.td_to_ui_ch_id, message_id, payload)

    # ------------------------------------------------------------------
    # broadcast data messages
    # ------------------------------------------------------------------
    def td_operation_mode(self, op_mode: int, sub_mode: int = 0):
        """Broadcasts the current TD operation mode (Msg ID: 0x12, 18)

        Args:
            op_mode (int): operation mode
            sub_mode (int, optional): operation sub-mode. Defaults to 0.

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.integer_to_bytearray(op_mode)
        payload += conversions.integer_to_bytearray(sub_mode)

        self._broadcast(msg_ids.MsgIds.MSG_ID_TD_OP_MODE_DATA.value, payload)

    def td_temperatures(self, board_temp: float):
        """Broadcasts the current TD Temperatures Data (Msg ID: 0x53, 83)

        Args:
            board_temp (float): TD Board temperature

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        # TODO: replace with proper payload and message ID once message is defined
        self._require_can_enabled()

        payload = conversions.float_to_bytearray(board_temp)

        self._broadcast(
            msg_ids.MsgIds.MSG_ID_TD_TEMPERATURE_DATA.value, payload)

    def td_blood_flow(self,
                      H4_set_flow: int,
                      H4_meas_flow: float,
                      H4_rot_speed: float,
                      H4_mot_speed: float,
                      H4_curr_motor: float,
                      H4_set_rpm: float,
                      H4_rot_count: int,
                      H4_pres_flow: int,
                      H6_rot_hall_state: int):
        """Broadcasts the current TD Blood Pump Data (Msg ID: 0x11, 17)

        Args:
            H4_set_flow (int): Set flow rate in mL/min.
            H4_meas_flow (float): Measured flow rate in mL/min.
            H4_rot_speed (float): Measured rotor speed in RPM.
            H4_mot_speed (float): Measured pump speed in RPM.
            H4_curr_motor (float): Measured motor current in Amps.
            H4_set_rpm (float): Set motor speed in RPM.
            H4_rot_count (int): Rotor count.
            H4_pres_flow (int): Prescribed blood flow in mL/min.
            H6_rot_hall_state (int): Rotor hall state (1=home, 0=not home)

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        # The set flow is packed as a signed integer; the counters below
        # are packed unsigned.
        payload = conversions.integer_to_bytearray(H4_set_flow)
        payload += conversions.float_to_bytearray(H4_meas_flow)
        payload += conversions.float_to_bytearray(H4_rot_speed)
        payload += conversions.float_to_bytearray(H4_mot_speed)
        payload += conversions.float_to_bytearray(H4_curr_motor)
        payload += conversions.float_to_bytearray(H4_set_rpm)
        payload += conversions.unsigned_integer_to_bytearray(H4_rot_count)
        payload += conversions.unsigned_integer_to_bytearray(H4_pres_flow)
        payload += conversions.unsigned_integer_to_bytearray(H6_rot_hall_state)

        self._broadcast(
            msg_ids.MsgIds.MSG_ID_TD_BLOOD_PUMP_DATA.value, payload)

    def td_air_trap(self,
                    H17_state: int,
                    H16_state: int,
                    H17_raw_state: int,
                    H16_raw_state: int,
                    H13_state: int,
                    H20_State: int,
                    controlling: int):
        """Broadcasts the current TD air trap data (Msg ID: 0x2A, 42)

        Args:
            H17_state (int): lower level value
            H16_state (int): upper level value
            H17_raw_state (int): lower level raw value
            H16_raw_state (int): upper level raw value
            H13_state (int): Air trap intake valve state (open/closed)
            H20_State (int): Air trap outlet valve state (open/closed)
            controlling (int): air control

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        # Every field is packed as an unsigned integer, in signature order.
        payload = conversions.unsigned_integer_to_bytearray(H17_state)
        for field in (H16_state, H17_raw_state, H16_raw_state,
                      H13_state, H20_State, controlling):
            payload += conversions.unsigned_integer_to_bytearray(field)

        self._broadcast(msg_ids.MsgIds.MSG_ID_TD_AIR_TRAP_DATA.value, payload)

    def td_air_bubbles(self, H18_status: int, H18_state: int):
        """Broadcasts the current TD Air bubbles data (Msg ID: 0x1E, 30)

        Args:
            H18_status (int): Air Bubble Status
            H18_state (int): Air Bubble State

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.unsigned_integer_to_bytearray(H18_status)
        payload += conversions.unsigned_integer_to_bytearray(H18_state)

        self._broadcast(msg_ids.MsgIds.MSG_ID_TD_BUBBLES_DATA.value, payload)

    def td_air_pump(self, H12_state: int, H12_power: int):
        """Broadcasts the current TD Air Pump data (Msg ID: 0x20, 32)

        Args:
            H12_state (int): air pump state
            H12_power (int): air pump power

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.unsigned_integer_to_bytearray(H12_state)
        payload += conversions.unsigned_integer_to_bytearray(H12_power)

        self._broadcast(msg_ids.MsgIds.MSG_ID_TD_AIR_PUMP_DATA.value, payload)

    def td_ejector(self, H5_state: int, H5_set_speed: float):
        """Broadcasts the current TD Ejector data (Msg ID: 0x4E, 78)

        Args:
            H5_state (int): ejector state
            H5_set_speed (float): ejector set speed

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.unsigned_integer_to_bytearray(H5_state)
        payload += conversions.float_to_bytearray(H5_set_speed)

        self._broadcast(msg_ids.MsgIds.MSG_ID_TD_EJECTOR_DATA.value, payload)

    def td_switches(self, H9_door: int):
        """Broadcasts the current TD switch Data (Msg ID: 0x21, 33)

        Args:
            H9_door (int): door status

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.unsigned_integer_to_bytearray(H9_door)

        self._broadcast(msg_ids.MsgIds.MSG_ID_TD_SWITCHES_DATA.value, payload)

    def td_battery(self, capacity: int, ac_power: int):
        """Broadcasts the current TD Battery Data (Msg ID: 0x54, 84)

        Args:
            capacity (int): battery capacity
            ac_power (int): a/c power status

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        # TODO: replace with proper payload and message ID once message is defined
        self._require_can_enabled()

        payload = conversions.unsigned_integer_to_bytearray(capacity)
        payload += conversions.unsigned_integer_to_bytearray(ac_power)

        self._broadcast(msg_ids.MsgIds.MSG_ID_TD_BATTERY_DATA.value, payload)

    def td_valves(self,
                  valve_id: int,
                  state: int,
                  pos_name: int,
                  pos_count: int,
                  next_pos: int):
        """Broadcasts the current TD Valves Data (Msg ID: 0x2B, 43)

        Args:
            valve_id (int): Valve ID
            state (int): Valve State
            pos_name (int): Position Name
            pos_count (int): Position Count
            next_pos (int): Next Position

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.unsigned_integer_to_bytearray(valve_id)
        payload += conversions.unsigned_integer_to_bytearray(state)
        payload += conversions.unsigned_integer_to_bytearray(pos_name)
        # The two position values use the short converter, not the integer one.
        payload += conversions.short_to_bytearray(pos_count)
        payload += conversions.short_to_bytearray(next_pos)

        self._broadcast(msg_ids.MsgIds.MSG_ID_TD_VALVES_DATA.value, payload)

    def td_voltage(self,
                   line_1_2v: float,
                   line_3_3v: float,
                   line_logic_5v: float,
                   line_sensors_5v: float,
                   line_24v: float,
                   line_regen_24v: float,
                   fpga_adc_ref: float,
                   res_ref: float,
                   fpga_vcc: float,
                   fpga_vaux: float,
                   fpga_vpvn: float):
        """Broadcasts the current TD voltage data (Msg ID: 0x1D, 29)

        Args:
            line_1_2v (float): Processor 1.2V
            line_3_3v (float): Logic voltage (3.3V)
            line_logic_5v (float): Logic voltage (5V)
            line_sensors_5v (float): Sensors voltage (5V)
            line_24v (float): Actuators voltage (24V)
            line_regen_24v (float): Actuators regen voltage (24V)
            fpga_adc_ref (float): FPGA ADC reference voltage (1V)
            res_ref (float): PBA ADC reference voltage (3V)
            fpga_vcc (float): FPGA input voltage (3V)
            fpga_vaux (float): FPGA aux. voltage (3V)
            fpga_vpvn (float): FPGA pvn voltage (1V)

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        # Every field is packed as a float, in signature order.
        payload = conversions.float_to_bytearray(line_1_2v)
        for voltage in (line_3_3v, line_logic_5v, line_sensors_5v, line_24v,
                        line_regen_24v, fpga_adc_ref, res_ref, fpga_vcc,
                        fpga_vaux, fpga_vpvn):
            payload += conversions.float_to_bytearray(voltage)

        self._broadcast(msg_ids.MsgIds.MSG_ID_TD_VOLTAGES_DATA.value, payload)

    def td_pressure(self,
                    H2_arterial_pressure: float,
                    H14_venous_pressure: float,
                    limit_state: int,
                    H2_arterial_min: int,
                    H2_arterial_max: int,
                    H14_venous_min: int,
                    H14_venous_max: int,
                    H2_arterial_long: float,
                    H14_venous_long: float,
                    tmp_pressure: float,
                    tmp_min: float,
                    tmp_max: float):
        """Broadcasts the current TD Pressures data (Msg ID: 0x24, 36)

        Args:
            H2_arterial_pressure (float): Current Arterial pressure value
            H14_venous_pressure (float): Current Venous pressure value
            limit_state (int): Pressure limit state
            H2_arterial_min (int): Arterial pressure minimum
            H2_arterial_max (int): Arterial pressure maximum
            H14_venous_min (int): Venous pressure minimum
            H14_venous_max (int): Venous pressure maximum
            H2_arterial_long (float): Arterial Long
            H14_venous_long (float): Venous Long
            tmp_pressure (float): TMP pressure
            tmp_min (float): TMP minimum
            tmp_max (float): TMP maximum

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.float_to_bytearray(H2_arterial_pressure)
        payload += conversions.float_to_bytearray(H14_venous_pressure)
        payload += conversions.unsigned_integer_to_bytearray(limit_state)
        # The min/max limits are packed as signed integers.
        payload += conversions.integer_to_bytearray(H2_arterial_min)
        payload += conversions.integer_to_bytearray(H2_arterial_max)
        payload += conversions.integer_to_bytearray(H14_venous_min)
        payload += conversions.integer_to_bytearray(H14_venous_max)
        payload += conversions.float_to_bytearray(H2_arterial_long)
        payload += conversions.float_to_bytearray(H14_venous_long)
        payload += conversions.float_to_bytearray(tmp_pressure)
        payload += conversions.float_to_bytearray(tmp_min)
        payload += conversions.float_to_bytearray(tmp_max)

        self._broadcast(msg_ids.MsgIds.MSG_ID_TD_PRESSURE_DATA.value, payload)

    def td_saline(self,
                  target_volume: int,
                  cumulative_volume: float,
                  bolus_volume: float,
                  state: int):
        """Broadcasts the current TD Saline data (Msg ID: 0x3D, 61)

        Args:
            target_volume (int): Saline target volume
            cumulative_volume (float): Saline cumulative volume
            bolus_volume (float): Saline bolus set volume
            state (int): Saline bolus state

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.unsigned_integer_to_bytearray(target_volume)
        payload += conversions.float_to_bytearray(cumulative_volume)
        payload += conversions.float_to_bytearray(bolus_volume)
        payload += conversions.unsigned_integer_to_bytearray(state)

        self._broadcast(
            msg_ids.MsgIds.MSG_ID_TD_SALINE_BOLUS_DATA.value, payload)

    def td_vitals(self, systolic: int, diastolic: int, heartRate: int):
        """Broadcasts the current TD Vitals data (Msg ID: 0xXX, XX)

        Args:
            systolic (int): systolic vital
            diastolic (int): diastolic vital
            heartRate (int): heart rate

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.unsigned_integer_to_bytearray(systolic)
        payload += conversions.unsigned_integer_to_bytearray(diastolic)
        payload += conversions.unsigned_integer_to_bytearray(heartRate)

        self._broadcast(msg_ids.MsgIds.MSG_ID_TD_VITALS_DATA.value, payload)

    def td_ultrafiltration(self,
                           set_volume: float,
                           target_rate: float,
                           volume_delivered: float,
                           state: int):
        """Broadcasts the current TD Ultrafiltration data (Msg ID: 0x3E, 62)

        Args:
            set_volume (float): UF set volume (L)
            target_rate (float): UF target Rate (L)
            volume_delivered (float): UF Volume delivered (L)
            state (int): UF state

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.float_to_bytearray(set_volume)
        payload += conversions.float_to_bytearray(target_rate)
        payload += conversions.float_to_bytearray(volume_delivered)
        payload += conversions.unsigned_integer_to_bytearray(state)

        self._broadcast(
            msg_ids.MsgIds.MSG_ID_TD_ULTRAFILTRATION_DATA.value, payload)

    def td_treatment_time(self,
                          tx_duration_s: int,
                          tx_elapsed_s: int,
                          tx_remaining_s: int):
        """Broadcasts the current TD Treatment Time data (Msg ID: 0x3B)

        NOTE(review): this docstring used to give the ID as "0x3B, 58", but
        0x3B is 59 in decimal - confirm the actual value against
        msg_ids.MsgIds.MSG_ID_TD_TREATMENT_TIME_DATA.

        Args:
            tx_duration_s (int): Treatment set duration (s)
            tx_elapsed_s (int): Treatment elapsed time (s)
            tx_remaining_s (int): Treatment time remaining (s)

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.unsigned_integer_to_bytearray(tx_duration_s)
        payload += conversions.unsigned_integer_to_bytearray(tx_elapsed_s)
        payload += conversions.unsigned_integer_to_bytearray(tx_remaining_s)

        self._broadcast(
            msg_ids.MsgIds.MSG_ID_TD_TREATMENT_TIME_DATA.value, payload)

    def td_treatment_set_points(self,
                                blood_flow: int,
                                dialysate_flow: int,
                                dialysate_temp: float):
        """Broadcasts the current TD Treatment Set Points data (Msg ID: 0x4F, 79)

        Args:
            blood_flow (int): Target Blood Flow
            dialysate_flow (int): Target Dialysate Flow
            dialysate_temp (float): Target Dialysate Temperature

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = conversions.unsigned_integer_to_bytearray(blood_flow)
        payload += conversions.unsigned_integer_to_bytearray(dialysate_flow)
        payload += conversions.float_to_bytearray(dialysate_temp)

        self._broadcast(
            msg_ids.MsgIds.MSG_ID_TD_TREATMENT_SET_POINTS.value, payload)

    def td_versions(self,
                    major: int,
                    minor: int,
                    micro: int,
                    build: int,
                    fpga_id: int,
                    fpga_major: int,
                    fpga_minor: int,
                    fpga_lab: int,
                    compatibility_rev: int):
        """Broadcasts the current TD Version Data (Msg ID: 0x0E, 14)

        Args:
            major (int): Major version number
            minor (int): Minor version number
            micro (int): Micro version number
            build (int): Build version number
            fpga_id (int): FPGA id version number
            fpga_major (int): FPGA Major version number
            fpga_minor (int): FPGA Minor version number
            fpga_lab (int): FPGA Lab version number
            compatibility_rev (int): The FWs/UI compatibility revision

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        # Mixed widths: byte fields, a short for the build number, and an
        # unsigned integer for the compatibility revision.
        payload = conversions.unsigned_byte_to_bytearray(major)
        payload += conversions.unsigned_byte_to_bytearray(minor)
        payload += conversions.unsigned_byte_to_bytearray(micro)
        payload += conversions.unsigned_short_to_bytearray(build)
        payload += conversions.unsigned_byte_to_bytearray(fpga_id)
        payload += conversions.unsigned_byte_to_bytearray(fpga_major)
        payload += conversions.unsigned_byte_to_bytearray(fpga_minor)
        payload += conversions.unsigned_byte_to_bytearray(fpga_lab)
        payload += conversions.unsigned_integer_to_bytearray(compatibility_rev)

        self._broadcast(
            msg_ids.MsgIds.MSG_ID_TD_VERSION_RESPONSE.value, payload)

    # ------------------------------------------------------------------
    # TD -> UI messages
    # ------------------------------------------------------------------
    def td_serial(self, serial: str):
        """Sends the TD serial number response to the UI (Msg ID: 0x51, 81)

        Args:
            serial (str): serial number; encoded as ASCII and terminated
                with a NUL byte.

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = bytes(serial, 'ascii') + b'\x00'

        self._send_to_ui(
            msg_ids.MsgIds.MSG_ID_TD_SERIAL_RESPONSE.value, payload)

    def td_request_ui_versions(self):
        """Sends the TD request for the UI versions (Msg ID: 0x15, 21)

        The message carries no payload.

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        # Built without a payload argument so build_message applies its own
        # default for it.
        message = CAN.DenaliMessage.build_message(
            channel_id=CAN.DenaliChannels.td_to_ui_ch_id,
            message_id=msg_ids.MsgIds.MSG_ID_TD_UI_VERSION_INFO_REQUEST.value)
        self.can_interface.send(message, 0)

    def td_vitals_adjustment_response(self, vRejectionReason: int):
        """Sends the vitals adjustment response to the UI (Msg ID: 0xXX, XX)

        Args:
            vRejectionReason (int): rejection reason. Zero means accepted:
                the accepted field is sent as 1 when this is 0, else as 0.

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        accepted = 1 if vRejectionReason == 0 else 0
        payload = conversions.integer_to_bytearray(accepted)
        payload += conversions.integer_to_bytearray(vRejectionReason)

        self._send_to_ui(
            msg_ids.MsgIds.MSG_ID_TD_VITALS_RESPONSE.value, payload)

    def td_Treatment_Parameters_CreateRx(self, vRejectionReason: int):
        """TD response to initiate Treatment and enter Create Rx (Msg ID: 0x46, 70)

        Args:
            vRejectionReason (int): rejection reason. Zero means accepted:
                the accepted field is sent as 1 when this is 0, else as 0.

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        accepted = 1 if vRejectionReason == 0 else 0
        payload = conversions.integer_to_bytearray(accepted)
        payload += conversions.integer_to_bytearray(vRejectionReason)

        self._send_to_ui(
            msg_ids.MsgIds.MSG_ID_TD_RESP_INITIATE_TREATMENT_WORKFLOW.value,
            payload)

    def td_Treatment_Parameters_Validation(
            self,
            vAccepted: int = 1,
            vBloodFlowRateRejectReason: int = 0,
            vDialysateFlowRateRejectReason: int = 0,
            vTreatmentDurationRejectReason: int = 0,
            vSalineBolusVolumeRejectReason: int = 0,
            vHeparinStopTimeRejectReason: int = 0,
            vHeparinTypeRejectReason: int = 0,
            vAcidConcentrateRejectReason: int = 0,
            vBicarbonateConcentrateRejectReason: int = 0,
            vDialyzerTypeRejectReason: int = 0,
            vBloodPressureMeasureIntervalRejectReason: int = 0,
            vRinsebackFlowRateRejectReason: int = 0,
            vRinsebackVolumeRejectReason: int = 0,
            vArterialPressureLimitWindowRejectReason: int = 0,
            vVenousPressureLimitWindowRejectReason: int = 0,
            vVenousPressureLimitAsymtrcRejectReason: int = 0,
            vTrancembrncPressureLimitWindowRejectReason: int = 0,
            vDialysateTempRejectReason: int = 0,
            vHeparinDispensingRateRejectReason: int = 0,
            vHeparinBolusVolumeRejectReason: int = 0):
        """TD Treatment Parameter Validation Response to the SW validation
        request with sent values of the same parameters. (Msg ID: 0x40, 64)

        Args:
            vAccepted (int, optional): Zero value if rejected. Defaults to 1.
            vBloodFlowRateRejectReason (int, optional): Non-zero value of
                rejection reason. Defaults to 0.
            vDialysateFlowRateRejectReason (int, optional): Non-zero value of
                rejection reason. Defaults to 0.
            vTreatmentDurationRejectReason (int, optional): Non-zero value of
                rejection reason. Defaults to 0.
            vSalineBolusVolumeRejectReason (int, optional): Non-zero value of
                rejection reason. Defaults to 0.
            vHeparinStopTimeRejectReason (int, optional): Non-zero value of
                rejection reason. Defaults to 0.
            vHeparinTypeRejectReason (int, optional): Non-zero value of
                rejection reason. Defaults to 0.
            vAcidConcentrateRejectReason (int, optional): Non-zero value of
                rejection reason. Defaults to 0.
            vBicarbonateConcentrateRejectReason (int, optional): Non-zero
                value of rejection reason. Defaults to 0.
            vDialyzerTypeRejectReason (int, optional): Non-zero value of
                rejection reason. Defaults to 0.
            vBloodPressureMeasureIntervalRejectReason (int, optional):
                Non-zero value of rejection reason. Defaults to 0.
            vRinsebackFlowRateRejectReason (int, optional): Non-zero value of
                rejection reason. Defaults to 0.
            vRinsebackVolumeRejectReason (int, optional): Non-zero value of
                rejection reason. Defaults to 0.
            vArterialPressureLimitWindowRejectReason (int, optional):
                Non-zero value of rejection reason. Defaults to 0.
            vVenousPressureLimitWindowRejectReason (int, optional): Non-zero
                value of rejection reason. Defaults to 0.
            vVenousPressureLimitAsymtrcRejectReason (int, optional): Non-zero
                value of rejection reason. Defaults to 0.
            vTrancembrncPressureLimitWindowRejectReason (int, optional):
                Non-zero value of rejection reason. Defaults to 0.
            vDialysateTempRejectReason (int, optional): Non-zero value of
                rejection reason. Defaults to 0.
            vHeparinDispensingRateRejectReason (int, optional): Non-zero
                value of rejection reason. Defaults to 0.
            vHeparinBolusVolumeRejectReason (int, optional): Non-zero value
                of rejection reason. Defaults to 0.

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        # The accepted flag goes first, followed by every reject reason in
        # signature order; all are packed as signed integers.
        payload = conversions.integer_to_bytearray(vAccepted)
        for reject_reason in (
                vBloodFlowRateRejectReason,
                vDialysateFlowRateRejectReason,
                vTreatmentDurationRejectReason,
                vSalineBolusVolumeRejectReason,
                vHeparinStopTimeRejectReason,
                vHeparinTypeRejectReason,
                vAcidConcentrateRejectReason,
                vBicarbonateConcentrateRejectReason,
                vDialyzerTypeRejectReason,
                vBloodPressureMeasureIntervalRejectReason,
                vRinsebackFlowRateRejectReason,
                vRinsebackVolumeRejectReason,
                vArterialPressureLimitWindowRejectReason,
                vVenousPressureLimitWindowRejectReason,
                vVenousPressureLimitAsymtrcRejectReason,
                vTrancembrncPressureLimitWindowRejectReason,
                vDialysateTempRejectReason,
                vHeparinDispensingRateRejectReason,
                vHeparinBolusVolumeRejectReason):
            payload += conversions.integer_to_bytearray(reject_reason)

        self._send_to_ui(
            msg_ids.MsgIds.MSG_ID_TD_RESP_TREATMENT_PARAMS_TO_VALIDATE.value,
            payload)

    def td_tx_state(self,
                    sub_mode: int,
                    blood_prime_state: int,
                    dialysis_state: int,
                    isolated_uf_state: int,
                    tx_stop_state: int,
                    tx_rinseback_state: int,
                    tx_recirculate_state: int,
                    tx_end_state: int,
                    tx_saline_state: int,
                    tx_heparin_state: int) -> None:
        """Broadcasts the current TD Treatment State data (Msg ID: 0x3C, 60)

        Args:
            sub_mode (int): Treatment Submode
            blood_prime_state (int): Treatment Blood Prime State
            dialysis_state (int): Treatment dialysis state
            isolated_uf_state (int): Treatment Isolated UF state
            tx_stop_state (int): Treatment Stop state
            tx_rinseback_state (int): Treatment Rinseback State
            tx_recirculate_state (int): Treatment Recirculate State
            tx_end_state (int): Treatment End state
            tx_saline_state (int): Saline Bolus state
            tx_heparin_state (int): Heparin state

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        # Every field is packed as an unsigned integer, in signature order.
        payload = conversions.unsigned_integer_to_bytearray(sub_mode)
        for state in (blood_prime_state, dialysis_state, isolated_uf_state,
                      tx_stop_state, tx_rinseback_state, tx_recirculate_state,
                      tx_end_state, tx_saline_state, tx_heparin_state):
            payload += conversions.unsigned_integer_to_bytearray(state)

        self._broadcast(
            msg_ids.MsgIds.MSG_ID_TD_TREATMENT_STATE_DATA.value, payload)

    def cmd_send_general_response(self,
                                  message_id: int,
                                  accepted: int,
                                  reason: int,
                                  is_pure_data: bool = False,
                                  has_parameters: bool = False,
                                  parameters_payload: Any = 0x00,
                                  channel_id=CAN.DenaliChannels.td_to_ui_ch_id
                                  ) -> None:
        """A general method to send any standard response message, by its id
        and list of parameters.

        Args:
            message_id (int): the id of the message
            accepted (int): the standard accepted parameter of any response
                message
            reason (int): the standard rejection reason parameter of any
                response message
            is_pure_data (bool, optional): The message only has data; the
                accepted/reason pair is not packed. Defaults to False.
            has_parameters (bool, optional): if the message has parameters
                this needs to be true. Defaults to False.
            parameters_payload (Any, optional): the list of parameters
                pre-converted and ready to be used as the payload.
                Defaults to 0x00.
            channel_id (optional): indicates the channel. Defaults to
                CAN.DenaliChannels.td_to_ui_ch_id.

        Raises:
            ValueError: if the CAN interface is not enabled.
        """
        self._require_can_enabled()

        payload = ""

        if not is_pure_data:
            payload = conversions.integer_to_bytearray(accepted)
            payload += conversions.integer_to_bytearray(reason)

        # NOTE(review): this REPLACES the accepted/reason bytes packed above
        # instead of appending to them, so with has_parameters=True only
        # parameters_payload is sent. Behavior kept as-is; confirm whether
        # "+=" was intended.
        if has_parameters:
            payload = parameters_payload

        message = CAN.DenaliMessage.build_message(
            channel_id, message_id, payload)
        self.can_interface.send(message, 0)