###########################################################################
#
#    Copyright (c) 2021-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   syringe_pump.py
#
#    @author (last)      Micahel Garthwaite
#    @date   (last)      17-Aug-2023
#    @author (original)  Sean Nash
#    @date   (original)  12-Mar-2021
#
############################################################################
import struct
from logging import Logger

from .constants import RESET, NO_RESET
from ..common.hd_defs import HeparinStates, SyringePumpStates, SyringePumpOperations
from ..common.msg_defs import MsgIds, MsgFieldPositions
from ..protocols.CAN import DenaliMessage, DenaliChannels
from ..utils.base import AbstractSubSystem, publish
from ..utils.checks import check_broadcast_interval_override_ms
from ..utils.conversions import integer_to_bytearray, float_to_bytearray


class HDSyringePump(AbstractSubSystem):
    """
    HDSyringePump

    Hemodialysis Delivery (HD) Dialin API sub-class for syringe pump related commands.
    """

    def __init__(self, can_interface, logger: Logger):
        """
        Stores the CAN interface and logger, subscribes to the HD syringe pump data
        broadcast (when a CAN interface is given) and initializes the cached data.

        @param can_interface: Denali Can Messenger object
        @param logger: logger used for debug output of the commands
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        if self.can_interface is not None:
            channel_id = DenaliChannels.hd_sync_broadcast_ch_id
            msg_id = MsgIds.MSG_ID_HD_SYRINGE_PUMP_DATA.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_syringe_pump_data)

        # Latest values taken from the syringe pump data broadcast.
        self.syringe_pump_state = SyringePumpStates.SYRINGE_PUMP_INIT_STATE.value
        self.heparin_state = HeparinStates.HEPARIN_STATE_OFF.value
        self.syringe_pump_set_rate_ml_hr = 0.0
        self.syringe_pump_meas_rate_ml_hr = 0.0
        self.syringe_pump_position = 0
        self.syringe_pump_volume_ml = 0.0
        self.syringe_pump_safety_volume_ml = 0.0
        self.syringe_pump_home_v = 0.0
        self.syringe_pump_switch_v = 0.0
        self.syringe_pump_force_v = 0.0
        self.syringe_pump_status = 0
        self.syringe_pump_encoder_status = 0
        self.syringe_pump_adc_dac_status = 0
        self.syringe_pump_adc_read_counter = 0
        self.hd_syringe_pump_timestamp = 0.0

    def get_syringe_pump_state(self):
        """
        Gets the current syringe pump state.

        @return: latest published syringe pump state.
            SYRINGE_PUMP_INIT_STATE = 0
            SYRINGE_PUMP_OFF_STATE = 1
            SYRINGE_PUMP_RETRACT_STATE = 2
            SYRINGE_PUMP_SEEK_STATE = 3
            SYRINGE_PUMP_PRIME_STATE = 4
            SYRINGE_PUMP_HEP_BOLUS_STATE = 5
            SYRINGE_PUMP_HEP_CONTINUOUS_STATE = 6
            SYRINGE_PUMP_CONFIG_FORCE_SENSOR_STATE = 7
        """
        return self.syringe_pump_state

    def get_heparin_state(self):
        """
        Gets the current Heparin state.

        @return: latest published Heparin state.
            HEPARIN_STATE_OFF = 0
            HEPARIN_STATE_STOPPED = 1
            HEPARIN_STATE_PAUSED = 2
            HEPARIN_STATE_INITIAL_BOLUS = 3
            HEPARIN_STATE_DISPENSING = 4
            HEPARIN_STATE_COMPLETED = 5
            HEPARIN_STATE_EMPTY = 6
        """
        return self.heparin_state

    def get_syringe_pump_set_rate(self):
        """
        Gets the current set syringe pump rate.

        @return: latest published syringe pump set rate (in mL/hr).
        """
        return self.syringe_pump_set_rate_ml_hr

    def get_syringe_pump_meas_rate(self):
        """
        Gets the current measured syringe pump rate.

        @return: latest published syringe pump measured rate (in mL/hr).
        """
        return self.syringe_pump_meas_rate_ml_hr

    def get_syringe_pump_position(self):
        """
        Gets the current syringe pump position.

        @return: latest published syringe pump position (in encoder counts).
        """
        return self.syringe_pump_position

    def get_syringe_pump_volume_delivered_ml(self):
        """
        Gets the current syringe pump volume delivered.

        @return: latest published syringe pump volume delivered (in mL).
        """
        return self.syringe_pump_volume_ml

    def get_syringe_pump_home_v(self):
        """
        Gets the current syringe pump home voltage reading

        @return: latest published voltage read from the home optical sensor
        """
        return self.syringe_pump_home_v

    def get_syringe_pump_switch_v(self):
        """
        Gets the current syringe pump switch voltage reading

        @return: latest published voltage read from the syringe detection switch
        """
        return self.syringe_pump_switch_v

    def get_syringe_pump_force_v(self):
        """
        Gets the current syringe pump force voltage reading

        @return: latest published voltage read from the force sensor
        """
        return self.syringe_pump_force_v

    def get_syringe_pump_safety_volume(self):
        """
        Gets the current syringe pump safety volume reading

        @return: latest published safety volume calculated by HD firmware
        """
        return self.syringe_pump_safety_volume_ml

    def get_syringe_pump_status(self):
        """
        Gets the current syringe pump status

        @return: latest published syringe pump status by HD firmware
        """
        return self.syringe_pump_status

    def get_syringe_pump_encoder_status(self):
        """
        Gets the current syringe pump encoder status

        @return: latest published syringe pump encoder status by HD firmware
        """
        return self.syringe_pump_encoder_status

    def get_syringe_pump_adc_dac_status(self):
        """
        Gets the current syringe pump ADC & DAC status

        @return: latest published syringe pump ADC & DAC status by HD firmware
        """
        return self.syringe_pump_adc_dac_status

    def get_syringe_pump_adc_read_counter(self):
        """
        Gets the current syringe pump ADC read counter

        @return: latest published ADC read counter by HD firmware
        """
        return self.syringe_pump_adc_read_counter

    @publish(["hd_syringe_pump_timestamp",
              "syringe_pump_state",
              "syringe_pump_set_rate_ml_hr",
              "syringe_pump_meas_rate_ml_hr",
              "syringe_pump_position",
              "syringe_pump_volume_ml",
              "syringe_pump_home_v",
              "syringe_pump_switch_v",
              "syringe_pump_force_v",
              "heparin_state",
              "syringe_pump_safety_volume_ml",
              "syringe_pump_status",
              "syringe_pump_encoder_status",
              "syringe_pump_adc_dac_status",
              "syringe_pump_adc_read_counter"])
    def _handler_syringe_pump_data(self, message, timestamp=0.0):
        """
        Handles published syringe pump data messages. Syringe pump data are captured
        for reference.

        @param message: published syringe pump data message
        @param timestamp: time stamp stored together with the captured data
        @return: None
        """
        data = message['message']
        fields = MsgFieldPositions

        def unpack(fmt, start, end):
            # Decode one payload field; struct.unpack always returns a tuple.
            return struct.unpack(fmt, bytearray(data[start:end]))[0]

        # Decode every field before touching the cached values, so a malformed
        # message leaves the previously captured data untouched.
        state = unpack('i', fields.START_POS_FIELD_1, fields.END_POS_FIELD_1)
        heparin = unpack('i', fields.START_POS_FIELD_2, fields.END_POS_FIELD_2)
        set_rate = unpack('f', fields.START_POS_FIELD_3, fields.END_POS_FIELD_3)
        meas_rate = unpack('f', fields.START_POS_FIELD_4, fields.END_POS_FIELD_4)
        position = unpack('i', fields.START_POS_FIELD_5, fields.END_POS_FIELD_5)
        volume = unpack('f', fields.START_POS_FIELD_6, fields.END_POS_FIELD_6)
        home_v = unpack('f', fields.START_POS_FIELD_7, fields.END_POS_FIELD_7)
        switch_v = unpack('f', fields.START_POS_FIELD_8, fields.END_POS_FIELD_8)
        force_v = unpack('f', fields.START_POS_FIELD_9, fields.END_POS_FIELD_9)
        safety_volume = unpack('f', fields.START_POS_FIELD_10, fields.END_POS_FIELD_10)
        status_word = unpack('i', fields.START_POS_FIELD_11, fields.END_POS_FIELD_11)

        self.syringe_pump_state = state
        self.heparin_state = heparin
        self.syringe_pump_set_rate_ml_hr = set_rate
        self.syringe_pump_meas_rate_ml_hr = meas_rate
        self.syringe_pump_position = position
        self.syringe_pump_volume_ml = volume
        self.syringe_pump_home_v = home_v
        self.syringe_pump_switch_v = switch_v
        self.syringe_pump_force_v = force_v
        self.syringe_pump_safety_volume_ml = safety_volume

        # The status word packs four 8-bit values, most significant byte first:
        # pump status | encoder status | ADC & DAC status | ADC read counter.
        # Masking before shifting yields 0..255 even when the word decodes negative.
        self.syringe_pump_status = (status_word & 0xFF000000) >> 24
        self.syringe_pump_encoder_status = (status_word & 0x00FF0000) >> 16
        self.syringe_pump_adc_dac_status = (status_word & 0x0000FF00) >> 8
        self.syringe_pump_adc_read_counter = (status_word & 0x000000FF)
        self.hd_syringe_pump_timestamp = timestamp

    @staticmethod
    def _build_hd_request(message_id, payload=None):
        """
        Builds a Dialin-to-HD message for the given message ID.

        @param message_id: integer - ID of the message to build
        @param payload: payload bytes, or None to build the message without a payload
        @return: the message built by DenaliMessage.build_message
        """
        if payload is None:
            return DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                               message_id=message_id)
        return DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                           message_id=message_id,
                                           payload=payload)

    def _extract_response(self, received_message, timeout_text: str = "Timeout!!!!"):
        """
        Turns the result of a send into the value returned by the command methods.

        @param received_message: response returned by can_interface.send (None when no response)
        @param timeout_text: text logged when no response was received
        @return: first byte of the response payload, or False when there was no response
        """
        if received_message is None:
            self.logger.debug(timeout_text)
            return False

        # response payload is OK or not OK
        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def _send_override_request(self, message_id, reset: int, value_bytes,
                               request_text: str, result_text: str, value_text: str):
        """
        Sends an override command whose payload is the reset flag followed by one value,
        and logs the outcome.

        @param message_id: integer - ID of the override message
        @param reset: integer - 1 to reset a previous override, 0 to override
        @param value_bytes: already converted bytes of the override value
        @param request_text: text logged before the message is sent
        @param result_text: leading text of the result log line
        @param value_text: description of the value used in the result log line when not resetting
        @return: first byte of the response payload, or False when there was no response
        """
        payload = integer_to_bytearray(reset) + value_bytes
        message = self._build_hd_request(message_id, payload)

        self.logger.debug(request_text)

        # Send message
        received_message = self.can_interface.send(message)

        if received_message is None:
            self.logger.debug("Timeout!!!!")
            return False

        response = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        outcome = "reset back to normal: " if reset == RESET else value_text
        self.logger.debug(result_text + outcome + str(response))
        # response payload is OK or not OK
        return response

    def cmd_syringe_pump_operation(self, operation: int = SyringePumpOperations.SYRINGE_PUMP_OP_STOP.value,
                                   rate: float = 0.0,
                                   volume: float = 0.0) -> int:
        """
        Constructs and sends the syringe pump operation command
        Constraints:
        Must be logged into HD.
        Syringe pump must be in appropriate state for the given operation.
        Given rate/volume (when applicable) must be within valid range for the given operation.
        The following Treatment parameters should be set before executing operations 3,4 & 5:
            - Treatment Duration
            - Heparin Pre-Stop Time
            - Rate if volume is being sent
            - Volume if rate is being sent
        Failure to set these treatment parameters may result in faulting the HD device.

        @param operation: unsigned int - ID of operation being requested
        @param rate: float - target rate for given operation (if applicable)
        @param volume: float - target volume for given operation (if applicable)
        @return: 1 if successful, zero otherwise (False if no response was received)

        Syringe pump operation IDs:
            SYRINGE_PUMP_OP_STOP = 0
            SYRINGE_PUMP_OP_RETRACT = 1
            SYRINGE_PUMP_OP_SEEK = 2
            SYRINGE_PUMP_OP_PRIME = 3
            SYRINGE_PUMP_OP_BOLUS = 4
            SYRINGE_PUMP_OP_CONTINUOUS = 5
        """
        payload = (integer_to_bytearray(operation)
                   + float_to_bytearray(rate)
                   + float_to_bytearray(volume))
        message = self._build_hd_request(MsgIds.MSG_ID_HD_SYRINGE_PUMP_OPERATION_REQUEST.value, payload)

        self.logger.debug("requesting syringe pump operation " + str(operation) +
                          ", rate=" + str(rate) +
                          ", volume=" + str(volume))

        # Send message
        return self._extract_response(self.can_interface.send(message))

    def cmd_syringe_pump_data_broadcast_interval_override(self, ms: int = 1000, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the syringe pump data broadcast interval override command
        Constraints:
        Must be logged into HD.
        Given interval must be non-zero and a multiple of the HD general task interval (50 ms).

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if the interval is rejected or no response was received)
        """
        # Reject an invalid interval before anything is sent to the HD.
        if not check_broadcast_interval_override_ms(ms):
            return False

        return self._send_override_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_SEND_INTERVAL_OVERRIDE.value,
            reset,
            integer_to_bytearray(ms),
            "override HD syringe pump data broadcast interval",
            "Syringe pump data broadcast interval overridden to ",
            str(ms) + " ms: ")

    def cmd_syringe_pump_meas_position_override(self, position: int = 0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the syringe pump measured position override command
        Constraints:
        Must be logged into HD.

        @param position: integer - position (in encoder counts) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        return self._send_override_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_MEASURED_POSITION_OVERRIDE.value,
            reset,
            integer_to_bytearray(position),
            "override HD syringe pump measured position",
            "Syringe pump measured position overridden to ",
            str(position) + " encoder counts: ")

    def cmd_syringe_pump_meas_rate_override(self, rate: float = 0.0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the syringe pump measured rate override command
        Constraints:
        Must be logged into HD.

        @param rate: float - rate (in mL/hr) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        return self._send_override_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_MEASURED_RATE_OVERRIDE.value,
            reset,
            float_to_bytearray(rate),
            "override HD syringe pump measured rate",
            "Syringe pump measured rate overridden to ",
            str(rate) + " mL/hr: ")

    def cmd_syringe_pump_meas_force_override(self, volts: float = 0.0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the syringe pump measured force override command
        Constraints:
        Must be logged into HD.

        @param volts: float - volts to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        return self._send_override_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_MEASURED_FORCE_OVERRIDE.value,
            reset,
            float_to_bytearray(volts),
            "override HD syringe pump measured force",
            "Syringe pump measured force overridden to ",
            str(volts) + " volts: ")

    def cmd_syringe_pump_meas_detect_override(self, volts: float = 0.0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the syringe pump measured syringe detect override command
        Constraints:
        Must be logged into HD.

        @param volts: float - volts to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        return self._send_override_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_SYRINGE_DETECT_OVERRIDE.value,
            reset,
            float_to_bytearray(volts),
            "override HD syringe pump measured syringe detection",
            "Syringe pump measured syringe detection overridden to ",
            str(volts) + " volts: ")

    def cmd_syringe_pump_meas_home_override(self, volts: float = 0.0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the syringe pump measured home override command
        Constraints:
        Must be logged into HD.

        @param volts: float - volts to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        return self._send_override_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_MEASURED_HOME_OVERRIDE.value,
            reset,
            float_to_bytearray(volts),
            "override HD syringe pump measured home",
            "Syringe pump measured home overridden to ",
            str(volts) + " volts: ")

    def cmd_syringe_pump_meas_volume_override(self, volume: float = 0.0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the syringe pump measured volume override command
        Constraints:
        Must be logged into HD.

        @param volume: float - volume (in mL) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        return self._send_override_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_MEASURED_VOLUME_OVERRIDE.value,
            reset,
            float_to_bytearray(volume),
            "override HD syringe pump measured volume delivered",
            "Syringe pump measured volume overridden to ",
            str(volume) + " mL: ")

    def cmd_syringe_pump_status_override(self, status: int = 0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the syringe pump status override command
        Constraints:
        Must be logged into HD.

        @param status: integer - status (0..255); only the low 8 bits are sent
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        return self._send_override_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_STATUS_OVERRIDE.value,
            reset,
            integer_to_bytearray(status & 0x000000FF),
            "override HD syringe pump status",
            "Syringe pump status overridden to ",
            # ": " keeps the overridden value apart from the response code in the log
            str(status) + ": ")

    def cmd_syringe_pump_encoder_status_override(self, status: int = 0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the syringe pump encoder status override command
        Constraints:
        Must be logged into HD.

        @param status: integer - encoder status (0..255); only the low 8 bits are sent
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        return self._send_override_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_ENCODER_STATUS_OVERRIDE.value,
            reset,
            integer_to_bytearray(status & 0x000000FF),
            "override HD syringe pump encoder status",
            "Syringe pump encoder status overridden to ",
            # ": " keeps the overridden value apart from the response code in the log
            str(status) + ": ")

    def cmd_syringe_pump_adc_dac_status_override(self, status: int = 0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the syringe pump ADC & DAC status override command
        Constraints:
        Must be logged into HD.

        @param status: integer - ADC and DAC status (0..255); only the low 8 bits are sent
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        return self._send_override_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_ADC_DAC_STATUS_OVERRIDE.value,
            reset,
            integer_to_bytearray(status & 0x000000FF),
            "override HD syringe pump ADC & DAC status",
            "Syringe pump ADC & DAC status overridden to ",
            # ": " keeps the overridden value apart from the response code in the log
            str(status) + ": ")

    def cmd_syringe_pump_adc_read_counter_override(self, counter: int = 0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the syringe pump ADC read counter override command
        Constraints:
        Must be logged into HD.

        @param counter: integer - ADC read counter (0..255); only the low 8 bits are sent
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        return self._send_override_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_ADC_READ_COUNTER_OVERRIDE.value,
            reset,
            integer_to_bytearray(counter & 0x000000FF),
            "override HD syringe pump ADC read counter",
            "Syringe pump ADC read counter overridden to ",
            # ": " keeps the overridden value apart from the response code in the log
            str(counter) + ": ")

    def cmd_set_syringe_pump_dac_ref_voltage(self) -> int:
        """
        Constructs and sends the set syringe pump DAC vRef.
        The value the DAC is set to is within HD Calibration Record.
        Constraints:
        Must be logged into HD.

        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        # This command carries no payload.
        message = self._build_hd_request(MsgIds.MSG_ID_HD_SYRINGE_PUMP_FORCE_SENSOR_DAC_CALIBRATE.value)

        self.logger.debug("Set HD syringe pump DAC reference voltage")

        # Send message
        received_message = self.can_interface.send(message)

        if received_message is not None:
            self.logger.debug("Syringe pump DAC reference voltage set.")

        return self._extract_response(received_message)

    def cmd_heprin_target_rate_override(self, rate: float, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the heparin bolus target rate value override command
        Constraints:
        Must be logged into HD.

        @param rate: (float) the heparin bolus target rate to be set in mL/hour
        @param reset: (int) 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if no response was received)
        """
        # HD expects the rate in mL/hour
        payload = integer_to_bytearray(reset) + float_to_bytearray(rate)
        message = self._build_hd_request(
            MsgIds.MSG_ID_HD_SYRINGE_PUMP_HEPARIN_BOLUS_TARGET_RATE_OVERRIDE.value, payload)

        self.logger.debug("Overriding heprin bolus target rate value override")

        # Send message
        return self._extract_response(self.can_interface.send(message),
                                      "Heprin bolus target value override Timeout!!!")