########################################################################### # # Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file HemodialysisDevice.py # # @date 16-Oct-2019 # @author S. Nash # # @brief This class provides the basic interface to communicate with # the HD board. # ############################################################################ from DialityCoreCanProtocol import DenaliCanMessenger from DialityCoreCanProtocol import DenaliMessage from DialityCoreCanProtocol import DenaliChannels from time import sleep from binascii import unhexlify #from HD_DialOutFlow import HD_DialOut #from HD_DialOutFlow import DialOutStates import struct import ctypes class HD: # for reset param in override commands NO_RESET = 0 RESET = 1 def __init__(self, can__interface="can0"): """ HD constructor using can bus \param bus: can bus, e.g. "can0" \returns HD object provides test/service commands for the HD sub-system. \details For example: hd_object = HD(can__interface='can0') or hd_object = HD('can0') """ # Create listener self.can_interface = DenaliCanMessenger(can_interface=can__interface) self.can_interface.start() # Create command groups self._Basics = HD.HD__Basics(self) self.Alarms = HD.HD_Alarms(self, self.can_interface) self.BloodFlow = HD.HD_BloodFlow(self, self.can_interface) self.Buttons = HD.HD_Buttons(self) self.DialysateInletFlow = HD.HD_DialysateInletFlow(self, self.can_interface) self.DialysateOutletFlow = HD.HD_DialysateOutletFlow(self, self.can_interface) self.Pressure_Occlusion = HD.HD_Pressure_Occlusion(self, self.can_interface) self.RTC = HD.HD_RTC(self, self.can_interface) self.Treatment = HD.HD_Treatment(self, self.can_interface) self.UI = HD.HD_UI(self) self.Watchdog = HD.HD_Watchdog(self) ## DialOut is an HD_DialOutFlow object # self.DialOut = HD_DialOut(self.can_interface) class HD__Basics: """ \class HD__Basics \brief Hemodialysis Device (HD) Dialin API sub-class for basic HD commands. """ # Basic message IDs MSG_ID_LOGIN_TO_HD = 0x8000 MSG_ID_HD_MSG = 0x8001 # HD login password HD_LOGIN_PASSWORD = '123' def __init__(self, outer_instance): self.outer_instance = outer_instance """ HD__Basics constructor \param outer_instance: reference to the HD (outer) class. \returns HD__Basics object. """ def CmdLogInToHD(self): """ Constructs and sends a login command via CAN bus. Login required before \n other commands can be sent to the HD. \returns 1 if logged in, 0 if log in failed """ message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_LOGIN_TO_HD, payload=list(map(int, map(ord, self.HD_LOGIN_PASSWORD)))) print("login") # Send message received_message = self.outer_instance.can_interface.send(message) if received_message is not None: #print(received_message) if received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] == 1: print("Logged In") else: print("Log In Failed.") #print("Logged In: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdHDMessageInsert(self, msg): """ Constructs and sends the insert HD message command. Given message will \n be inserted into the HD's received message queue for processing. \param msg: byte array - properly formatted HD message to insert \returns 1 if successful, zero otherwise """ message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_MSG, payload=msg) print("insert HD message") # Send message received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: #print(received_message) print("Inserted message: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False class HD_Buttons: """ \class HD_Buttons \brief Hemodialysis Device (HD) Dialin API sub-class for button related commands. """ # Buttons message IDs MSG_ID_HD_OFF_BUTTON_OVERRIDE = 0x8002 MSG_ID_HD_STOP_BUTTON_OVERRIDE = 0x8003 def __init__(self, outer_instance): self.outer_instance = outer_instance """ HD_Buttons constructor \param outer_instance: reference to the HD (outer) class. \returns HD_Buttons object. """ def CmdOffButtonOverride(self, reset, state): """ Constructs and sends the Off button override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 to override off button to "pressed", 0 to "released" \returns 1 if successful, zero otherwise """ rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) payload = rst + sta message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_OFF_BUTTON_OVERRIDE, payload=payload) print("override off button") # Send message received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: #print(received_message) if reset == HD.RESET: str_res = "reset back to normal" else: str_res = ("pressed" if state != 0 else "released") print("Off button overridden to " + str_res + ":" + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdStopButtonOverride(self, reset, state): """ Constructs and sends the Stop button override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 to override stop button to "pressed", 0 to "released" \returns 1 if successful, zero otherwise """ rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) payload = rst + sta message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_STOP_BUTTON_OVERRIDE, payload=payload) print("override stop button") # Send message received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: #print(received_message) if reset == HD.RESET: str_res = "reset back to normal" else: str_res = ("pressed" if state != 0 else "released") print("Stop button overridden to " + str_res + ":" + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message["message"][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False class HD_UI: """ \class HD_UI \brief Hemodialysis Device (HD) Dialin API sub-class for UI commands. """ # UI message IDs MSG_ID_UI_CHECKIN_WITH_HD = 0x0007 def __init__(self, outer_instance): self.outer_instance = outer_instance """ HD_UI constructor \param outer_instance: reference to the HD (outer) class. \returns HD_UI object. """ def CmdUICheckinWithHD(self): """ Constructs and sends the UI checkin message \returns none """ payload = None message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_UI_CHECKIN_WITH_HD) print("Sending UI checkin w/ HD") # Send message received_message = self.outer_instance.can_interface.send(message, 0) return 0 class HD_Watchdog: """ \class HD_Watchdog \brief Hemodialysis Device (HD) Dialin API sub-class for watchdog related commands. """ # Watchdog message IDs MSG_ID_HD_WD_CHECKIN_OVERRIDE = 0x8005 def __init__(self, outer_instance): self.outer_instance = outer_instance """ HD_Watchdog constructor \param outer_instance: reference to the HD (outer) class. \returns HD_Watchdog object. """ def CmdWatchdogTaskCheckinOverride(self, reset, state, task): """ Constructs and sends the watchdog task check-in override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 for task checked in, 0 for task not checked in \param task: integer - ID of task to override \returns 1 if successful, zero otherwise """ rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) tsk = self.outer_instance.integer2ByteArray(task) payload = rst + sta + tsk message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_WD_CHECKIN_OVERRIDE, payload=payload) print("override watchdog task check-in state") # Send message received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: #print(received_message) if reset == HD.RESET: str_res = "reset back to normal" else: str_res = ("checked in" if state != 0 else "not checked in") print("Watchdog task check-in overridden to " + str_res + ":" + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False class HD_Alarms: """ \class HD_Alarms \brief Hemodialysis Device (HD) Dialin API sub-class for alarm related commands. """ # Alarms message IDs MSG_ID_HD_ALARMS_PUBLISHED_STATUS = 0x0002 MSG_ID_HD_ALARM_ACTIVATE = 0x0003 MSG_ID_HD_ALARM_CLEAR = 0x0004 MSG_ID_HD_ALARM_LAMP_OVERRIDE = 0x8004 MSG_ID_HD_ALARM_STATE_OVERRIDE = 0x8006 MSG_ID_HD_ALARM_TIME_OVERRIDE = 0x8007 # Alarm lamp patterns HD_ALARM_LAMP_PATTERN_OFF = 0 HD_ALARM_LAMP_PATTERN_OK = 1 HD_ALARM_LAMP_PATTERN_FAULT = 2 HD_ALARM_LAMP_PATTERN_HIGH = 3 HD_ALARM_LAMP_PATTERN_MEDIUM = 4 HD_ALARM_LAMP_PATTERN_LOW = 5 HD_ALARM_LAMP_PATTERN_MANUAL = 6 # Alarm status message field positions START_POS_ALARM_STATE = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_STATE = START_POS_ALARM_STATE + 4 START_POS_ALARM_TOP = END_POS_ALARM_STATE END_POS_ALARM_TOP = START_POS_ALARM_TOP + 4 START_POS_ALARM_SILENCE_EXPIRES_IN = END_POS_ALARM_TOP END_POS_ALARM_SILENCE_EXPIRES_IN = START_POS_ALARM_SILENCE_EXPIRES_IN + 4 START_POS_ALARM_ESCALATES_IN = END_POS_ALARM_SILENCE_EXPIRES_IN END_POS_ALARM_ESCALATES_IN = START_POS_ALARM_ESCALATES_IN + 4 START_POS_ALARMS_FLAGS = END_POS_ALARM_ESCALATES_IN END_POS_ALARMS_FLAGS = START_POS_ALARMS_FLAGS + 2 START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_ID = START_POS_ALARM_ID + 2 def __init__(self, outer_instance, can_interface=None): """ HD_Alarms constructor \param outer_instance: reference to the HD (outer) class. \returns HD_Alarms object. """ self.outer_instance = outer_instance if can_interface is not None: channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = self.MSG_ID_HD_ALARMS_PUBLISHED_STATUS can_interface.registerReceivingPublicationFunction(channel_id, msg_id, self.handlerAlarmsStatusSyncFunction) channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = self.MSG_ID_HD_ALARM_ACTIVATE can_interface.registerReceivingPublicationFunction(channel_id, msg_id, self.handlerAlarmActivateFunction) channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = self.MSG_ID_HD_ALARM_CLEAR can_interface.registerReceivingPublicationFunction(channel_id, msg_id, self.handlerAlarmClearFunction) # composite alarm status based on latest HD alarm status broadcast message self.alarmsState = 0 self.alarmTop = 0 self.alarmsSilenceExpiresIn = 0 self.alarmsEscalatesIn = 0 self.alarmsFlags = 0 # alarm states based on received HD alarm activation and alarm clear messages self.alarmStates = [False] * 50 # for x in range(0,511): # self.alarmStates.append(False) def handlerAlarmsStatusSyncFunction(self, message): """ Handles published alarms status messages. Alarms status data are captured for reference. \param message: published blood flow data message \returns none """ self.alarmsState = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_STATE:self.END_POS_ALARM_STATE]), byteorder=DenaliMessage.BYTE_ORDER) self.alarmTop = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_TOP:self.END_POS_ALARM_TOP]), byteorder=DenaliMessage.BYTE_ORDER) self.alarmsSilenceExpiresIn = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_SILENCE_EXPIRES_IN:self.END_POS_ALARM_SILENCE_EXPIRES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarmsEscalatesIn = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_ESCALATES_IN:self.END_POS_ALARM_ESCALATES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarmsFlags = int.from_bytes(bytearray( message['message'][self.START_POS_ALARMS_FLAGS:self.END_POS_ALARMS_FLAGS]), byteorder=DenaliMessage.BYTE_ORDER) def handlerAlarmActivateFunction(self, message): """ Handles published HD alarm activation messages. \param message: published HD alarm activation message \returns none """ alarmID = struct.unpack(' After STOP:{} <---".format(hd.DialOut.DialOutBroadcast)) sleep(3) hd.DialOut.setUFState(DialOutStates.RUN) sleep(2) state_stop = hd.DialOut.DialOutBroadcast['state'] print("After RUN again: {}".format(hd.DialOut.DialOutBroadcast)) sleep(3) hd.DialOut.setUFState(DialOutStates.STOP) sleep(2) state_stop = hd.DialOut.DialOutBroadcast['state'] print("After STOP: {}".format(hd.DialOut.DialOutBroadcast)) sleep(5) hd.DialOut.plotBroadCastSignals() exit() tgtRate = 0 hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 2000) while True: if hd.BloodFlow.TargetBloodFlowRate == 0: if tgtRate != 0: hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 2000) tgtRate = 0 else: if tgtRate == 0: hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 200) tgtRate = hd.BloodFlow.TargetBloodFlowRate sleep(1) print(hd.BloodFlow.MeasuredBloodFlowRate) # hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.RESET,0) """