########################################################################### # # Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file HemodialysisDevice.py # # @date 16-Oct-2019 # @author S. Nash # # @brief This class provides the basic interface to communicate with # the HD board. # ############################################################################ from DialityCoreCanProtocol import DenaliCanMessenger from DialityCoreCanProtocol import DenaliMessage from DialityCoreCanProtocol import DenaliChannels from time import sleep from binascii import unhexlify #from HD_DialOutFlow import HD_DialOut #from HD_DialOutFlow import DialOutStates import struct import ctypes class HD: # for reset param in override commands NO_RESET = 0 RESET = 1 def __init__(self, can__interface="can0"): """ HD constructor using can bus \param bus: can bus, e.g. "can0" \returns HD object provides test/service commands for the HD sub-system. \details For example: hd_object = HD(can__interface='can0') or hd_object = HD('can0') """ # Create listener self.can_interface = DenaliCanMessenger(can_interface=can__interface) self.can_interface.start() # Create command groups self._Basics = HD.HD__Basics(self) self.Alarms = HD.HD_Alarms(self, self.can_interface) self.BloodFlow = HD.HD_BloodFlow(self, self.can_interface) self.Buttons = HD.HD_Buttons(self) self.DialysateInletFlow = HD.HD_DialysateInletFlow(self, self.can_interface) self.DialysateOutletFlow = HD.HD_DialysateOutletFlow(self, self.can_interface) self.Pressure_Occlusion = HD.HD_Pressure_Occlusion(self, self.can_interface) self.RTC = HD.HD_RTC(self, self.can_interface) self.Treatment = HD.HD_Treatment(self, self.can_interface) self.UI = HD.HD_UI(self) self.Watchdog = HD.HD_Watchdog(self) ## DialOut is an HD_DialOutFlow object # self.DialOut = HD_DialOut(self.can_interface) class HD__Basics: """ \class HD__Basics \brief Hemodialysis Device (HD) Dialin API sub-class for basic HD commands. """ # Basic message IDs MSG_ID_LOGIN_TO_HD = 0x8000 MSG_ID_HD_MSG = 0x8001 # HD login password HD_LOGIN_PASSWORD = '123' def __init__(self, outer_instance): self.outer_instance = outer_instance """ HD__Basics constructor \param outer_instance: reference to the HD (outer) class. \returns HD__Basics object. """ def CmdLogInToHD(self): """ Constructs and sends a login command via CAN bus. Login required before \n other commands can be sent to the HD. \returns 1 if logged in, 0 if log in failed """ message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_LOGIN_TO_HD, payload=list(map(int, map(ord, self.HD_LOGIN_PASSWORD)))) print("login") # Send message received_message = self.outer_instance.can_interface.send(message) if received_message is not None: #print(received_message) if received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] == 1: print("Logged In") else: print("Log In Failed.") #print("Logged In: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdHDMessageInsert(self, msg): """ Constructs and sends the insert HD message command. Given message will \n be inserted into the HD's received message queue for processing. \param msg: byte array - properly formatted HD message to insert \returns 1 if successful, zero otherwise """ message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_MSG, payload=msg) print("insert HD message") # Send message received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: #print(received_message) print("Inserted message: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False class HD_Buttons: """ \class HD_Buttons \brief Hemodialysis Device (HD) Dialin API sub-class for button related commands. """ # Buttons message IDs MSG_ID_HD_OFF_BUTTON_OVERRIDE = 0x8002 MSG_ID_HD_STOP_BUTTON_OVERRIDE = 0x8003 def __init__(self, outer_instance): self.outer_instance = outer_instance """ HD_Buttons constructor \param outer_instance: reference to the HD (outer) class. \returns HD_Buttons object. """ def CmdOffButtonOverride(self, reset, state): """ Constructs and sends the Off button override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 to override off button to "pressed", 0 to "released" \returns 1 if successful, zero otherwise """ rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) payload = rst + sta message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_OFF_BUTTON_OVERRIDE, payload=payload) print("override off button") # Send message received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: #print(received_message) if reset == HD.RESET: str_res = "reset back to normal" else: str_res = ("pressed" if state != 0 else "released") print("Off button overridden to " + str_res + ":" + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdStopButtonOverride(self, reset, state): """ Constructs and sends the Stop button override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 to override stop button to "pressed", 0 to "released" \returns 1 if successful, zero otherwise """ rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) payload = rst + sta message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_STOP_BUTTON_OVERRIDE, payload=payload) print("override stop button") # Send message received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: #print(received_message) if reset == HD.RESET: str_res = "reset back to normal" else: str_res = ("pressed" if state != 0 else "released") print("Stop button overridden to " + str_res + ":" + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message["message"][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False class HD_UI: """ \class HD_UI \brief Hemodialysis Device (HD) Dialin API sub-class for UI commands. """ # UI message IDs MSG_ID_UI_CHECKIN_WITH_HD = 0x0007 MSG_ID_HD_UF_PAUSE_RESUME_REQUEST = 0x0010 MSG_ID_UF_SETTINGS_CHANGE_REQUEST_BY_USER = 0x0011 MSG_ID_UF_SETTINGS_CHANGE_RESPONSE_FROM_HD = 0x0013 MSG_ID_UF_SETTINGS_CHANGE_CONFIRMED_BY_USER = 0x0015 MSG_ID_TREATMENT_DURATION_SETTING_CHANGE_REQUEST = 0x0016 LITER_TO_ML_CONVERSION_FACTOR = 1000.0 UF_CMD_PAUSE = 0 UF_CMD_RESUME = 1 UF_CMD_CHANGE_TIME_TO_ADJUST = 0 UF_CMD_CHANGE_RATE_TO_ADJUST = 1 RESPONSE_REJECTED = 0 RESPONSE_ACCEPTED = 1 # HD response to UF change request message field positions START_POS_UF_CHG_RSP_RESP = DenaliMessage.PAYLOAD_START_INDEX END_POS_UF_CHG_RSP_RESP = START_POS_UF_CHG_RSP_RESP + 4 START_POS_UF_CHG_RSP_VOL = END_POS_UF_CHG_RSP_RESP END_POS_UF_CHG_RSP_VOL = START_POS_UF_CHG_RSP_VOL + 4 START_POS_UF_CHG_RSP_TIME = END_POS_UF_CHG_RSP_VOL END_POS_UF_CHG_RSP_TIME = START_POS_UF_CHG_RSP_TIME + 4 START_POS_UF_CHG_RSP_RATE = END_POS_UF_CHG_RSP_TIME END_POS_UF_CHG_RSP_RATE = START_POS_UF_CHG_RSP_RATE + 4 START_POS_UF_CHG_RSP_TIME_DIFF = END_POS_UF_CHG_RSP_RATE END_POS_UF_CHG_RSP_TIME_DIFF = START_POS_UF_CHG_RSP_TIME_DIFF + 4 START_POS_UF_CHG_RSP_RATE_DIFF = END_POS_UF_CHG_RSP_TIME_DIFF END_POS_UF_CHG_RSP_RATE_DIFF = START_POS_UF_CHG_RSP_RATE_DIFF + 4 def __init__(self, outer_instance, can_interface=None): self.outer_instance = outer_instance """ HD_UI constructor \param outer_instance: reference to the HD (outer) class. \can_interface: interface to can device. \returns HD_UI object. """ # register function to handle HD response to UF change requests if can_interface is not None: channel_id = DenaliChannels.hd_to_ui_ch_id msg_id = self.MSG_ID_UF_SETTINGS_CHANGE_RESPONSE_FROM_HD can_interface.registerReceivingPublicationFunction(channel_id, msg_id, self.handlerUFChangeResponseFunction) # initialize variables that will be populated by response from HD to UF change request self.UFChangeResponse = False self.UFChangeVolumeMl = 0 self.UFChangeTimeMin = 0 self.UFChangeRateMlMin = 0.0 self.UFChangeTimeDiff = 0 self.UFChangeRateDiff = 0.0 def handlerUFChangeResponseFunction(self, message): """ Handler for response from HD regarding UF change request. \param message: response message from HD regarding requested ultrafiltration settings change \returns none """ rsp = struct.unpack('i',bytearray( message['message'][self.START_POS_UF_CHG_RSP_RESP:self.END_POS_UF_CHG_RSP_RESP])) vol = struct.unpack('f',bytearray( message['message'][self.START_POS_UF_CHG_RSP_VOL:self.END_POS_UF_CHG_RSP_VOL])) tim = struct.unpack('i',bytearray( message['message'][self.START_POS_UF_CHG_RSP_TIME:self.END_POS_UF_CHG_RSP_TIME])) rat = struct.unpack('f', bytearray( message['message'][self.START_POS_UF_CHG_RSP_RATE:self.END_POS_UF_CHG_RSP_RATE])) tmd = struct.unpack('i',bytearray( message['message'][self.START_POS_UF_CHG_RSP_TIME_DIFF:self.END_POS_UF_CHG_RSP_TIME_DIFF])) rtd = struct.unpack('f', bytearray( message['message'][self.START_POS_UF_CHG_RSP_RATE_DIFF:self.END_POS_UF_CHG_RSP_RATE_DIFF])) if rsp[0] == self.RESPONSE_REJECTED: resp = False else: resp = True self.UFChangeResponse = resp self.UFChangeVolumeL = vol[0] / self.LITER_TO_ML_CONVERSION_FACTOR self.UFChangeTimeMin = tim[0] self.UFChangeRateMlMin = rat[0] self.UFChangeTimeDiff = tmd[0] self.UFChangeRateDifff = rtd[0] def CmdUICheckinWithHD(self): """ Constructs and sends the UI check-in message \returns none """ payload = None message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_UI_CHECKIN_WITH_HD) print("Sending UI checkin w/ HD") # Send message received_message = self.outer_instance.can_interface.send(message, 0) return 0 def CmdUIUFPauseResume(self, cmd=UF_CMD_PAUSE): """ Constructs and sends a UI UF command message \param cmd: 0 for pause, 1 for resume \returns none """ payload = self.outer_instance.integer2ByteArray(cmd) message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_UF_PAUSE_RESUME_REQUEST, payload=payload) if cmd == self.UF_CMD_PAUSE: str_cmd = "pause" else: str_cmd = "resume" print("Sending UF " + str_cmd + " command.") # Send message received_message = self.outer_instance.can_interface.send(message, 0) return 0 def CmdUIUFSettingsChangeRequest(self, vol=0.0, adj=UF_CMD_CHANGE_TIME_TO_ADJUST): """ Constructs and sends a UI UF change settings command message \param vol (float): new ultrafiltration volume setting (in L) \param adj (int): 0 for adjust time, 1 for adjust rate \returns none """ # reset response to this command so we can tell when response is received self.UFChangeResponse = None # build command message volume = self.outer_instance.float2ByteArray(vol * self.LITER_TO_ML_CONVERSION_FACTOR) adjust = self.outer_instance.integer2ByteArray(adj) payload = volume + adjust message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_UF_SETTINGS_CHANGE_REQUEST_BY_USER, payload=payload) print("Sending UF settings change request.") # Send message received_message = self.outer_instance.can_interface.send(message, 0) return 0 def CmdUIUFChangeSettingsConfirmedByUser(self, response=RESPONSE_REJECTED, vol=0.0, tm=0, rate=0.0): """ Constructs and sends a UI UF change settings confirmed by user message \param response (int): 0 for rejected, 1 for confirmed \param vol (float): volume (in L) that was confirmed \param tm (int): treatment time (in min) that was confirmed \param rate (float): ultrafiltration rate (in mL/min) that was confirmed \returns none """ resp = self.outer_instance.integer2ByteArray(response) volume = self.outer_instance.float2ByteArray(vol * self.LITER_TO_ML_CONVERSION_FACTOR) min = self.outer_instance.integer2ByteArray(tm) ufRate = self.outer_instance.float2ByteArray(rate) payload = resp + volume + min + ufRate message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_UF_SETTINGS_CHANGE_CONFIRMED_BY_USER, payload=payload) print("Sending UF settings change confirmation.") # Send message received_message = self.outer_instance.can_interface.send(message, 0) return 0 def CmdUITreatmentDurationSettingChangeRequest(self, timeMin=0): """ Constructs and sends a UI UF change settings confirmed by user message \param timeMin (int): treatment time (in min). \returns none """ payload = self.outer_instance.integer2ByteArray(timeMin) message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_TREATMENT_DURATION_SETTING_CHANGE_REQUEST, payload=payload) print("Sending treatment duration setting change request.") # Send message received_message = self.outer_instance.can_interface.send(message, 0) return 0 class HD_Watchdog: """ \class HD_Watchdog \brief Hemodialysis Device (HD) Dialin API sub-class for watchdog related commands. """ # Watchdog message IDs MSG_ID_HD_WD_CHECKIN_OVERRIDE = 0x8005 def __init__(self, outer_instance): self.outer_instance = outer_instance """ HD_Watchdog constructor \param outer_instance: reference to the HD (outer) class. \returns HD_Watchdog object. """ def CmdWatchdogTaskCheckinOverride(self, reset, state, task): """ Constructs and sends the watchdog task check-in override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 for task checked in, 0 for task not checked in \param task: integer - ID of task to override \returns 1 if successful, zero otherwise """ rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) tsk = self.outer_instance.integer2ByteArray(task) payload = rst + sta + tsk message = DenaliMessage.buildMessage(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_WD_CHECKIN_OVERRIDE, payload=payload) print("override watchdog task check-in state") # Send message received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: #print(received_message) if reset == HD.RESET: str_res = "reset back to normal" else: str_res = ("checked in" if state != 0 else "not checked in") print("Watchdog task check-in overridden to " + str_res + ":" + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False class HD_Alarms: """ \class HD_Alarms \brief Hemodialysis Device (HD) Dialin API sub-class for alarm related commands. """ # Alarms message IDs MSG_ID_HD_ALARMS_PUBLISHED_STATUS = 0x0002 MSG_ID_HD_ALARM_ACTIVATE = 0x0003 MSG_ID_HD_ALARM_CLEAR = 0x0004 MSG_ID_HD_ALARM_LAMP_OVERRIDE = 0x8004 MSG_ID_HD_ALARM_STATE_OVERRIDE = 0x8006 MSG_ID_HD_ALARM_TIME_OVERRIDE = 0x8007 # Alarm lamp patterns HD_ALARM_LAMP_PATTERN_OFF = 0 HD_ALARM_LAMP_PATTERN_OK = 1 HD_ALARM_LAMP_PATTERN_FAULT = 2 HD_ALARM_LAMP_PATTERN_HIGH = 3 HD_ALARM_LAMP_PATTERN_MEDIUM = 4 HD_ALARM_LAMP_PATTERN_LOW = 5 HD_ALARM_LAMP_PATTERN_MANUAL = 6 # Alarm status message field positions START_POS_ALARM_STATE = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_STATE = START_POS_ALARM_STATE + 4 START_POS_ALARM_TOP = END_POS_ALARM_STATE END_POS_ALARM_TOP = START_POS_ALARM_TOP + 4 START_POS_ALARM_SILENCE_EXPIRES_IN = END_POS_ALARM_TOP END_POS_ALARM_SILENCE_EXPIRES_IN = START_POS_ALARM_SILENCE_EXPIRES_IN + 4 START_POS_ALARM_ESCALATES_IN = END_POS_ALARM_SILENCE_EXPIRES_IN END_POS_ALARM_ESCALATES_IN = START_POS_ALARM_ESCALATES_IN + 4 START_POS_ALARMS_FLAGS = END_POS_ALARM_ESCALATES_IN END_POS_ALARMS_FLAGS = START_POS_ALARMS_FLAGS + 2 START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_ID = START_POS_ALARM_ID + 2 def __init__(self, outer_instance, can_interface=None): """ HD_Alarms constructor \param outer_instance: reference to the HD (outer) class. \returns HD_Alarms object. """ self.outer_instance = outer_instance if can_interface is not None: channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = self.MSG_ID_HD_ALARMS_PUBLISHED_STATUS can_interface.registerReceivingPublicationFunction(channel_id, msg_id, self.handlerAlarmsStatusSyncFunction) channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = self.MSG_ID_HD_ALARM_ACTIVATE can_interface.registerReceivingPublicationFunction(channel_id, msg_id, self.handlerAlarmActivateFunction) channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = self.MSG_ID_HD_ALARM_CLEAR can_interface.registerReceivingPublicationFunction(channel_id, msg_id, self.handlerAlarmClearFunction) # composite alarm status based on latest HD alarm status broadcast message self.alarmsState = 0 self.alarmTop = 0 self.alarmsSilenceExpiresIn = 0 self.alarmsEscalatesIn = 0 self.alarmsFlags = 0 # alarm states based on received HD alarm activation and alarm clear messages self.alarmStates = [False] * 50 # for x in range(0,511): # self.alarmStates.append(False) def handlerAlarmsStatusSyncFunction(self, message): """ Handles published alarms status messages. Alarms status data are captured for reference. \param message: published blood flow data message \returns none """ self.alarmsState = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_STATE:self.END_POS_ALARM_STATE]), byteorder=DenaliMessage.BYTE_ORDER) self.alarmTop = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_TOP:self.END_POS_ALARM_TOP]), byteorder=DenaliMessage.BYTE_ORDER) self.alarmsSilenceExpiresIn = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_SILENCE_EXPIRES_IN:self.END_POS_ALARM_SILENCE_EXPIRES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarmsEscalatesIn = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_ESCALATES_IN:self.END_POS_ALARM_ESCALATES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarmsFlags = int.from_bytes(bytearray( message['message'][self.START_POS_ALARMS_FLAGS:self.END_POS_ALARMS_FLAGS]), byteorder=DenaliMessage.BYTE_ORDER) def handlerAlarmActivateFunction(self, message): """ Handles published HD alarm activation messages. \param message: published HD alarm activation message \returns none """ alarmID = struct.unpack(' After STOP:{} <---".format(hd.DialOut.DialOutBroadcast)) sleep(3) hd.DialOut.setUFState(DialOutStates.RUN) sleep(2) state_stop = hd.DialOut.DialOutBroadcast['state'] print("After RUN again: {}".format(hd.DialOut.DialOutBroadcast)) sleep(3) hd.DialOut.setUFState(DialOutStates.STOP) sleep(2) state_stop = hd.DialOut.DialOutBroadcast['state'] print("After STOP: {}".format(hd.DialOut.DialOutBroadcast)) sleep(5) hd.DialOut.plotBroadCastSignals() exit() tgtRate = 0 hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 2000) while True: if hd.BloodFlow.TargetBloodFlowRate == 0: if tgtRate != 0: hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 2000) tgtRate = 0 else: if tgtRate == 0: hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 200) tgtRate = hd.BloodFlow.TargetBloodFlowRate sleep(1) print(hd.BloodFlow.MeasuredBloodFlowRate) # hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.RESET,0) """