###########################################################################
#
# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file HemodialysisDevice.py
#
# @date 16-Oct-2019
# @author S. Nash
#
# @brief This class provides the basic interface to communicate with
# the HD board.
#
############################################################################
from DialityCoreCanProtocol import DenaliCanMessenger
from DialityCoreCanProtocol import DenaliMessage
# NOTE(review): DenaliChannels is used by every command in this file but was
# never imported. It is presumed to live next to DenaliMessage - confirm.
from DialityCoreCanProtocol import DenaliChannels
from time import sleep
from binascii import unhexlify
import struct


class HD:
    r"""
    \class HD

    \brief Hemodialysis Device (HD) Dialin API. Owns the CAN messenger and
    groups the test/service commands into sub-objects (Alarms, Buttons,
    BloodFlow, Watchdog).
    """

    # for reset param in override commands
    NO_RESET = 0
    RESET = 1

    def __init__(self, can__interface="can0"):
        r"""
        HD constructor using can bus

        \param can__interface: can bus, e.g. "can0"

        \returns HD object provides test/service commands for the HD sub-system.

        \details For example:
        hd_object = HD(can__interface='can0') or
        hd_object = HD('can0')
        """
        # Create listener
        self.can_interface = DenaliCanMessenger(can_interface=can__interface)
        self.can_interface.start()

        # Create command groups
        self._Basics = HD.HD__Basics(self)
        # The alarms group needs the messenger to register its published
        # status handler; without it the alarm status attributes never update.
        self.Alarms = HD.HD_Alarms(self, self.can_interface)
        self.Buttons = HD.HD_Buttons(self)
        self.BloodFlow = HD.HD_BloodFlow(self, self.can_interface)
        self.Watchdog = HD.HD_Watchdog(self)

    def _sendCommand(self, message_id, payload, announcement, result_label):
        r"""
        Builds a Dialin-to-HD message, sends it and reports the response.
        Shared by every Cmd* method of the command groups.

        \param message_id: integer - ID of the message to send
        \param payload: payload handed to DenaliMessage.buildMessage
        \param announcement: text printed just before the message is sent
        \param result_label: text printed in front of the response value

        \returns first payload byte of the response, or False on timeout
        """
        message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                             message_id=message_id,
                                             payload=payload)
        print(announcement)

        # Send message
        received_message = self.can_interface.send(message)

        # No response at all is reported as a timeout
        if received_message is None:
            print("Timeout!!!!")
            return False

        print(received_message)
        # response payload is OK or not OK
        result = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        print(result_label + str(result))
        return result

    class HD__Basics:
        r"""
        \class HD__Basics

        \brief Hemodialysis Device (HD) Dialin API sub-class for basic HD commands.
        """

        # Basic message IDs
        MSG_ID_LOGIN_TO_HD = 0x8000
        MSG_ID_HD_MSG = 0x8001

        # HD login password
        HD_LOGIN_PASSWORD = '123'

        def __init__(self, outer_instance):
            r"""
            HD__Basics constructor

            \param outer_instance: reference to the HD (outer) class.

            \returns HD__Basics object.
            """
            self.outer_instance = outer_instance

        def CmdLogInToHD(self):
            r"""
            Constructs and sends a login command via CAN bus. Login required before \n
            other commands can be sent to the HD.

            \returns 1 if logged in, 0 if log in failed
            """
            # The password is sent as one integer character code per character
            payload = [ord(character) for character in self.HD_LOGIN_PASSWORD]
            return self.outer_instance._sendCommand(self.MSG_ID_LOGIN_TO_HD, payload,
                                                    "login", "Logged In: ")

        def CmdHDMessageInsert(self, msg):
            r"""
            Constructs and sends the insert HD message command. Given message will \n
            be inserted into the HD's received message queue for processing.

            \param msg: byte array - properly formatted HD message to insert

            \returns 1 if successful, zero otherwise
            """
            return self.outer_instance._sendCommand(self.MSG_ID_HD_MSG, msg,
                                                    "insert HD message", "Inserted message: ")

    class HD_Buttons:
        r"""
        \class HD_Buttons

        \brief Hemodialysis Device (HD) Dialin API sub-class for button related commands.
        """

        # Buttons message IDs
        MSG_ID_HD_OFF_BUTTON_OVERRIDE = 0x8002
        MSG_ID_HD_STOP_BUTTON_OVERRIDE = 0x8003

        def __init__(self, outer_instance):
            r"""
            HD_Buttons constructor

            \param outer_instance: reference to the HD (outer) class.

            \returns HD_Buttons object.
            """
            self.outer_instance = outer_instance

        def _sendButtonOverride(self, message_id, reset, state, button):
            r"""
            Sends a button override command (shared by the Off and Stop buttons).

            \param message_id: integer - ID of the button override message
            \param reset: integer - 1 to reset a previous override, 0 to override
            \param state: integer - 1 for "pressed", 0 for "released"
            \param button: text - button name as used in the printed messages ("off"/"stop")

            \returns 1 if successful, zero otherwise
            """
            hd = self.outer_instance
            payload = hd.integer2ByteArray(reset) + hd.integer2ByteArray(state)
            if reset == HD.RESET:
                str_res = "reset back to normal"
            else:
                str_res = "pressed" if state != 0 else "released"
            return hd._sendCommand(message_id, payload,
                                   "override " + button + " button",
                                   button.capitalize() + " button overridden to " + str_res + ":")

        def CmdOffButtonOverride(self, reset, state):
            r"""
            Constructs and sends the Off button override command

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param state: integer - 1 to override off button to "pressed", 0 to "released"

            \returns 1 if successful, zero otherwise
            """
            return self._sendButtonOverride(self.MSG_ID_HD_OFF_BUTTON_OVERRIDE, reset, state, "off")

        def CmdStopButtonOverride(self, reset, state):
            r"""
            Constructs and sends the Stop button override command

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param state: integer - 1 to override stop button to "pressed", 0 to "released"

            \returns 1 if successful, zero otherwise
            """
            return self._sendButtonOverride(self.MSG_ID_HD_STOP_BUTTON_OVERRIDE, reset, state, "stop")

    class HD_Watchdog:
        r"""
        \class HD_Watchdog

        \brief Hemodialysis Device (HD) Dialin API sub-class for watchdog related commands.
        """

        # Watchdog message IDs
        MSG_ID_HD_WD_CHECKIN_OVERRIDE = 0x8005

        def __init__(self, outer_instance):
            r"""
            HD_Watchdog constructor

            \param outer_instance: reference to the HD (outer) class.

            \returns HD_Watchdog object.
            """
            self.outer_instance = outer_instance

        def CmdWatchdogTaskCheckinOverride(self, reset, state, task):
            r"""
            Constructs and sends the watchdog task check-in override command

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param state: integer - 1 for task checked in, 0 for task not checked in
            \param task: integer - ID of task to override

            \returns 1 if successful, zero otherwise
            """
            hd = self.outer_instance
            payload = (hd.integer2ByteArray(reset)
                       + hd.integer2ByteArray(state)
                       + hd.integer2ByteArray(task))
            if reset == HD.RESET:
                str_res = "reset back to normal"
            else:
                str_res = "checked in" if state != 0 else "not checked in"
            return hd._sendCommand(self.MSG_ID_HD_WD_CHECKIN_OVERRIDE, payload,
                                   "override watchdog task check-in state",
                                   "Watchdog task check-in overridden to " + str_res + ":")

    class HD_Alarms:
        r"""
        \class HD_Alarms

        \brief Hemodialysis Device (HD) Dialin API sub-class for alarm related commands.
        """

        # Alarms message IDs
        MSG_ID_HD_ALARMS_PUBLISHED_STATUS = 0x0002
        MSG_ID_HD_ALARM_LAMP_OVERRIDE = 0x8004
        MSG_ID_HD_ALARM_STATE_OVERRIDE = 0x8006
        MSG_ID_HD_ALARM_TIME_OVERRIDE = 0x8007

        # Alarm lamp patterns
        HD_ALARM_LAMP_PATTERN_OFF = 0
        HD_ALARM_LAMP_PATTERN_OK = 1
        HD_ALARM_LAMP_PATTERN_FAULT = 2
        HD_ALARM_LAMP_PATTERN_HIGH = 3
        HD_ALARM_LAMP_PATTERN_MEDIUM = 4
        HD_ALARM_LAMP_PATTERN_LOW = 5
        HD_ALARM_LAMP_PATTERN_MANUAL = 6

        # Printable names of the lamp patterns; any pattern not listed here is
        # reported as "manual".
        _LAMP_PATTERN_NAMES = {
            HD_ALARM_LAMP_PATTERN_OFF: "off",
            HD_ALARM_LAMP_PATTERN_OK: "ok",
            HD_ALARM_LAMP_PATTERN_FAULT: "fault",
            HD_ALARM_LAMP_PATTERN_HIGH: "high",
            HD_ALARM_LAMP_PATTERN_MEDIUM: "medium",
            HD_ALARM_LAMP_PATTERN_LOW: "low",
        }

        # Alarm status message field positions
        START_POS_ALARM_STATE = DenaliMessage.PAYLOAD_START_INDEX
        END_POS_ALARM_STATE = START_POS_ALARM_STATE + 4
        START_POS_ALARM_TOP = END_POS_ALARM_STATE
        END_POS_ALARM_TOP = START_POS_ALARM_TOP + 4
        START_POS_ALARM_SILENCE_EXPIRES_IN = END_POS_ALARM_TOP
        END_POS_ALARM_SILENCE_EXPIRES_IN = START_POS_ALARM_SILENCE_EXPIRES_IN + 4
        START_POS_ALARMS_SILENCED = END_POS_ALARM_SILENCE_EXPIRES_IN
        END_POS_ALARMS_SILENCED = START_POS_ALARMS_SILENCED + 1
        START_POS_ALARMS_FLAGS = END_POS_ALARMS_SILENCED
        END_POS_ALARMS_FLAGS = START_POS_ALARMS_FLAGS + 1

        def __init__(self, outer_instance, can_interface=None):
            r"""
            HD_Alarms constructor

            \param outer_instance: reference to the HD (outer) class.
            \param can_interface: CAN messenger on which to register the
            published alarms status handler; None to skip registration.

            \returns HD_Alarms object.
            """
            self.outer_instance = outer_instance

            # Initialise the captured status before registering the handler so
            # that a publication can never be overwritten by these defaults.
            self.alarmsState = 0
            self.alarmTop = 0
            self.alarmsSilenceExpiresIn = 0
            self.alarmsSilenced = False
            self.alarmsFlags = bytearray(1)  # one zero byte, same type the handler stores

            if can_interface is not None:
                channel_id = DenaliChannels.hd_sync_broadcast_ch_id
                msg_id = self.MSG_ID_HD_ALARMS_PUBLISHED_STATUS
                can_interface.registerReceivingPublicationFunction(channel_id, msg_id,
                                                                   self.handlerAlarmsStatusSyncFunction)

        def handlerAlarmsStatusSyncFunction(self, message):
            r"""
            Handles published alarms status messages. Alarms status data are captured
            for reference.

            \param message: published alarms status message

            \returns none
            """
            data = message['message']
            byte_order = DenaliMessage.BYTE_ORDER

            self.alarmsState = int.from_bytes(bytearray(
                data[self.START_POS_ALARM_STATE:self.END_POS_ALARM_STATE]), byteorder=byte_order)
            self.alarmTop = int.from_bytes(bytearray(
                data[self.START_POS_ALARM_TOP:self.END_POS_ALARM_TOP]), byteorder=byte_order)
            self.alarmsSilenceExpiresIn = int.from_bytes(bytearray(
                data[self.START_POS_ALARM_SILENCE_EXPIRES_IN:self.END_POS_ALARM_SILENCE_EXPIRES_IN]),
                byteorder=byte_order)
            # Decode the flag byte to an integer first: a bytearray never
            # compares equal to the integer 1.
            self.alarmsSilenced = int.from_bytes(bytearray(
                data[self.START_POS_ALARMS_SILENCED:self.END_POS_ALARMS_SILENCED]),
                byteorder=byte_order) == 1
            self.alarmsFlags = bytearray(
                data[self.START_POS_ALARMS_FLAGS:self.END_POS_ALARMS_FLAGS])

        def CmdAlarmStateOverride(self, reset, state, alarm):
            r"""
            Constructs and sends the alarm state override command

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param state: integer - 1 for alarm active, 0 for alarm inactive
            \param alarm: integer - ID of alarm to override

            \returns 1 if successful, zero otherwise
            """
            hd = self.outer_instance
            payload = (hd.integer2ByteArray(reset)
                       + hd.integer2ByteArray(state)
                       + hd.integer2ByteArray(alarm))
            if reset == HD.RESET:
                str_res = "reset back to normal"
            else:
                str_res = "active" if state != 0 else "inactive"
            return hd._sendCommand(self.MSG_ID_HD_ALARM_STATE_OVERRIDE, payload,
                                   "override alarm state",
                                   "Alarm state overridden to " + str_res + ":")

        def CmdAlarmTimeOverride(self, reset, time_ms, alarm):
            r"""
            Constructs and sends the alarm time override command

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param time_ms: integer - time (in ms) since alarm was activated
            \param alarm: integer - ID of alarm to override

            \returns 1 if successful, zero otherwise
            """
            hd = self.outer_instance
            payload = (hd.integer2ByteArray(reset)
                       + hd.integer2ByteArray(time_ms)
                       + hd.integer2ByteArray(alarm))
            str_res = "reset back to normal" if reset == HD.RESET else str(time_ms)
            return hd._sendCommand(self.MSG_ID_HD_ALARM_TIME_OVERRIDE, payload,
                                   "override alarm time since activated",
                                   "Alarm time since activated overridden to " + str_res + " ms: ")

        def CmdAlarmLampPatternOverride(self, reset, pattern):
            r"""
            Constructs and sends the alarm lamp pattern override command.

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param pattern: integer - ID of alarm lamp pattern to override with

            \returns 1 if successful, zero otherwise

            \details Patterns: \n
            0 = off \n
            1 = ok \n
            2 = fault \n
            3 = high \n
            4 = medium \n
            5 = low \n
            6 = manual
            """
            hd = self.outer_instance
            payload = hd.integer2ByteArray(reset) + hd.integer2ByteArray(pattern)
            if reset == HD.RESET:
                str_pat = "reset back to normal"
            else:
                str_pat = self._LAMP_PATTERN_NAMES.get(pattern, "manual")
            return hd._sendCommand(self.MSG_ID_HD_ALARM_LAMP_OVERRIDE, payload,
                                   "override alarm lamp pattern",
                                   "Alarm lamp pattern overridden to " + str_pat + ":")

    class HD_BloodFlow:
        r"""
        \class HD_BloodFlow

        \brief Hemodialysis Device (HD) Dialin API sub-class for blood-flow related commands.
        """

        # BloodFlow message IDs
        MSG_ID_HD_BLOOD_FLOW_PUBLISHED_DATA = 0x0005
        MSG_ID_HD_BLOOD_FLOW_SET_RATE_OVERRIDE = 0x8008
        MSG_ID_HD_BLOOD_FLOW_MEAS_RATE_OVERRIDE = 0x8009
        MSG_ID_HD_BLOOD_PUMP_MC_MEAS_SPEED_OVERRIDE = 0x800A
        MSG_ID_HD_BLOOD_PUMP_MC_MEAS_CURRENT_OVERRIDE = 0x800B
        MSG_ID_HD_BLOOD_FLOW_PUBLISH_INTERVAL_OVERRIDE = 0x800C
        MSG_ID_HD_BLOOD_PUMP_MEAS_SPEED_OVERRIDE = 0x800E
        MSG_ID_HD_BLOOD_PUMP_ROTOR_MEAS_SPEED_OVERRIDE = 0x800F

        # BloodFlow broadcast message field positions
        START_POS_SET_PT = DenaliMessage.PAYLOAD_START_INDEX
        END_POS_SET_PT = START_POS_SET_PT + 4
        START_POS_MEAS_FLOW = END_POS_SET_PT
        END_POS_MEAS_FLOW = START_POS_MEAS_FLOW + 4
        START_POS_MEAS_ROT_SPEED = END_POS_MEAS_FLOW
        END_POS_MEAS_ROT_SPEED = START_POS_MEAS_ROT_SPEED + 4
        START_POS_MEAS_SPEED = END_POS_MEAS_ROT_SPEED
        END_POS_MEAS_SPEED = START_POS_MEAS_SPEED + 4
        START_POS_MEAS_MC_SPEED = END_POS_MEAS_SPEED
        END_POS_MEAS_MC_SPEED = START_POS_MEAS_MC_SPEED + 4
        START_POS_MEAS_MC_CURR = END_POS_MEAS_MC_SPEED
        END_POS_MEAS_MC_CURR = START_POS_MEAS_MC_CURR + 4

        def __init__(self, outer_instance, can_interface=None):
            r"""
            HD_BloodFlow constructor

            \param outer_instance: reference to the HD (outer) class.
            \param can_interface: CAN messenger on which to register the
            published blood flow data handler; None to skip registration.

            \returns HD_BloodFlow object.
            """
            self.outer_instance = outer_instance

            # Initialise the captured data before registering the handler so
            # that a publication can never be overwritten by these defaults.
            self.TargetBloodFlowRate = 0
            self.MeasuredBloodFlowRate = 0.0
            self.MeasuredBloodPumpRotorSpeed = 0.0
            self.MeasuredBloodPumpSpeed = 0.0
            self.MeasuredBloodPumpMCSpeed = 0.0
            self.MeasuredBloodPumpMCCurrent = 0.0

            if can_interface is not None:
                channel_id = DenaliChannels.hd_sync_broadcast_ch_id
                msg_id = self.MSG_ID_HD_BLOOD_FLOW_PUBLISHED_DATA
                can_interface.registerReceivingPublicationFunction(channel_id, msg_id,
                                                                   self.handlerBloodFlowSyncFunction)

        @property
        def MeasuredBloodPumpMCurrent(self):
            r"""
            Read-only alias kept for callers that use the misspelled attribute
            name this class used to initialise.

            \returns same value as MeasuredBloodPumpMCCurrent
            """
            return self.MeasuredBloodPumpMCCurrent

        @staticmethod
        def _unpackFloat(data, start, end):
            r"""
            Decodes one 32-bit float field of a published message.

            \param data: indexable sequence of message bytes
            \param start: integer - index of the first byte of the field
            \param end: integer - index one past the last byte of the field

            \returns float value of the field
            """
            # Use the message byte order rather than the host's native order;
            # struct.unpack returns a 1-tuple, so take its only element.
            fmt = '<f' if DenaliMessage.BYTE_ORDER == 'little' else '>f'
            return struct.unpack(fmt, bytearray(data[start:end]))[0]

        def handlerBloodFlowSyncFunction(self, message):
            r"""
            Handles published blood flow data messages. Blood flow data are captured
            for reference.

            \param message: published blood flow data message

            \returns none
            """
            data = message['message']

            self.TargetBloodFlowRate = int.from_bytes(bytearray(
                data[self.START_POS_SET_PT:self.END_POS_SET_PT]), byteorder=DenaliMessage.BYTE_ORDER)
            self.MeasuredBloodFlowRate = self._unpackFloat(
                data, self.START_POS_MEAS_FLOW, self.END_POS_MEAS_FLOW)
            self.MeasuredBloodPumpRotorSpeed = self._unpackFloat(
                data, self.START_POS_MEAS_ROT_SPEED, self.END_POS_MEAS_ROT_SPEED)
            self.MeasuredBloodPumpSpeed = self._unpackFloat(
                data, self.START_POS_MEAS_SPEED, self.END_POS_MEAS_SPEED)
            self.MeasuredBloodPumpMCSpeed = self._unpackFloat(
                data, self.START_POS_MEAS_MC_SPEED, self.END_POS_MEAS_MC_SPEED)
            self.MeasuredBloodPumpMCCurrent = self._unpackFloat(
                data, self.START_POS_MEAS_MC_CURR, self.END_POS_MEAS_MC_CURR)

        def _sendValueOverride(self, message_id, reset, value, announcement, subject, unit):
            r"""
            Sends an override command whose payload is the reset flag followed
            by one integer value.

            \param message_id: integer - ID of the override message
            \param reset: integer - 1 to reset a previous override, 0 to override
            \param value: integer - value to override with
            \param announcement: text printed just before the message is sent
            \param subject: text - what is being overridden, as used in the printed result
            \param unit: text printed between the value and the response

            \returns 1 if successful, zero otherwise
            """
            hd = self.outer_instance
            payload = hd.integer2ByteArray(reset) + hd.integer2ByteArray(value)
            str_res = "reset back to normal" if reset == HD.RESET else str(value)
            return hd._sendCommand(message_id, payload, announcement,
                                   subject + " overridden to " + str_res + unit)

        def CmdBloodFlowSetPointOverride(self, reset, flow):
            r"""
            Constructs and sends the blood flow set point override command

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param flow: integer - flow set point (in mL/min) to override with

            \returns 1 if successful, zero otherwise
            """
            return self._sendValueOverride(self.MSG_ID_HD_BLOOD_FLOW_SET_RATE_OVERRIDE, reset, flow,
                                           "override blood flow set point",
                                           "Blood flow set point", " mL/min: ")

        def CmdBloodFlowMeasuredOverride(self, reset, flow):
            r"""
            Constructs and sends the measured blood flow override command

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param flow: integer - measured flow (in mL/min) to override with

            \returns 1 if successful, zero otherwise
            """
            return self._sendValueOverride(self.MSG_ID_HD_BLOOD_FLOW_MEAS_RATE_OVERRIDE, reset, flow,
                                           "override measured blood flow",
                                           "Blood flow (measured))", " mL/min: ")

        # TODO - add override command for blood pump motor speed (motor hall sensor via FPGA)
        # TODO - add override command for blood pump speed (pump hall sensor via NHET?)

        def CmdBloodPumpMCMeasuredSpeedOverride(self, reset, speed):
            r"""
            Constructs and sends the measured blood pump motor controller speed \n
            override command.

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param speed: integer - speed (in RPM) to override with

            \returns 1 if successful, zero otherwise
            """
            return self._sendValueOverride(self.MSG_ID_HD_BLOOD_PUMP_MC_MEAS_SPEED_OVERRIDE, reset, speed,
                                           "override measured blood pump motor controller speed",
                                           "Blood pump MC speed (measured)", " RPM: ")

        def CmdBloodPumpMeasuredCurrentOverride(self, reset, curr):
            r"""
            Constructs and sends the measured blood pump motor current override command

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param curr: integer - current (in mA) to override with

            \returns 1 if successful, zero otherwise
            """
            return self._sendValueOverride(self.MSG_ID_HD_BLOOD_PUMP_MC_MEAS_CURRENT_OVERRIDE, reset, curr,
                                           "override measured blood pump motor controller current",
                                           "Blood pump MC current (measured)", " mA: ")

        def CmdBloodPumpMeasuredSpeedOverride(self, reset, speed):
            r"""
            Constructs and sends the measured blood pump motor speed override \n
            command.

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param speed: integer - speed (in RPM) to override with

            \returns 1 if successful, zero otherwise
            """
            return self._sendValueOverride(self.MSG_ID_HD_BLOOD_PUMP_MEAS_SPEED_OVERRIDE, reset, speed,
                                           "override measured blood pump speed",
                                           "Blood pump speed (measured)", " RPM: ")

        def CmdBloodPumpRotorMeasuredSpeedOverride(self, reset, speed):
            r"""
            Constructs and sends the measured blood pump rotor speed override \n
            command.

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param speed: integer - speed (in RPM) to override with

            \returns 1 if successful, zero otherwise
            """
            return self._sendValueOverride(self.MSG_ID_HD_BLOOD_PUMP_ROTOR_MEAS_SPEED_OVERRIDE, reset, speed,
                                           "override measured blood pump rotor speed",
                                           "Blood pump rotor speed (measured)", " RPM: ")

        def CmdBloodFlowBroadcastIntervalOverride(self, reset, ms):
            r"""
            Constructs and sends the measured flow broadcast interval override command

            \param reset: integer - 1 to reset a previous override, 0 to override
            \param ms: integer - interval (in ms) to override with

            \returns 1 if successful, zero otherwise
            """
            hd = self.outer_instance
            payload = hd.integer2ByteArray(reset) + hd.integer2ByteArray(ms)
            # Unlike the other overrides, the unit is left out of the printed
            # result when the override is being reset.
            if reset == HD.RESET:
                str_res = "reset back to normal: "
            else:
                str_res = str(ms) + " ms: "
            return hd._sendCommand(self.MSG_ID_HD_BLOOD_FLOW_PUBLISH_INTERVAL_OVERRIDE, payload,
                                   "override blood flow broadcast interval",
                                   "Blood flow broadcast interval overridden to " + str_res)

    def integer2ByteArray(self, val):
        r"""
        Converts an integer value into a 4-byte array (little endian)

        \param val: integer to convert to byte array; 0 to 0xFFFFFFFF, or a
        negative value down to -0x80000000 (encoded as two's complement)

        \returns byte array (bytes object of length 4)

        \details Raises struct.error when val is not an integer or does not
        fit in 32 bits.
        """
        # Identical output to the former hex-string conversion for
        # 0..0xFFFFFFFF, but also well defined for negative values.
        return struct.pack('<i' if val < 0 else '<I', val)

    # def registerAsyncReceiver(self, message_id, method):
    #     t1 = method
    #     t2 = message_id

    # def registerSyncReceiver(self, message_id, method):
    #     t1 = method
    #     t2 = message_id


if __name__ == "__main__":
    # create an HD object called hd
    hd = HD()
    # wait 2 seconds and then login to HD as a tester
    sleep(2)
    hd._Basics.CmdLogInToHD()

    tgtRate = 0
    hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 2000)

    # Publish slowly (2000 ms) while the target flow is zero and quickly
    # (200 ms) while it is non-zero; a command is only sent on a transition.
    while True:
        if hd.BloodFlow.TargetBloodFlowRate == 0:
            if tgtRate != 0:
                hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 2000)
                tgtRate = 0
        else:
            if tgtRate == 0:
                hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 200)
                tgtRate = hd.BloodFlow.TargetBloodFlowRate
        # Yield between polls instead of spinning at full speed
        sleep(0.1)

    # hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.RESET,0)