###########################################################################
#
#    Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file hd_proxy.py
#
#    @author (last)      Dara Navaei
#    @date   (last)      31-Oct-2021
#    @author (original)  Sean
#    @date   (original)  15-Apr-2020
#
############################################################################
from logging import Logger

from ..common.msg_defs import MsgIds
from ..protocols.CAN import DenaliMessage, DenaliChannels
from ..utils.base import AbstractSubSystem
from ..utils.conversions import integer_to_bytearray, float_to_bytearray


class DGHDProxy(AbstractSubSystem):
    """
    Dialysate Generator (DG) Dialin API sub-class for HD proxy commands.

    Every command is built as a Denali message addressed to the
    dialin-to-DG channel and sent through the CAN interface given at
    construction time.
    """

    # Reservoir IDs
    RESERVOIR1 = 0
    RESERVOIR2 = 1

    def __init__(self, can_interface, logger: Logger):
        """
        @param can_interface: Denali CAN Messenger object
        @param logger: Logger used for the debug trace of every command
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

    def _build_dg_message(self, msg_id, payload):
        """
        Builds a Denali message addressed to the dialin-to-DG channel.

        @param msg_id: MsgIds member identifying the command
        @param payload: already serialized payload of the command
        @return: the message returned by DenaliMessage.build_message
        """
        return DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                           message_id=msg_id.value,
                                           payload=payload)

    def _send_and_get_response(self, msg_id, payload, log_text: str) -> int:
        """
        Builds a DG command, logs it, sends it and extracts the response.

        @param msg_id: MsgIds member identifying the command
        @param payload: already serialized payload of the command
        @param log_text: debug text logged right before the message is sent
        @return: first payload element of the response, False if the CAN
                 interface returned no response
        """
        message = self._build_dg_message(msg_id, payload)
        self.logger.debug(log_text)

        # Send message
        received_message = self.can_interface.send(message)

        # No content means nothing came back from the CAN interface
        if received_message is None:
            self.logger.debug("Timeout!!!!")
            # False (== 0) is kept, rather than 0, for callers testing identity
            return False

        # response payload is OK or not OK
        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def cmd_switch_reservoirs(self, reservoir_id: int = RESERVOIR1) -> int:
        """
        Constructs and sends the switch reservoirs command.
        Constraints:
            DG must be in re-circulate mode.
            Given reservoirID must be in the reservoir list below.

        @param reservoir_id: unsigned int - reservoir to set as active (HD will draw from this reservoir).
        @return: 1 if successful, zero otherwise (False on timeout)

        @details
        Reservoir IDs:
            0 = RESERVOIR 1
            1 = RESERVOIR 2
        """
        payload = integer_to_bytearray(reservoir_id)
        return self._send_and_get_response(MsgIds.MSG_ID_DG_SWITCH_RESERVOIR_CMD,
                                           payload,
                                           "switch reservoirs cmd sent to DG")

    def cmd_fill(self, volume: int = 1500, tgt_fill_flow_lpm: float = 0.8, start: int = 1) -> int:
        """
        Constructs and sends the fill command.
        Constraints:
            DG must be in re-circulate water state of re-circulate mode.
            Given fill to volume must be between 0 and 1950 mL.

        @param volume: unsigned int - volume (in mL) to fill inactive reservoir to.
        @param tgt_fill_flow_lpm: float - target fill flow rate in L/min.
        @param start: unsigned int - 1 = start fill, 0 = stop fill.
        @return: 1 if successful, zero otherwise (False on timeout)
        """
        # Payload order differs from the parameter order: volume, start, flow
        payload = integer_to_bytearray(volume) + integer_to_bytearray(start) + \
            float_to_bytearray(tgt_fill_flow_lpm)
        return self._send_and_get_response(MsgIds.MSG_ID_DG_FILL_CMD,
                                           payload,
                                           "fill cmd sent to DG")

    def cmd_drain(self, volume: int = 0, tare_load_cell: bool = False, rinse_conc_lines: bool = False,
                  start: int = 1) -> int:
        """
        Constructs and sends the drain command.
        Constraints:
            DG must be in re-circulate mode.
            Given drain to volume must be between 0 and 1950 mL.

        @param volume: unsigned int - volume (in mL) to drain the inactive reservoir to.
        @param tare_load_cell: bool - flag indicates to tare load cell.
        @param rinse_conc_lines: bool - flag indicates to whether rinse the concentrate lines or not.
        @param start: int - start/stop drain command. The default is start = 1
        @return: 1 if successful, zero otherwise (False on timeout)
        """
        # Payload order: volume, tare flag, rinse flag, start/stop
        payload = integer_to_bytearray(volume) + \
            integer_to_bytearray(tare_load_cell) + \
            integer_to_bytearray(rinse_conc_lines) + \
            integer_to_bytearray(start)
        return self._send_and_get_response(MsgIds.MSG_ID_DG_DRAIN_CMD,
                                           payload,
                                           "drain cmd sent to DG")

    def cmd_start_stop_dg(self, start: bool = True) -> int:
        """
        Constructs and sends the start/stop DG command
        Constraints:
            DG must be in idle state of standby mode if start command given.
            DG must be in re-circulate mode if stop command given.

        @param start: boolean - True = start DG, False = stop DG.
        @return: 1 if successful, zero otherwise (False on timeout)
        """
        cmd = 1 if start else 0
        cmd_str = "start " if start else "stop "
        return self._send_and_get_response(MsgIds.MSG_ID_STARTING_STOPPING_TREATMENT_CMD,
                                           integer_to_bytearray(cmd),
                                           cmd_str + "DG cmd sent to DG")

    def cmd_start_stop_trimmer_heater(self, start: bool = True) -> int:
        """
        Constructs and sends the start/stop DG trimmer heater command

        @param start: boolean - True = start heater, False = stop heater.
        @return: non-zero integer if successful, False otherwise
        """
        cmd = 1 if start else 0
        cmd_str = "start " if start else "stop "
        return self._send_and_get_response(MsgIds.MSG_ID_DG_START_STOP_TRIMMER_HEATER_CMD,
                                           integer_to_bytearray(cmd),
                                           cmd_str + "DG trimmer heater cmd sent to DG")

    def cmd_stop_primary_heater(self) -> None:
        """
        Constructs and sends the stop primary heater command.
        The response of the send call is not used.

        @returns none
        """
        # 0 is to stop
        payload = integer_to_bytearray(0)
        message = self._build_dg_message(MsgIds.MSG_ID_START_STOP_PRIMARY_HEATER, payload)

        self.logger.debug("Stopping primary heater")
        # NOTE(review): the second argument is presumably a zero timeout - confirm against can_interface.send
        self.can_interface.send(message, 0)

    def cmd_sample_water(self, cmd: int) -> None:
        """
        Constructs and sends sample water command.
        The response of the send call is not used.

        @param cmd: int - 0 = stop, 1 = start, 2 = flush, 3 = end.
        @returns none
        """
        payload = integer_to_bytearray(cmd)
        message = self._build_dg_message(MsgIds.MSG_ID_DG_SAMPLE_WATER_CMD, payload)

        self.logger.debug("Sending sample water command")
        # NOTE(review): the second argument is presumably a zero timeout - confirm against can_interface.send
        self.can_interface.send(message, 0)

    def cmd_start_stop_heat_disinfect(self, start: bool = True) -> int:
        """
        Constructs and sends the start/stop DG heat disinfect command

        @param start: (bool) True = start heat disinfect, False = stop heat disinfect.
        @return: non-zero integer if successful, False otherwise
        """
        # 1 is to start
        cmd = 1 if start else 0
        cmd_str = "Starting" if start else "Stopping"
        return self._send_and_get_response(MsgIds.MSG_ID_DG_START_STOP_HEAT_DISINFECT,
                                           integer_to_bytearray(cmd),
                                           cmd_str + " DG heat disinfect")

    def cmd_start_stop_dg_flush(self, start: bool = True) -> int:
        """
        Constructs and sends the start/stop DG flush command

        @param start: (bool) True = start flush, False = stop flush.
        @return: non-zero integer if successful, False otherwise
        """
        # 1 is to start
        cmd = 1 if start else 0
        cmd_str = "Starting" if start else "Stopping"
        return self._send_and_get_response(MsgIds.MSG_ID_DG_START_STOP_FLUSH,
                                           integer_to_bytearray(cmd),
                                           cmd_str + " DG flush")

    def cmd_start_stop_dg_chemical_disinfect(self, start: bool = True) -> int:
        """
        Constructs and sends the start/stop DG chemical disinfect command

        @param start: (bool) True = start chemical disinfect, False = stop chemical disinfect.
        @return: non-zero integer if successful, False otherwise
        """
        # 1 is to start
        cmd = 1 if start else 0
        str_text = "Starting" if start else "Stopping"
        return self._send_and_get_response(MsgIds.MSG_ID_DG_START_STOP_CHEM_DISINFECT,
                                           integer_to_bytearray(cmd),
                                           str_text + " DG chemical disinfect")