########################################################################### # # Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file reservoirs.py # # @author (last) Sean Nash # @date (last) 15-Oct-2020 # @author (original) Sean # @date (original) 14-Apr-2020 # ############################################################################ import struct from logging import Logger from ..protocols.CAN import (DenaliMessage, DenaliChannels) from ..utils.base import _AbstractSubSystem, _publish from ..common.msg_defs import MsgIds, MsgFieldPositions from ..utils.conversions import integer_to_bytearray class DGReservoirs(_AbstractSubSystem): """ DG interface containing reservoir related commands. """ # Reservoir IDs RESERVOIR1 = 0 RESERVOIR2 = 1 def __init__(self, can_interface, logger: Logger): """ @param can_interface: The DenaliCANMessenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.dg_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_DG_RESERVOIR_DATA.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_reservoirs_sync) self.active_reservoir = self.RESERVOIR1 self.fill_to_vol_ml = 0 self.drain_to_vol_ml = 0 def get_active_reservoir(self): """ Gets the active reservoir RESERVOIR1 = 0 \n RESERVOIR2 = 1 \n @return: 0 or 1 """ return self.active_reservoir def get_fill_to_vol(self): """ Gets the fill to volume @return: The fill to volume (mL) """ return self.fill_to_vol_ml def get_drain_to_vol(self): """ Gets the drain to volume @return: The drain to volume (mL) """ return self.drain_to_vol_ml def cmd_switch_reservoirs(self, reservoir: int) -> bool: """ Sends a command to the DG to switch reservoirs @param reservoir: (int) the new reservoir number @return: True if command sent, False if invalid reservoir provided """ if reservoir not in [0, 1]: return False payload = integer_to_bytearray(reservoir) message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_SWITCH_RESERVOIR_CMD.value, payload=payload) self.logger.debug("Sending command to switch DG reservoir to {0}".format(reservoir)) self.can_interface.send(message, 0) return True @_publish([ "active_reservoir", "fill_to_vol_ml", "drain_to_vol_ml" ]) def _handler_reservoirs_sync(self, message): """ Handles published reservoir data messages. Reservoir data are captured for reference. @param message: published reservoir data message @return: none """ res = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) fil = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) dra = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3])) self.active_reservoir = res[0] self.fill_to_vol_ml = fil[0] self.drain_to_vol_ml = dra[0]