########################################################################### # # Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file DialysateOutletFlow.py # # @date 31-Mar-2020 # @author P. Lucia # # @brief # # ############################################################################ from DialIn.CoreCANProtocol import (DenaliMessage, DenaliChannels) from .utils import integer2ByteArray, float2ByteArray from .HemodialysisDevice import HD import struct class HDDialysateOutletFlow: """ \class HD_DialysateOutletFlow \brief Hemodialysis Device (HD) Dialin API sub-class for dialysate outlet pump related commands. """ # DialysateFlow message IDs MSG_ID_HD_DIAL_OUT_FLOW_PUBLISHED_DATA = 0x000B MSG_ID_HD_LOAD_CELL_READINGS = 0x000C MSG_ID_HD_DIAL_OUT_FLOW_PUBLISH_INTERVAL_OVERRIDE = 0x801E MSG_ID_HD_DIAL_OUT_UF_REF_VOLUME_OVERRIDE = 0x801F MSG_ID_HD_DIAL_OUT_UF_MEAS_VOLUME_OVERRIDE = 0x8020 MSG_ID_HD_DIAL_OUT_PUMP_MC_MEAS_SPEED_OVERRIDE = 0x8021 MSG_ID_HD_DIAL_OUT_PUMP_MC_MEAS_CURRENT_OVERRIDE = 0x8022 MSG_ID_HD_DIAL_OUT_PUMP_MEAS_SPEED_OVERRIDE = 0x8023 MSG_ID_HD_DIAL_OUT_PUMP_ROTOR_MEAS_SPEED_OVERRIDE = 0x8024 MSG_ID_HD_DIAL_OUT_LOAD_CELL_WEIGHT_OVERRIDE = 0x8025 # BloodFlow broadcast message field positions START_POS_REF_VOL = DenaliMessage.PAYLOAD_START_INDEX END_POS_REF_VOL = START_POS_REF_VOL + 4 START_POS_MEAS_VOL = END_POS_REF_VOL END_POS_MEAS_VOL = START_POS_MEAS_VOL + 4 START_POS_MEAS_ROT_SPEED = END_POS_MEAS_VOL END_POS_MEAS_ROT_SPEED = START_POS_MEAS_ROT_SPEED + 4 START_POS_MEAS_SPEED = END_POS_MEAS_ROT_SPEED END_POS_MEAS_SPEED = START_POS_MEAS_SPEED + 4 START_POS_MEAS_MC_SPEED = END_POS_MEAS_SPEED END_POS_MEAS_MC_SPEED = START_POS_MEAS_MC_SPEED + 4 START_POS_MEAS_MC_CURR = END_POS_MEAS_MC_SPEED END_POS_MEAS_MC_CURR = START_POS_MEAS_MC_CURR + 4 START_POS_PWM_DC = END_POS_MEAS_MC_CURR END_POS_PWM_DC = START_POS_PWM_DC + 4 def __init__(self, can_interface=None): """ HDDialysateFlow constructor """ self.can_interface = can_interface if self.can_interface is not None: channel_id = DenaliChannels.hd_sync_broadcast_ch_id msg_id = self.MSG_ID_HD_DIAL_OUT_FLOW_PUBLISHED_DATA self.can_interface.register_receiving_publication_function(channel_id, msg_id, self.handlerDialysateOutletFlowSyncFunction) self.ReferenceDialysateOutletUFVolume = 0.0 self.MeasuredDialysateOutletUFVolume = 0.0 self.MeasuredDialysateOutletPumpRotorSpeed = 0.0 self.MeasuredDialysateOutletPumpSpeed = 0.0 self.MeasuredDialysateOutletPumpMCSpeed = 0.0 self.MeasuredDialysateOutletPumpMCCurrent = 0.0 self.PWMDutyCyclePct = 0.0 def handlerDialysateOutletFlowSyncFunction(self, message): """ Handles published dialysate outlet flow data messages. Dialysate flow data are captured for reference. \param message: published dialysate outlet flow data message \returns none """ refVol = struct.unpack('f', bytearray( message['message'][self.START_POS_REF_VOL:self.END_POS_REF_VOL])) measVol = struct.unpack('f', bytearray( message['message'][self.START_POS_MEAS_VOL:self.END_POS_MEAS_VOL])) rotor = struct.unpack('f', bytearray( message['message'][self.START_POS_MEAS_ROT_SPEED:self.END_POS_MEAS_ROT_SPEED])) speed = struct.unpack('f', bytearray( message['message'][self.START_POS_MEAS_SPEED:self.END_POS_MEAS_SPEED])) mcspeed = struct.unpack('f', bytearray( message['message'][self.START_POS_MEAS_MC_SPEED:self.END_POS_MEAS_MC_SPEED])) mccurr = struct.unpack('f', bytearray( message['message'][self.START_POS_MEAS_MC_CURR:self.END_POS_MEAS_MC_CURR])) pwm = struct.unpack('f', bytearray( message['message'][self.START_POS_PWM_DC:self.END_POS_PWM_DC])) self.ReferenceDialysateOutletUFVolume = refVol[0] self.MeasuredDialysateOutletUFVolume = measVol[0] self.MeasuredDialysateOutletPumpRotorSpeed = rotor[0] self.MeasuredDialysateOutletPumpSpeed = speed[0] self.MeasuredDialysateOutletPumpMCSpeed = mcspeed[0] self.MeasuredDialysateOutletPumpMCCurrent = mccurr[0] self.PWMDutyCyclePct = pwm[0] def CmdDialysateOutletReferenceUFVolumeOverride(self, reset, refVol): """ Constructs and sends the UF reference volume override command \param reset: integer - 1 to reset a previous override, 0 to override \param refVol: float - reference UF volume (in mL) to override with \returns 1 if successful, zero otherwise """ rst = integer2ByteArray(reset) vol = float2ByteArray(refVol) payload = rst + vol message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_DIAL_OUT_UF_REF_VOLUME_OVERRIDE, payload=payload) print("override UF reference volume with " + str(refVol) + "mL.") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # print(received_message) if reset == HD.RESET: str_res = "reset back to normal. " else: str_res = "overridden to " + str(refVol) + " mL. " print( "UF reference volume " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdDialysateOutletMeasuredUFVolumeOverride(self, reset, measVol): """ Constructs and sends the measured UF volume override command \param reset: integer - 1 to reset a previous override, 0 to override \param measVol: float - measured UF volume (in mL) to override with \returns 1 if successful, zero otherwise """ rst = integer2ByteArray(reset) vol = float2ByteArray(measVol) payload = rst + vol message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_DIAL_OUT_UF_MEAS_VOLUME_OVERRIDE, payload=payload) print("override measured UF volume with " + str(measVol) + " mL.") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # print(received_message) if reset == HD.RESET: str_res = "reset back to normal. " else: str_res = "overridden to " + str(measVol) + " mL. " print("UF measured volume " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdDialysateOutletPumpMCMeasuredSpeedOverride(self, reset, speed): """ Constructs and sends the measured dialysate outlet pump motor controller speed \n override command. \param reset: integer - 1 to reset a previous override, 0 to override \param speed: float - speed (in RPM) to override with \returns 1 if successful, zero otherwise """ rst = integer2ByteArray(reset) spd = float2ByteArray(speed) payload = rst + spd message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MeasuredDialysateOutletPumpMCSpeed, payload=payload) print("override measured dialysate outlet pump motor controller speed to " + str(spd) + " RPM.") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # print(received_message) if reset == HD.RESET: str_res = "reset back to normal. " else: str_res = "overridden to " + str(speed) + " RPM. " print("Dialysate outlet pump MC speed (measured) " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdDialysateOutletPumpMeasuredCurrentOverride(self, reset, curr): """ Constructs and sends the measured dialysate outlet pump motor current override command \param reset: integer - 1 to reset a previous override, 0 to override \param curr: float - current (in mA) to override with \returns 1 if successful, zero otherwise """ rst = integer2ByteArray(reset) cur = float2ByteArray(curr) payload = rst + cur message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_DIAL_OUT_PUMP_MC_MEAS_CURRENT_OVERRIDE, payload=payload) print("override measured dialysate outlet pump motor controller current") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # print(received_message) if reset == HD.RESET: str_res = "reset back to normal. " else: str_res = "overridden to " + str(curr) + " mA. " print("Dialysate outlet pump MC current (measured) " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdDialysateOutletPumpMeasuredSpeedOverride(self, reset, speed): """ Constructs and sends the measured dialysate outlet pump motor speed override \n command. \param reset: integer - 1 to reset a previous override, 0 to override \param speed: float - speed (in RPM) to override with \returns 1 if successful, zero otherwise """ rst = integer2ByteArray(reset) spd = float2ByteArray(speed) payload = rst + spd message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_DIAL_OUT_PUMP_MEAS_SPEED_OVERRIDE, payload=payload) print("override measured dialysate outlet pump speed to " + str(speed) + " RPM.") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # print(received_message) if reset == HD.RESET: str_res = "reset back to normal. " else: str_res = "overridden to " + str(speed) + " RPM. " print("Dialysate outlet pump speed (measured) " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdDialysateOutletPumpRotorMeasuredSpeedOverride(self, reset, speed): """ Constructs and sends the measured dialysate outlet pump rotor speed override \n command. \param reset: integer - 1 to reset a previous override, 0 to override \param speed: float - speed (in RPM) to override with \returns 1 if successful, zero otherwise """ rst = integer2ByteArray(reset) spd = float2ByteArray(speed) payload = rst + spd message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_DIAL_OUT_PUMP_ROTOR_MEAS_SPEED_OVERRIDE, payload=payload) print("override measured dialysate outlet pump rotor speed to " + str(speed) + " RPM.") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # print(received_message) if reset == HD.RESET: str_res = "reset back to normal. " else: str_res = "overridden to " + str(speed) + " RPM. " print("Dialysate outlet pump rotor speed (measured) " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdDialysateOutletFlowBroadcastIntervalOverride(self, reset, ms): """ Constructs and sends the measured dialysate outlet flow broadcast interval override command \param reset: integer - 1 to reset a previous override, 0 to override \param ms: integer - interval (in ms) to override with \returns 1 if successful, zero otherwise """ rst = integer2ByteArray(reset) mis = integer2ByteArray(ms) payload = rst + mis message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_DIAL_OUT_FLOW_PUBLISH_INTERVAL_OVERRIDE, payload=payload) print("override dialysate outlet flow broadcast interval to " + str(ms) + " ms.") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # print(received_message) if reset == HD.RESET: str_res = "reset back to normal. " else: str_res = "overridden to " + str(ms) + " ms. " print("Dialysate outlet flow broadcast interval " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdDialysateOutletPumpLoadCellWeightOverride(self, reset, weight, sensor): """ Constructs and sends the measured load cell weight override command. \param reset: integer - 1 to reset a previous override, 0 to override \param weight: float - weight (in g) to override with \param sensor: integer - ID of load cell to override \returns 1 if successful, zero otherwise \details Load Cells: \n 0 = reservoir 1 primary \n 1 = reservoir 1 backup \n 2 = reservoir 2 primary \n 3 = reservoir 2 backup \n """ rst = integer2ByteArray(reset) spd = float2ByteArray(weight) sen = integer2ByteArray(sensor) payload = rst + spd + sen message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_DIAL_OUT_LOAD_CELL_WEIGHT_OVERRIDE, payload=payload) print("override measured load cell weight to " + str(weight) + " grams for load cell # " + str(sensor)) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # print(received_message) if reset == HD.RESET: str_res = "reset back to normal. " else: str_res = "overridden to " + str(weight) + " grams. " print("Load cell # " + str(sensor) + " " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def CmdSetLoadCellWeights(self, r1p, r1b, r2p, r2b): """ Constructs and sends the set load cell weights command. \param r1p: float - weight (in g) for reservoir 1 primary load cell \param r1b: float - weight (in g) for reservoir 1 backup load cell \param r2p: float - weight (in g) for reservoir 2 primary load cell \param r2b: float - weight (in g) for reservoir 2 backup load cell \returns 0 - no response will come from HD for this message """ r1pb = float2ByteArray(r1p) r1bb = float2ByteArray(r1b) r2pb = float2ByteArray(r2p) r2bb = float2ByteArray(r2b) payload = r1pb + r1bb + r2pb + r2bb message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_LOAD_CELL_READINGS, payload=payload) print("measured load cell weights set.") # Send message received_message = self.can_interface.send(message, 0) return 0