Fisheye: Tag 29be612221aa2f9f2fbd08a1287abb60d416fc33 refers to a dead (removed) revision in file `HD_TestScript.py'. Fisheye: No comparison available. Pass `N' to diff? Index: HemodialysisDevice.py =================================================================== diff -u -rce9a0779ba86d4ecb98fa025fa959cc8e8ecb814 -r29be612221aa2f9f2fbd08a1287abb60d416fc33 --- HemodialysisDevice.py (.../HemodialysisDevice.py) (revision ce9a0779ba86d4ecb98fa025fa959cc8e8ecb814) +++ HemodialysisDevice.py (.../HemodialysisDevice.py) (revision 29be612221aa2f9f2fbd08a1287abb60d416fc33) @@ -20,6 +20,7 @@ from binascii import unhexlify import struct + class HD: """ \class HD @@ -64,6 +65,7 @@ def __init__(self, outer_instance): self.outer_instance = outer_instance + """ HD__Basics constructor @@ -72,37 +74,31 @@ \returns HD__Basics object. """ - - def CmdLogInToHD(self): """ - Constructs and sends a login command via CAN bus. Login required before \n - other commands can be sent to the HD. + Constructs and sends a login command via CAN bus. Login required before \n + other commands can be sent to the HD. - \returns 1 if logged in, 0 if log in failed + \returns 1 if logged in, 0 if log in failed """ - channel_id = 0x400 - request_id = 0x8000 - cargo = b'123' + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x8000, + payload=list(map(int, map(ord, '123')))) - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) - print("login") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) if received_message is not None: print(received_message) - print("Logged In: " + str(received_message[4])) - return received_message[4] + print("Logged In: " + str(received_message['message'][4])) + return received_message['message'][4] else: print("Timeout!!!!") return False - - def CmdHDMessageInsert(self, msg): """ Constructs and sends the insert HD message command. Given message will \n @@ -113,30 +109,25 @@ \returns 1 if successful, zero otherwise """ - channel_id = 0x400 - request_id = 0x8001 - cargo = msg + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x8001, + payload=msg) - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) - print("insert HD message") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: print(received_message) - print("Inserted message: " + str(received_message[4])) + print("Inserted message: " + str(received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message['message'][4] else: print("Timeout!!!!") return False - - - class HD_Buttons: """ \class HD_Buttons @@ -146,6 +137,7 @@ def __init__(self, outer_instance): self.outer_instance = outer_instance + """ HD_Buttons constructor @@ -154,8 +146,6 @@ \returns HD_Buttons object. """ - - def CmdOffButtonOverride(self, reset, state): """ Constructs and sends the Off button override command @@ -165,19 +155,18 @@ \returns 1 if successful, zero otherwise """ - channel_id = 0x400 - request_id = 0x8002 - rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) cargo = rst + sta - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x8002, + payload=cargo) print("override off button") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: @@ -186,15 +175,14 @@ str_res = "reset back to normal" else: str_res = ("pressed" if state != 0 else "released") - print("Off button overridden to " + str_res + ":" + str(received_message[4])) + + print("Off button overridden to " + str_res + ":" + str(received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message['message'][4] else: print("Timeout!!!!") return False - - def CmdStopButtonOverride(self, reset, state): """ Constructs and sends the Stop button override command @@ -204,19 +192,18 @@ \returns 1 if successful, zero otherwise """ - channel_id = 0x400 - request_id = 0x8003 - rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) cargo = rst + sta - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x8003, + payload=cargo) print("override stop button") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: @@ -225,15 +212,13 @@ str_res = "reset back to normal" else: str_res = ("pressed" if state != 0 else "released") - print("Stop button overridden to " + str_res + ":" + str(received_message[4])) + print("Stop button overridden to " + str_res + ":" + str(received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message["message"][4] else: print("Timeout!!!!") return False - - class HD_Watchdog: """ \class HD_Watchdog @@ -243,6 +228,7 @@ def __init__(self, outer_instance): self.outer_instance = outer_instance + """ HD_Watchdog constructor @@ -251,7 +237,6 @@ \returns HD_Watchdog object. """ - def CmdWatchdogTaskCheckinOverride(self, reset, state, task): """ Constructs and sends the watchdog task check-in override command @@ -262,20 +247,19 @@ \returns 1 if successful, zero otherwise """ - channel_id = 0x400 - request_id = 0x8005 - rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) tsk = self.outer_instance.integer2ByteArray(task) cargo = rst + sta + tsk - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x8005, + payload=cargo) print("override watchdog task check-in state") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: @@ -284,9 +268,9 @@ str_res = "reset back to normal" else: str_res = ("checked in" if state != 0 else "not checked in") - print("Watchdog task check-in overridden to " + str_res + ":" + str(received_message[4])) + print("Watchdog task check-in overridden to " + str_res + ":" + str(received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message['message'][4] else: print("Timeout!!!!") return False @@ -300,6 +284,7 @@ def __init__(self, outer_instance): self.outer_instance = outer_instance + """ HD_Alarms constructor @@ -308,7 +293,6 @@ \returns HD_Alarms object. """ - def CmdAlarmStateOverride(self, reset, state, alarm): """ Constructs and sends the alarm state override command @@ -319,20 +303,19 @@ \returns 1 if successful, zero otherwise """ - channel_id = 0x400 - request_id = 0x8006 - rst = self.outer_instance.integer2ByteArray(reset) sta = self.outer_instance.integer2ByteArray(state) alm = self.outer_instance.integer2ByteArray(alarm) cargo = rst + sta + alm - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x8006, + payload=cargo) print("override alarm state") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: @@ -341,15 +324,13 @@ str_res = "reset back to normal" else: str_res = ("active" if state != 0 else "inactive") - print("Alarm state overridden to " + str_res + ":" + str(received_message[4])) + print("Alarm state overridden to " + str_res + ":" + str(received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message['message'][4] else: print("Timeout!!!!") return False - - def CmdAlarmTimeOverride(self, reset, time_ms, alarm): """ Constructs and sends the alarm time override command @@ -360,20 +341,19 @@ \returns 1 if successful, zero otherwise """ - channel_id = 0x400 - request_id = 0x8007 - rst = self.outer_instance.integer2ByteArray(reset) ms = self.outer_instance.integer2ByteArray(time_ms) alm = self.outer_instance.integer2ByteArray(alarm) cargo = rst + ms + alm - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x8007, + payload=cargo) print("override alarm time since activated") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: @@ -382,15 +362,14 @@ str_res = "reset back to normal" else: str_res = str(time_ms) - print("Alarm time since activated overridden to " + str_res + " ms: " + str(received_message[4])) + print("Alarm time since activated overridden to " + str_res + " ms: " + str( + received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message['message'][4] else: print("Timeout!!!!") return False - - def CmdAlarmLampPatternOverride(self, reset, pattern): """ Constructs and sends the alarm lamp pattern override command. @@ -408,20 +387,18 @@ 5 = low \n 6 = manual """ - - channel_id = 0x400 - request_id = 0x8004 - rst = self.outer_instance.integer2ByteArray(reset) pat = self.outer_instance.integer2ByteArray(pattern) cargo = rst + pat - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x8004, + payload=cargo) print("override alarm lamp pattern") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: @@ -442,15 +419,13 @@ str_pat = "low" else: str_pat = "manual" - print("Alarm lamp pattern overridden to " + str_pat + ":" + str(received_message[4])) + print("Alarm lamp pattern overridden to " + str_pat + ":" + str(received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message['message'][4] else: print("Timeout!!!!") return False - - class HD_BloodFlow: """ \class HD_BloodFlow @@ -465,14 +440,14 @@ if can_interface is not None: channel_id = 0x40 msg_id = 0x0005 - can_interface.registerReceivingPublicationFunction(channel_id, msg_id, self.handlerBloodFlowSyncFunction) + can_interface.registerReceivingPublicationFunction(channel_id, msg_id, + self.handlerBloodFlowSyncFunction) self.TargetBloodFlowRate = 0 self.MeasuredBloodFlowRate = 0.0 self.MeasuredBloodPumpMCSpeed = 0.0 self.MeasuredBloodPumpCurrent = 0.0 - """ HD_BloodFlow constructor @@ -482,13 +457,12 @@ """ def handlerBloodFlowSyncFunction(self, message): - self.TargetBloodFlowRate = int.from_bytes(bytearray(message[4:8]),byteorder='little') - self.MeasuredBloodFlowRate = struct.unpack('f', bytearray(message[8:12])) - self.MeasuredBloodPumpMCSpeed = struct.unpack('f', bytearray(message[12:16])) - self.MeasuredBloodPumpCurrent = struct.unpack('f', bytearray(message[16:20])) - #print("Tgt Flow: " + str(self.TargetBloodFlowRate) + " Meas Flow: " + str(self.MeasuredBloodFlowRate) + " Meas Spd: " + str(self.MeasuredBloodPumpMCSpeed) + " Meas Curr: " + str(self.MeasuredBloodPumpCurrent)) + self.TargetBloodFlowRate = int.from_bytes(bytearray(message['message'][4:8]), byteorder='little') + self.MeasuredBloodFlowRate = struct.unpack('f', bytearray(message['message'][8:12])) + self.MeasuredBloodPumpMCSpeed = struct.unpack('f', bytearray(message['message'][12:16])) + self.MeasuredBloodPumpCurrent = struct.unpack('f', bytearray(message['message'][16:20])) + # print("Tgt Flow: " + str(self.TargetBloodFlowRate) + " Meas Flow: " + str(self.MeasuredBloodFlowRate) + " Meas Spd: " + str(self.MeasuredBloodPumpMCSpeed) + " Meas Curr: " + str(self.MeasuredBloodPumpCurrent)) - def CmdBloodFlowSetPointOverride(self, reset, flow): """ Constructs and sends the blood flow set point override command @@ -498,19 +472,18 @@ \returns 1 if successful, zero otherwise """ - channel_id = 0x400 - request_id = 0x8008 - rst = self.outer_instance.integer2ByteArray(reset) flo = self.outer_instance.integer2ByteArray(flow) cargo = rst + flo - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x8008, + payload=cargo) print("override blood flow set point") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: @@ -519,15 +492,14 @@ str_res = "reset back to normal" else: str_res = str(flow) - print("Blood flow set point overridden to " + str_res + " mL/min: " + str(received_message[4])) + print( + "Blood flow set point overridden to " + str_res + " mL/min: " + str(received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message['message'][4] else: print("Timeout!!!!") return False - - def CmdBloodFlowMeasuredOverride(self, reset, flow): """ Constructs and sends the measured blood flow override command @@ -537,19 +509,18 @@ \returns 1 if successful, zero otherwise """ - channel_id = 0x400 - request_id = 0x8009 - rst = self.outer_instance.integer2ByteArray(reset) flo = self.outer_instance.integer2ByteArray(flow) cargo = rst + flo - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x8009, + payload=cargo) print("override measured blood flow") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: @@ -558,18 +529,17 @@ str_res = "reset back to normal" else: str_res = str(flow) - print("Blood flow (measured)) overridden to " + str_res + " mL/min: " + str(received_message[4])) + print("Blood flow (measured)) overridden to " + str_res + " mL/min: " + str( + received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message['message'][4] else: print("Timeout!!!!") return False - # TODO - add override command for blood pump motor speed (motor hall sensor via FPGA) # TODO - add override command for blood pump speed (pump hall sensor via NHET?) - def CmdBloodPumpMCMeasuredSpeedOverride(self, reset, speed): """ Constructs and sends the measured blood pump motor controller speed \n @@ -580,19 +550,18 @@ \returns 1 if successful, zero otherwise """ - channel_id = 0x400 - request_id = 0x800A - rst = self.outer_instance.integer2ByteArray(reset) spd = self.outer_instance.integer2ByteArray(speed) cargo = rst + spd - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x800A, + payload=cargo) print("override measured blood pump speed") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: @@ -601,15 +570,14 @@ str_res = "reset back to normal" else: str_res = str(speed) - print("Blood pump speed (measured) overridden to " + str_res + " RPM: " + str(received_message[4])) + print("Blood pump speed (measured) overridden to " + str_res + " RPM: " + str( + received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message['message'][4] else: print("Timeout!!!!") return False - - def CmdBloodPumpMeasuredCurrentOverride(self, reset, curr): """ Constructs and sends the measured blood pump motor current override command @@ -619,19 +587,18 @@ \returns 1 if successful, zero otherwise """ - channel_id = 0x400 - request_id = 0x800B - rst = self.outer_instance.integer2ByteArray(reset) cur = self.outer_instance.integer2ByteArray(curr) cargo = rst + cur - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x800B, + payload=cargo) print("override measured blood pump motor current") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: @@ -640,15 +607,14 @@ str_res = "reset back to normal" else: str_res = str(curr) - print("Blood pump current (measured) overridden to " + str_res+ " mA: " + str(received_message[4])) + print("Blood pump current (measured) overridden to " + str_res + " mA: " + str( + received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message['message'][4] else: print("Timeout!!!!") return False - - def CmdBloodFlowBroadcastIntervalOverride(self, reset, ms): """ Constructs and sends the measured flow broadcast interval override command @@ -658,19 +624,18 @@ \returns 1 if successful, zero otherwise """ - channel_id = 0x400 - request_id = 0x800C - rst = self.outer_instance.integer2ByteArray(reset) mis = self.outer_instance.integer2ByteArray(ms) cargo = rst + mis - message = DenaliMessage.buildMessage(message_id=request_id, payload=cargo) + message = DenaliMessage.buildMessage(channel_id=0x400, + message_id=0x800C, + payload=cargo) print("override blood flow broadcast interval") # Send message - received_message = self.outer_instance.can_interface.send(channel_id, message) + received_message = self.outer_instance.can_interface.send(message) # If there is content... if received_message is not None: @@ -679,23 +644,19 @@ str_res = "reset back to normal: " else: str_res = str(ms) + " ms: " - print("Blood flow broadcast interval overridden to " + str_res + str(received_message[4])) + print("Blood flow broadcast interval overridden to " + str_res + str(received_message['message'][4])) # 5th element is OK or not OK - return received_message[4] + return received_message['message'][4] else: print("Timeout!!!!") return False - - def integer2ByteArray(self, val): - fmt = '%08x' # integer to hex string formatter - b = unhexlify(fmt % val) # convert reset int to byte array - b = b[::-1] # little endian byte order + fmt = '%08x' # integer to hex string formatter + b = unhexlify(fmt % val) # convert reset int to byte array + b = b[::-1] # little endian byte order return b - - def registerAsyncReceiver(self, message_id, method): t1 = method t2 = message_id @@ -726,4 +687,4 @@ hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 200) tgtRate = hd.BloodFlow.TargetBloodFlowRate -# hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.RESET,0) \ No newline at end of file +# hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.RESET,0) Index: Miscellaneous/HD_TestScript.py =================================================================== diff -u --- Miscellaneous/HD_TestScript.py (revision 0) +++ Miscellaneous/HD_TestScript.py (revision 29be612221aa2f9f2fbd08a1287abb60d416fc33) @@ -0,0 +1,44 @@ +########################################################################### +# +# Copyright (c) 2019-2019 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file HD_TestScript.py +# +# @date 19-Nov-2019 +# @author S. Nash +# +# @brief This is an example test script for the HD. +# +############################################################################ + +from DialityCoreCanProtocol import DenaliCanMessenger +from DialityCoreCanProtocol import DenaliMessage +from HemodialysisDevice import HD +from time import sleep + +if __name__ == "__main__": + # create an HD object called hd + hd = HD() + + # wait 2 seconds and then login to HD as a tester + sleep(2) + hd._Basics.CmdLogInToHD() + + tgtRate = 0 + hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 2000) + + while True: + if hd.BloodFlow.TargetBloodFlowRate == 0: + if tgtRate != 0: + hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 2000) + tgtRate = 0 + else: + if tgtRate == 0: + hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.NO_RESET, 200) + tgtRate = hd.BloodFlow.TargetBloodFlowRate + +# hd.BloodFlow.CmdBloodFlowBroadcastIntervalOverride(hd.RESET,0) +