########################################################################### # # Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file HemodialysisDevice.py # # @date 16-Oct-2019 # @author L. Baloa # # @brief This class allows sending to and receiving from the HD device. # ############################################################################ from DialityCoreCanProtocol import DialityCanMessenger from DialityCoreCanProtocol import DialityPacket from time import sleep from binascii import unhexlify class HD: """ \class HD \brief Hemodialysis Device (HD) Dialin API object. It provides the basic interface to communicate with the HD board """ def __init__(self, can__interface="can0"): """ HD constructor using can bus \param bus: can bus, e.g. "can0" \returns HD object provides test/service commands for the HD sub-system. \details For example: hd_object = HD(can__interface='can0') or hd_object = HD('can0') """ # Create listener self.can_interface = DialityCanMessenger(can_interface=can__interface) self.can_interface.start() # Create command groups self._Basics = HD.HD__Basics(self) self.Alarms = HD.HD_Alarms(self) self.Buttons = HD.HD_Buttons(self) self.BloodFlow = HD.HD_BloodFlow(self, self.can_interface) self.Watchdog = HD.HD_Watchdog(self) # Create constants self.NO_RESET = 0 self.RESET = 1 class HD__Basics: """ \class HD__Basics \brief Hemodialysis Device (HD) Dialin API sub-class for basic HD commands. """ def __init__(self, outer_instance): self.outer_instance = outer_instance """ HD__Basics constructor \param outer_instance: reference to the HD (outer) class. \returns HD__Basics object. """ def login(self): """ Constructs and sends a login command via CAN bus. Login required before \n other commands can be sent to the HD. \returns 1 if logged in, 0 if log in failed """ channel_id = 0x400 request_id = 0x8000 cargo = b'123' message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("login") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) if received_message is not None: print(received_message) print("Logged In: " + str(received_message[4])) return received_message[4] else: print("Timeout!!!!") return False def HDMessageInsert(self, msg): """ Constructs and sends the insert HD message command. Given message will \n be inserted into the HD's received message queue for processing. \param msg: byte array - properly formatted HD message to insert \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8001 cargo = msg message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("insert HD message") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) print("Inserted message: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False class HD_Buttons: """ \class HD_Buttons \brief Hemodialysis Device (HD) Dialin API sub-class for button related commands. """ def __init__(self, outer_instance): self.outer_instance = outer_instance """ HD_Buttons constructor \param outer_instance: reference to the HD (outer) class. \returns HD_Buttons object. """ def OffButtonOverride(self, reset, state): """ Constructs and sends the Off button override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 to override off button to "pressed", 0 to "released" \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8002 rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) cargo = rst + sta message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override off button") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = ("pressed" if state != 0 else "released") print("Off button overridden to " + str_res + ":" + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def StopButtonOverride(self, reset, state): """ Constructs and sends the Stop button override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 to override stop button to "pressed", 0 to "released" \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8003 rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) cargo = rst + sta message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override stop button") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = ("pressed" if state != 0 else "released") print("Stop button overridden to " + str_res + ":" + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False class HD_Watchdog: """ \class HD_Watchdog \brief Hemodialysis Device (HD) Dialin API sub-class for watchdog related commands. """ def __init__(self, outer_instance): self.outer_instance = outer_instance """ HD_Watchdog constructor \param outer_instance: reference to the HD (outer) class. \returns HD_Watchdog object. """ def WatchdogTaskCheckinOverride(self, reset, state, task): """ Constructs and sends the watchdog task check-in override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 for task checked in, 0 for task not checked in \param task: integer - ID of task to override \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8005 rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) tsk = self.outer_instance.integer2ByteArray(task) cargo = rst + sta + tsk message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override watchdog task check-in state") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = ("checked in" if state != 0 else "not checked in") print("Watchdog task check-in overridden to " + str_res + ":" + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False class HD_Alarms: """ \class HD_Alarms \brief Hemodialysis Device (HD) Dialin API sub-class for alarm related commands. """ def __init__(self, outer_instance): self.outer_instance = outer_instance """ HD_Alarms constructor \param outer_instance: reference to the HD (outer) class. \returns HD_Alarms object. """ def AlarmStateOverride(self, reset, state, alarm): """ Constructs and sends the alarm state override command \param reset: integer - 1 to reset a previous override, 0 to override \param state: integer - 1 for alarm active, 0 for alarm inactive \param alarm: integer - ID of alarm to override \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8006 rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) alm = self.outer_instance.integer2ByteArray(alarm) cargo = rst + sta + alm message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override alarm state") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = ("active" if state != 0 else "inactive") print("Alarm state overridden to " + str_res + ":" + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def AlarmTimeOverride(self, reset, time_ms, alarm): """ Constructs and sends the alarm time override command \param reset: integer - 1 to reset a previous override, 0 to override \param time_ms: integer - time (in ms) since alarm was activated \param alarm: integer - ID of alarm to override \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8007 rst = self.outer_instance.integer2ByteArray(reset) ms = self.outer_instance.integer2ByteArray(time_ms) alm = self.outer_instance.integer2ByteArray(alarm) cargo = rst + ms + alm message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override alarm time since activated") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = str(time_ms) print("Alarm time since activated overridden to " + str_res + " ms: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def AlarmLampPatternOverride(self, reset, pattern): """ Constructs and sends the alarm lamp pattern override command. \param reset: integer - 1 to reset a previous override, 0 to override \param pattern: integer - ID of alarm lamp pattern to override with \returns 1 if successful, zero otherwise \details Patterns: \n 0 = off \n 1 = ok \n 2 = fault \n 3 = high \n 4 = medium \n 5 = low \n 6 = manual """ channel_id = 0x400 request_id = 0x8004 rst = self.__integer2ByteArray(reset) pat = self.__integer2ByteArray(pattern) cargo = rst + pat message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override alarm lamp pattern") # Send message received_message = self.__can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_pat = "reset back to normal" elif pattern == 0: str_pat = "off" elif pattern == 1: str_pat = "ok" elif pattern == 2: str_pat = "fault" elif pattern == 3: str_pat = "high" elif pattern == 4: str_pat = "medium" elif pattern == 5: str_pat = "low" else: str_pat = "manual" print("Alarm lamp pattern overridden to " + str_pat + ":" + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False class HD_BloodFlow: """ \class HD_BloodFlow \brief Hemodialysis Device (HD) Dialin API sub-class for blood-flow related commands. """ def __init__(self, outer_instance, can_interface=None): self.outer_instance = outer_instance if can_interface is not None: channel_id = 0x40 msg_id = 0x5 can_interface.registerSyncFunction( channel_id, msg_id, self.BloodFlowSyncFunction) self.TargetBloodFlowRate = 0 self.MeasuredBloodFlowRate = 0.0 self.MeasuredBloodPumpMCSpeed = 0.0 self.MeasuredBloodPumpCurrent = 0.0 """ HD_BloodFlow constructor \param outer_instance: reference to the HD (outer) class. \returns HD_BloodFlow object. """ def BloodFlowSyncFunction(self, message): self.TargetBloodFlowRate = int.from_bytes(bytearray(message[4:8]),byteorder='little') def BloodFlowSetPointOverride(self, reset, flow): """ Constructs and sends the blood flow set point override command \param reset: integer - 1 to reset a previous override, 0 to override \param flow: integer - flow set point (in mL/min) to override with \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8008 rst = self.outer_instance.integer2ByteArray(reset) flo = self.outer_instance.integer2ByteArray(flow) cargo = rst + flo message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override blood flow set point") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = str(flow) print("Blood flow set point overridden to " + str_res + " mL/min: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def BloodFlowMeasuredOverride(self, reset, flow): """ Constructs and sends the measured blood flow override command \param reset: integer - 1 to reset a previous override, 0 to override \param flow: integer - measured flow (in mL/min) to override with \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x8009 rst = self.outer_instance.integer2ByteArray(reset) flo = self.outer_instance.integer2ByteArray(flow) cargo = rst + flo message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override measured blood flow") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = str(flow) print("Blood flow (measured)) overridden to " + str_res + " mL/min: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False # TODO - add override command for blood pump motor speed (motor hall sensor via FPGA) # TODO - add override command for blood pump speed (pump hall sensor via NHET?) def BloodPumpMCMeasuredSpeedOverride(self, reset, speed): """ Constructs and sends the measured blood pump motor controller speed \n override command. \param reset: integer - 1 to reset a previous override, 0 to override \param speed: integer - speed (in RPM) to override with \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x800A rst = self.outer_instance.integer2ByteArray(reset) spd = self.outer_instance.integer2ByteArray(speed) cargo = rst + spd message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override measured blood pump speed") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = str(speed) print("Blood pump speed (measured) overridden to " + str_res + " RPM: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def BloodPumpMeasuredCurrentOverride(self, reset, curr): """ Constructs and sends the measured blood pump motor current override command \param reset: integer - 1 to reset a previous override, 0 to override \param curr: integer - current (in mA) to override with \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x800B rst = self.outer_instance.integer2ByteArray(reset) cur = self.outer_instance.integer2ByteArray(curr) cargo = rst + cur message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override measured blood pump motor current") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal" else: str_res = str(curr) print("Blood pump current (measured) overridden to " + str_res+ " mA: " + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def BloodFlowBroadcastIntervalOverride(self, reset, ms): """ Constructs and sends the measured flow broadcast interval override command \param reset: integer - 1 to reset a previous override, 0 to override \param ms: integer - interval (in ms) to override with \returns 1 if successful, zero otherwise """ channel_id = 0x400 request_id = 0x800C rst = self.outer_instance.integer2ByteArray(reset) mis = self.outer_instance.integer2ByteArray(ms) cargo = rst + mis message = DialityPacket.buildPacket(request_id=request_id, cargo=cargo) print("override blood flow broadcast interval") # Send message received_message = self.outer_instance.can_interface.send(channel_id, message) # If there is content... if received_message is not None: print(received_message) if reset == 1: str_res = "reset back to normal: " else: str_res = str(ms) + " ms: " print("Blood flow broadcast interval overridden to " + str_res + str(received_message[4])) # 5th element is OK or not OK return received_message[4] else: print("Timeout!!!!") return False def integer2ByteArray(self, val): fmt = '%08x' # integer to hex string formatter b = unhexlify(fmt % val) # convert reset int to byte array b = b[::-1] # little endian byte order return b def registerAsyncReceiver(self, message_id, method): t1 = method t2 = message_id def registerSyncReceiver(self, message_id, method): t1 = method t2 = message_id if __name__ == "__main__": # create an HD object called hd hd = HD() # wait 2 seconds and then login to HD as a tester sleep(2) hd._Basics.login()