########################################################################### # # Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file HemodialysisDevice.py # # @date 16-Oct-2019 # @author L. Baloa # # @brief This class allows sending to and receiving from the HD device. # ############################################################################ from DialityCoreCanProtocol import DialityCanMessenger from DialityCoreCanProtocol import DialityPacket from time import sleep from binascii import unhexlify class HD: """ \class HD \brief Hemodialysis Device (HD) Dialin object API. It provides the basic interface to communicate with the HD board """ def __init__(self, can__interface="can0"): """ HD constructor using can bus \param bus: can bus, e.g. "can0" \returns HD object that allows communication with board via port \details For example: hd_object = HD(can__interface='can0') or hd_object = HD('can0') """ # Create listener self.can_interface = DialityCanMessenger(can_interface=can__interface) self.can_interface.start() self._Basics = HD.HD__Basics(self) self.Alarms = HD.HD_Alarms(self) self.Buttons = HD.HD_Buttons(self) self.BloodFlow = HD.HD_BloodFlow(self) self.Watchdog = HD.HD_Watchdog(self) class HD__Basics: """ \class HD__Basics \brief Hemodialysis Device (HD) Dialin API sub-class for basic HD commands. """ def __init__(self, outer_instance): self.outer_instance = outer_instance def login(self): """ send login command via serial port \returns 1 if logged in, 0 if log in failed """ channel_id = 0x400 request_id = 0x8000 cargo = b'123' message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("login") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) if received_message is not None: print(received_message) print("Logged In: " + str(received_message[4])) return received_message[4] else: print("Timeout!!!!") return False def HDMessageInsert(self, msg): """ sends the insert HD message command \param msg: byte array - properly formatted HD message to insert \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8001 cargo = msg message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("insert HD message") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) print("Inserted message: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False class HD_Buttons: """ \class HD_Buttons \brief Hemodialysis Device (HD) Dialin API sub-class for button related commands. """ def __init__(self, outer_instance): self.outer_instance = outer_instance def OffButtonOverride(self, reset, state): """ sends the Off button override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 to override off button to "pressed", 0 to "released" \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8002 rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) cargo = rst + sta message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override off button") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = ("pressed" if state != 0 else "released") print("Off button overridden to " + str_res + ":" + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def StopButtonOverride(self, reset, state): """ sends the Stop button override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 to override stop button to "pressed", 0 to "released" \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8003 rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) cargo = rst + sta message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override stop button") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = ("pressed" if state != 0 else "released") print("Stop button overridden to " + str_res + ":" + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False class HD_Watchdog: """ \class HD_Watchdog \brief Hemodialysis Device (HD) Dialin API sub-class for watchdog related commands. """ def __init__(self, outer_instance): self.outer_instance = outer_instance def WatchdogTaskCheckinOverride(self, reset, state, task): """ sends the watchdog task check-in override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 for task checked in, 0 for task not checked in \param task: integer - ID of task to override \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8005 rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) tsk = self.outer_instance.integer2ByteArray(task) cargo = rst + sta + tsk message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override watchdog task check-in state") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = ("checked in" if state != 0 else "not checked in") print("Watchdog task check-in overridden to " + str_res + ":" + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False class HD_Alarms: """ \class HD_Alarms \brief Hemodialysis Device (HD) Dialin API sub-class for alarm related commands. """ def __init__(self, outer_instance): self.outer_instance = outer_instance def AlarmStateOverride(self, reset, state, alarm): """ sends the alarm state override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 for alarm active, 0 for alarm inactive \param alarm: integer - ID of alarm to override \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8006 rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) alm = self.outer_instance.integer2ByteArray(alarm) cargo = rst + sta + alm message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override alarm state") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = ("active" if state != 0 else "inactive") print("Alarm state overridden to " + str_res + ":" + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def AlarmTimeOverride(self, reset, time_ms, alarm): """ sends the alarm time override command \param reset: integer - 1 to reset a previous override, 0 to override \param time_ms: integer - time (in ms) since alarm was activated \param alarm: integer - ID of alarm to override \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8007 rst = self.outer_instance.integer2ByteArray(reset) ms = self.outer_instance.integer2ByteArray(time_ms) alm = self.outer_instance.integer2ByteArray(alarm) cargo = rst + ms + alm message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override alarm time since activated") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = str(time_ms) print("Alarm time since activated overridden to " + str_res + " ms: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def AlarmLampPatternOverride(self, reset, pattern): """ sends the alarm lamp pattern override command. \param reset: integer - 1 to reset a previous override, 0 to override \param pattern: integer - ID of alarm lamp pattern to override with \returns 1 if successful, zero otherwise \details Patterns: \n 0 = off \n 1 = ok \n 2 = fault \n 3 = high \n 4 = medium \n 5 = low \n 6 = manual """ channel_id = 0x400 request_id = 0x8004 rst = self.__integer2ByteArray(reset) pat = self.__integer2ByteArray(pattern) cargo = rst + pat message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override alarm lamp pattern") # Send message received_message = self.__can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_pat = "reset back to normal" elif pattern == 0: str_pat = "off" elif pattern == 1: str_pat = "ok" elif pattern == 2: str_pat = "fault" elif pattern == 3: str_pat = "high" elif pattern == 4: str_pat = "medium" elif pattern == 5: str_pat = "low" else: str_pat = "manual" print("Alarm lamp pattern overridden to " + str_pat + ":" + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False class HD_BloodFlow: """ \class HD_BloodFlow \brief Hemodialysis Device (HD) Dialin API sub-class for blood-flow related commands. """ def __init__(self, outer_instance): self.outer_instance = outer_instance def BloodFlowSetPointOverride(self, reset, flow): """ sends the blood flow set point override command \param reset: integer - 1 to reset a previous override, 0 to override \param flow: integer - flow set point (in mL/min) to override with \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8008 rst = self.outer_instance.integer2ByteArray(reset) flo = self.outer_instance.integer2ByteArray(flow) cargo = rst + flo message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override blood flow set point") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = str(flow) print("Blood flow set point overridden to " + str_res + " mL/min: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def BloodFlowMeasuredOverride(self, reset, flow): """ sends the measured blood flow override command \param reset: integer - 1 to reset a previous override, 0 to override \param flow: integer - measured flow (in mL/min) to override with \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8009 rst = self.outer_instance.integer2ByteArray(reset) flo = self.outer_instance.integer2ByteArray(flow) cargo = rst + flo message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override measured blood flow") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = str(flow) print("Blood flow (measured)) overridden to " + str_res + " mL/min: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def BloodPumpMeasuredSpeedOverride(self, reset, speed): """ sends the measured blood pump speed override command \param reset: integer - 1 to reset a previous override, 0 to override \param speed: integer - speed (in RPM) to override with \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x800A rst = self.outer_instance.integer2ByteArray(reset) spd = self.outer_instance.integer2ByteArray(speed) cargo = rst + spd message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override measured blood pump speed") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = str(speed) print("Blood pump speed (measured) overridden to " + str_res + " RPM: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def BloodPumpMeasuredCurrentOverride(self, reset, curr): """ sends the measured blood pump motor current override command \param reset: integer - 1 to reset a previous override, 0 to override \param curr: integer - current (in mA) to override with \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x800B rst = self.outer_instance.integer2ByteArray(reset) cur = self.outer_instance.integer2ByteArray(curr) cargo = rst + cur message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override measured blood pump motor current") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = str(curr) print("Blood pump current (measured) overridden to " + str_res+ " mA: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def BloodFlowBroadcastIntervalOverride(self, reset, ms): """ sends the measured flow broadcast interval override command \param reset: integer - 1 to reset a previous override, 0 to override \param ms: integer - interval (in ms) to override with \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x800C rst = self.outer_instance.integer2ByteArray(reset) mis = self.outer_instance.integer2ByteArray(ms) cargo = rst + mis message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override blood flow broadcast interval") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal: " else: str_res = str(ms) + " ms: " print("Blood flow broadcast interval overridden to " + str_res + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def integer2ByteArray(self, val): fmt = '%08x' # integer to hex string formatter b = unhexlify(fmt % val) # convert reset int to byte array b = b[::-1] # little endian byte order return b def registerAsyncReceiver(self, message_id, method): t1 = method t2 = message_id def registerSyncReceiver(self, message_id, method): t1 = method t2 = message_id if __name__ == "__main__": # create an HD object called hd hd = HD() # wait 2 seconds and then login to HD as a tester sleep(2) hd._Basics.login() # wait 5 seconds and then override the blood flow data interval to every 2 seconds sleep(5) hd.BloodFlow.BloodFlowBroadcastIntervalOverride(0,2000) # hd.OffButtonOverride(0,1) # wait 5 seconds and then override the blood flow data interval to every 200 ms sleep(5) hd.BloodFlow.BloodFlowBroadcastIntervalOverride(0,200) # wait 5 seconds and then reset the blood flow data interval to normal sleep(5) hd.BloodFlow.BloodFlowBroadcastIntervalOverride(1,0)