########################################################################### # # Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file PressureOcclusion.py # # @date 31-Mar-2020 # @author P. Lucia # # @brief # # ############################################################################ from DialIn.CoreCANProtocol import (DenaliMessage, DenaliChannels) from utils import integer_to_bytearray, float_to_bytearray, RESET import struct class HDPressureOcclusion: """ \class HDPressureOcclusion \brief Hemodialysis Device (HD) Dialin API sub-class for pressure related commands. """ # Pressure/Occlusion message IDs MSG_ID_HD_PRESSURE_OCCLUSION_DATA = 0x0009 MSG_ID_HD_PRESSURE_ARTERIAL_OVERRIDE = 0x8017 MSG_ID_HD_PRESSURE_VENOUS_OVERRIDE = 0x8018 MSG_ID_HD_OCCLUSION_BLOOD_PUMP_OVERRIDE = 0x8019 MSG_ID_HD_OCCLUSION_DIAL_IN_PUMP_OVERRIDE = 0x801A MSG_ID_HD_OCCLUSION_DIAL_OUT_PUMP_OVERRIDE = 0x801B MSG_ID_HD_PRES_OCCL_SEND_INTERVAL_OVERRIDE = 0x801C # Pressure/Occlusion broadcast message field positions START_POS_ART_PRES = DenaliMessage.PAYLOAD_START_INDEX END_POS_ART_PRES = START_POS_ART_PRES + 4 START_POS_VEN_PRES = END_POS_ART_PRES END_POS_VEN_PRES = START_POS_VEN_PRES + 4 START_POS_BP_OCCL = END_POS_VEN_PRES END_POS_BP_OCCL = START_POS_BP_OCCL + 4 START_POS_DIP_OCCL = END_POS_BP_OCCL END_POS_DIP_OCCL = START_POS_DIP_OCCL + 4 START_POS_DOP_OCCL = END_POS_DIP_OCCL END_POS_DOP_OCCL = START_POS_DOP_OCCL + 4 def __init__(self, can_interface=None): """ HDPressureOcclusion constructor \param outer_instance: reference to the HD (outer) class. \returns HDPressureOcclusion object. """ self.can_interface = can_interface if self.can_interface is not None: channel_id = DenaliChannels.hd_sync_broadcast_ch_id msg_id = self.MSG_ID_HD_PRESSURE_OCCLUSION_DATA self.can_interface.register_receiving_publication_function(channel_id, msg_id, self.handler_pressure_occlusion_sync) self.arterialPressure = 0 self.venousPressure = 0.0 self.bloodPumpOcclusion = 0.0 self.dialysateInletPumpOcclusion = 0.0 self.dialysateOutletPumpOcclusion = 0.0 def handler_pressure_occlusion_sync(self, message): """ Handles published pressure & occlusion data messages. Pressure data are captured for reference. \param message: published pressure & occlusion data message \returns none """ art = struct.unpack('f', bytearray( message['message'][self.START_POS_ART_PRES:self.END_POS_ART_PRES])) ven = struct.unpack('f', bytearray( message['message'][self.START_POS_VEN_PRES:self.END_POS_VEN_PRES])) bp = struct.unpack('f', bytearray( message['message'][self.START_POS_BP_OCCL:self.END_POS_BP_OCCL])) dpi = struct.unpack('f', bytearray( message['message'][self.START_POS_DIP_OCCL:self.END_POS_DIP_OCCL])) dpo = struct.unpack('f', bytearray( message['message'][self.START_POS_DOP_OCCL:self.END_POS_DOP_OCCL])) self.arterialPressure = art[0] self.venousPressure = ven[0] self.bloodPumpOcclusion = bp[0] self.dialysateInletPumpOcclusion = dpi[0] self.dialysateOutletPumpOcclusion = dpo[0] def cmd_arterial_pressure_measured_override(self, reset, pres): """ Constructs and sends the measured arterial pressure override command \param reset: integer - 1 to reset a previous override, 0 to override \param pres: float - measured arterial pressure (in mmHg) to override with \returns 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) prs = float_to_bytearray(pres) payload = rst + prs message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_PRESSURE_ARTERIAL_OVERRIDE, payload=payload) print("override measured arterial pressure") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal. " else: str_res = str(pres) + " mmHg. " print("Arterial pressure (measured)) overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def cmd_venous_pressure_measured_override(self, reset, pres): """ Constructs and sends the measured venous pressure \n override command. \param reset: integer - 1 to reset a previous override, 0 to override \param pres: float - venous pressure (in mmHg) to override with \returns 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) prs = float_to_bytearray(pres) payload = rst + prs message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_PRESSURE_VENOUS_OVERRIDE, payload=payload) print("override measured venous pressure") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal. " else: str_res = str(pres) + " mmHg. " print("Venous pressure (measured) overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def cmd_blood_pump_measured_occlusion_override(self, reset, occl): """ Constructs and sends the measured blood pump occlusion pressure override command \param reset: integer - 1 to reset a previous override, 0 to override \param occl: float - pressure (in mmHg) to override with \returns 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) occ = float_to_bytearray(occl) payload = rst + occ message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_OCCLUSION_BLOOD_PUMP_OVERRIDE, payload=payload) print("override measured blood pump occlusion pressure") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal. " else: str_res = str(occl) + " mmHg. " print("Blood pump occlusion pressure (measured) overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def cmd_dialysate_inlet_pump_measured_occlusion_override(self, reset, occl): """ Constructs and sends the measured dialysate inlet pump occlusion pressure override \n command. \param reset: integer - 1 to reset a previous override, 0 to override \param occl: float - pressure (in mmHg) to override with \returns 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) occ = float_to_bytearray(occl) payload = rst + occ message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_OCCLUSION_DIAL_IN_PUMP_OVERRIDE, payload=payload) print("override measured dialysate inlet pump occlusion pressure") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal. " else: str_res = str(occl) + " mmHg. " print("Dialysate inlet pump occlusion pressure (measured) overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def cmd_dialysate_outlet_pump_measured_occlusion_override(self, reset, occl): """ Constructs and sends the measured dialysate outlet pump occlusion pressure override \n command. \param reset: integer - 1 to reset a previous override, 0 to override \param occl: float - pressure (in mmHg) to override with \returns 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) occ = float_to_bytearray(occl) payload = rst + occ message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_OCCLUSION_DIAL_OUT_PUMP_OVERRIDE, payload=payload) print("override measured dialysate outlet pump occlusion pressure") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal. " else: str_res = str(occl) + " mmHg. " print("Dialysate outlet pump occlusion pressure (measured) overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def cmd_pressure_occlusion_broadcast_interval_override(self, reset, ms): """ Constructs and sends the pressure/occlusion broadcast interval override command \param reset: integer - 1 to reset a previous override, 0 to override \param ms: integer - interval (in ms) to override with \returns 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) mis = integer_to_bytearray(ms) payload = rst + mis message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=self.MSG_ID_HD_PRES_OCCL_SEND_INTERVAL_OVERRIDE, payload=payload) print("override pressure/occlusion broadcast interval") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(ms) + " ms: " print("Pressure/occlusion broadcast interval overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False