###########################################################################
#
#    Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   hd_proxy.py
#
#    @author (last)      Peter Lucia
#    @date   (last)      29-Jun-2020
#    @author (original)  Sean
#    @date   (original)  15-Apr-2020
#
############################################################################
from logging import Logger

from ..utils.conversions import integer_to_bytearray
from ..protocols.CAN import (DenaliMessage,
                             DenaliChannels)
from ..utils.base import _AbstractSubSystem, _publish


class DGHDProxy(_AbstractSubSystem):
    r"""
    \class DGHDProxy

    \brief Dialysate Generator (DG) Dialin API sub-class for HD proxy commands.
    """

    # HD proxy command message IDs
    MSG_ID_HD_SWITCH_RESERVOIRS_CMD = 0x0021
    MSG_ID_HD_FILL_CMD = 0x0022
    MSG_ID_HD_DRAIN_CMD = 0x0023
    MSG_ID_HD_START_STOP_DG_CMD = 0x0026
    MSG_ID_HD_START_STOP_DG_TRIMMER_HEATER = 0x002B

    # Reservoir IDs
    RESERVOIR1 = 0
    RESERVOIR2 = 1

    def __init__(self, can_interface, logger: Logger):
        """
        Stores the CAN interface and logger used by every command method.

        @param can_interface: Denali CAN Messenger object
        @param logger: Logger object that receives the debug output
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger

    def _send_dg_command(self, message_id, value, description):
        """
        Builds a single-integer command message on the Dialin-to-DG channel,
        sends it over the CAN interface and decodes the response.

        @param message_id: int - ID of the message to send
        @param value: int - value converted with integer_to_bytearray and used as the payload
        @param description: str - command name placed in front of " cmd sent to DG" in the debug log
        @return: the entry at DenaliMessage.PAYLOAD_START_INDEX of the response message,
                 or False when the CAN interface returns no response
        """
        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                              message_id=message_id,
                                              payload=integer_to_bytearray(value))

        # Logged before the send call, matching the order every command has always used
        self.logger.debug("%s cmd sent to DG", description)

        received_message = self.can_interface.send(message)

        # No response object at all is reported as a timeout
        if received_message is None:
            self.logger.debug("Timeout!!!!")
            return False

        # Response payload is OK or not OK
        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def cmd_switch_reservoirs(self, reservoirID=RESERVOIR1):
        r"""
        Constructs and sends the switch reservoirs command.
        Constraints:
            DG must be in re-circulate mode.
            Given reservoirID must be in the reservoir list below.

        @param reservoirID: unsigned int - reservoir to set as active (HD will draw from this reservoir).
        @return: 1 if successful, zero otherwise (False when no response is received)

        \details Reservoir IDs: \n
        0 = RESERVOIR 1 \n
        1 = RESERVOIR 2 \n
        """
        return self._send_dg_command(self.MSG_ID_HD_SWITCH_RESERVOIRS_CMD,
                                     reservoirID,
                                     "switch reservoirs")

    def cmd_fill(self, volume=1500):
        """
        Constructs and sends the fill command.
        Constraints:
            DG must be in re-circulate water state of re-circulate mode.
            Given fill to volume must be between 0 and 1950 mL.

        @param volume: unsigned int - volume (in mL) to fill inactive reservoir to.
        @return: 1 if successful, zero otherwise (False when no response is received)
        """
        return self._send_dg_command(self.MSG_ID_HD_FILL_CMD, volume, "fill")

    def cmd_drain(self, volume=200):
        """
        Constructs and sends the drain command.
        Constraints:
            DG must be in re-circulate mode.
            Given drain to volume must be between 0 and 1950 mL.

        @param volume: unsigned int - volume (in mL) to drain the inactive reservoir to.
        @return: 1 if successful, zero otherwise (False when no response is received)
        """
        return self._send_dg_command(self.MSG_ID_HD_DRAIN_CMD, volume, "drain")

    def cmd_start_stop_dg(self, start=True):
        """
        Constructs and sends the start/stop DG command.
        Constraints:
            DG must be in idle state of standby mode if start command given.
            DG must be in re-circulate mode if stop command given.

        @param start: boolean - True = start DG, False = stop DG.
        @return: 1 if successful, zero otherwise (False when no response is received)
        """
        # The payload carries 1 for start and 0 for stop
        cmd = 1 if start else 0
        action = "start" if start else "stop"

        return self._send_dg_command(self.MSG_ID_HD_START_STOP_DG_CMD,
                                     cmd,
                                     action + " DG")

    def cmd_start_stop_trimmer_heater(self, start=True):
        """
        Constructs and sends the start/stop DG trimmer heater command.

        @param start: boolean - True = start heater, False = stop heater.
        @return: non-zero integer if successful, False otherwise
        """
        # The payload carries 1 for start and 0 for stop
        cmd = 1 if start else 0
        action = "start" if start else "stop"

        return self._send_dg_command(self.MSG_ID_HD_START_STOP_DG_TRIMMER_HEATER,
                                     cmd,
                                     action + " DG trimmer heater")