########################################################################### # # Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file samplewater.py # # @author (last) Quang Nguyen # @date (last) 05-Aug-2021 # @author (original) Quang Nguyen # @date (original) 02-Mar-2021 # ############################################################################ import struct from logging import Logger from .constants import NO_RESET from ..common.msg_defs import MsgIds, MsgFieldPositions from ..protocols.CAN import DenaliMessage, DenaliChannels from ..utils.base import AbstractSubSystem, publish from ..utils.conversions import integer_to_bytearray class DGSampleWater(AbstractSubSystem): """ Dialysate Generator (DG) Dialin API sub-class for sample water related commands. """ def __init__(self, can_interface, logger: Logger): """ DGSampleWater constructor """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: self.can_interface.register_receiving_publication_function(DenaliChannels.dg_sync_broadcast_ch_id, MsgIds.MSG_ID_DG_FILTER_FLUSH_PROGRESS.value, self._handler_filter_flush_progress_sync) self.filter_flush_timeout = 0 self.filter_flush_time_countdown = 0 def get_filter_flush_timeout(self): """ Gets the filter flush timeout @return: The filter flush timeout """ return self.filter_flush_timeout def get_filter_flush_time_countdown(self): """ Gets the filter flush time countdown @return: The filter flush time countdown """ return self.filter_flush_time_countdown @publish([ "filter_flush_timeout", "filter_flush_time_countdown" ]) def _handler_filter_flush_progress_sync(self, message: dict) -> None: """ Handles published filter flush progress data messages. Filter flush progress data are captured for reference. @param message: published filter flush progress data message @return: None """ self.filter_flush_timeout = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.filter_flush_time_countdown = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] def cmd_filter_flush_time_period_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the filter flush time periodoverride command @param ms: integer - time period (in ms) to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ payload = integer_to_bytearray(reset) + integer_to_bytearray(ms) message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_FILTER_FLUSH_TIME_PERIOD_OVERRIDE.value, payload=payload) self.logger.debug("override DG filter flush time period") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.error("Timeout!!!!") return False