########################################################################### # # Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file hd_proxy.py # # @date 14-Apr-2020 # @author S. Nash # # @brief # # ############################################################################ from ..utils.conversions import integer_to_bytearray from ..protocols.CAN import (DenaliMessage, DenaliChannels) from ..utils.base import _AbstractSubSystem, _publish class DGHDProxy(_AbstractSubSystem): """ \class DGHDProxy \brief Dialysate Generator (DG) Dialin API sub-class for HD proxy commands. """ # Pressure/Occlusion message IDs MSG_ID_HD_SWITCH_RESERVOIRS_CMD = 0x0021 MSG_ID_HD_FILL_CMD = 0x0022 MSG_ID_HD_DRAIN_CMD = 0x0023 MSG_ID_HD_START_STOP_DG_CMD = 0x0026 MSG_ID_HD_START_STOP_DG_TRIMMER_HEATER = 0x002B # Reservoir IDs RESERVOIR1 = 0 RESERVOIR2 = 1 def __init__(self, can_interface=None): """ @param can_interface: Denali CAN Messenger object """ super().__init__() self.can_interface = can_interface def cmd_switch_reservoirs(self, reservoirID=RESERVOIR1): """ Constructs and sends the switch reservoirs command @param res: unsigned int - reservoir to set as active (HD will draw from this reservoir). @return: 1 if successful, zero otherwise \details Reservoir IDs: \n 0 = RESERVOIR 1 \n 1 = RESERVOIR 2 \n """ res = integer_to_bytearray(reservoirID) payload = res message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=self.MSG_ID_HD_SWITCH_RESERVOIRS_CMD, payload=payload) print("switch reservoirs cmd sent to DG") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def cmd_fill(self, volume=1500): """ Constructs and sends the fill command @param volume: unsigned int - volume (in mL) to fill inactive reservoir to. @return: 1 if successful, zero otherwise """ vol = integer_to_bytearray(volume) payload = vol message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=self.MSG_ID_HD_FILL_CMD, payload=payload) print("fill cmd sent to DG") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def cmd_drain(self, volume=200): """ Constructs and sends the drain command @param volume: unsigned int - volume (in mL) to drain the inactive reservoir to. @return: 1 if successful, zero otherwise """ vol = integer_to_bytearray(volume) payload = vol message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=self.MSG_ID_HD_DRAIN_CMD, payload=payload) print("drain cmd sent to DG") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def cmd_start_stop_dg(self, start=True): """ Constructs and sends the start/stop DG command @param start: boolean - True = start DG, False = stop DG. @return: 1 if successful, zero otherwise """ if start: cmd = 1 str = "start " else: cmd = 0 str = "stop" payload = integer_to_bytearray(cmd) message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=self.MSG_ID_HD_START_STOP_DG_CMD, payload=payload) print(str+"DG cmd sent to DG") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False def cmd_start_stop_trimmer_heater(self, start=True): """ Constructs and sends the start/stop DG trimmer heater command @param start: boolean - True = start heater, False = stop heater. @return: non-zero integer if successful, False otherwise """ if start: cmd = 1 str = "start " else: cmd = 0 str = "stop" payload = integer_to_bytearray(cmd) message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=self.MSG_ID_HD_START_STOP_DG_TRIMMER_HEATER, payload=payload) print(str+"DG trimmer heater cmd sent to DG") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: print("Timeout!!!!") return False