###########################################################################
#
#    Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   valves.py
#
#    @author (last)      Peter Lucia
#    @date   (last)      20-Jul-2020
#    @author (original)  Peman Montazemi
#    @date   (original)  19-May-2020
#
############################################################################
import struct
from logging import Logger

from ..utils.conversions import integer_to_bytearray
from .constants import NO_RESET
from ..protocols.CAN import (DenaliMessage,
                             DenaliChannels)
from ..utils.base import _AbstractSubSystem, _publish

# Valve states
ENERGIZED = True
DEENERGIZED = False


class DGValves(_AbstractSubSystem):
    """
    Dialysate Generation (DG) interface for valve related commands.

    Keeps the most recently published state of every DG valve and offers
    commands to override a single valve state or the valves states broadcast
    interval.
    """

    # Valve state message IDs
    MSG_ID_DG_VALVES_STATES = 0x2A
    MSG_ID_DG_VALVE_STATE_OVERRIDE = 0xA00E
    MSG_ID_DG_VALVES_STATES_PUBLISH_INTERVAL_OVERRIDE = 0xA00F

    # Valves states publish message field positions
    START_POS_VALVES_STATES = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_VALVES_STATES = START_POS_VALVES_STATES + 2  # Valves States come in as a U16 value (2 bytes)

    # Valve IDs
    VALVE_RESERVOIR_FILL = 0  # VRF
    VALVE_RESERVOIR_INLET = 1  # VRI
    VALVE_RESERVOIR_DRAIN = 2  # VRD
    VALVE_RESERVOIR_OUTLET = 3  # VRO
    VALVE_PRESSURE_OUTLET = 4  # VPO
    VALVE_BYPASS_FILTER = 5  # VBF
    VALVE_RECIRCULATE = 6  # VRC
    VALVE_DRAIN = 7  # VDR
    VALVE_PRESSURE_INLET = 8  # VPI
    VALVE_SAMPLING_PORT = 9  # VSP
    VALVE_RESERVOIR_1 = 10  # VR1
    VALVE_RESERVOIR_2 = 11  # VR2
    VALVE_PRODUCTION_DRAIN = 12  # VPD

    def __init__(self, can_interface, logger: Logger):
        """
        Initializes every valve as de-energized and, when a CAN interface is
        given, subscribes to the published valves states message.

        @param can_interface: Denali CAN Messenger object (may be None, in which
                              case no publication handler is registered)
        @param logger: logger used for debug output of the override commands
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        self.valve_states_all = 0x0000
        self.valve_state_VRF = {"id": self.VALVE_RESERVOIR_FILL, "state": DEENERGIZED}
        self.valve_state_VRI = {"id": self.VALVE_RESERVOIR_INLET, "state": DEENERGIZED}
        self.valve_state_VRD = {"id": self.VALVE_RESERVOIR_DRAIN, "state": DEENERGIZED}
        self.valve_state_VRO = {"id": self.VALVE_RESERVOIR_OUTLET, "state": DEENERGIZED}
        self.valve_state_VPO = {"id": self.VALVE_PRESSURE_OUTLET, "state": DEENERGIZED}
        self.valve_state_VBF = {"id": self.VALVE_BYPASS_FILTER, "state": DEENERGIZED}
        self.valve_state_VRC = {"id": self.VALVE_RECIRCULATE, "state": DEENERGIZED}
        self.valve_state_VDR = {"id": self.VALVE_DRAIN, "state": DEENERGIZED}
        self.valve_state_VPI = {"id": self.VALVE_PRESSURE_INLET, "state": DEENERGIZED}
        self.valve_state_VSP = {"id": self.VALVE_SAMPLING_PORT, "state": DEENERGIZED}
        self.valve_state_VR1 = {"id": self.VALVE_RESERVOIR_1, "state": DEENERGIZED}
        self.valve_state_VR2 = {"id": self.VALVE_RESERVOIR_2, "state": DEENERGIZED}
        self.valve_state_VPD = {"id": self.VALVE_PRODUCTION_DRAIN, "state": DEENERGIZED}

        # Subscribe only once the state attributes above exist, so the handler
        # can never be invoked on a partially initialized object.
        if self.can_interface is not None:
            channel_id = DenaliChannels.dg_sync_broadcast_ch_id
            msg_id = self.MSG_ID_DG_VALVES_STATES
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_valves_sync)

    def _valve_records(self):
        """
        Collects the per-valve state dictionaries.

        The attributes are read on every call so that a reassigned attribute is
        always picked up.

        @return: tuple of the 13 valve state dictionaries, ordered by valve ID
        """
        return (
            self.valve_state_VRF,
            self.valve_state_VRI,
            self.valve_state_VRD,
            self.valve_state_VRO,
            self.valve_state_VPO,
            self.valve_state_VBF,
            self.valve_state_VRC,
            self.valve_state_VDR,
            self.valve_state_VPI,
            self.valve_state_VSP,
            self.valve_state_VR1,
            self.valve_state_VR2,
            self.valve_state_VPD,
        )

    def get_valve_states(self):
        """
        Gets the valve states

        @return: All valve states (None for a valve whose dictionary has no "state" entry): \n
            [\n
            Valve Reservoir Fill \n
            Valve Reservoir Inlet \n
            Valve Reservoir Drain \n
            Valve Reservoir Outlet \n
            Valve Pressure Outlet \n
            Valve Bypass Filter \n
            Valve Recirculate \n
            Valve Drain \n
            Valve Pressure Inlet \n
            Valve Sampling Port \n
            Valve Reservoir 1 (spare for now including DG FPGA, as valve is of passive air relief type) \n
            Valve Reservoir 2 (spare for now including DG FPGA, as valve is of passive air relief type) \n
            Valve Production Drain \n
            ]\n
        """
        return [valve.get("state", None) for valve in self._valve_records()]

    @staticmethod
    def sort_by_id(observation):
        """
        Converts a published dictionary of valve state information to a list of
        (name, id, state) tuples ordered by valve ID. Values of the observation
        that are not dictionaries (e.g. 'valve_states_all') are skipped.
        Entries that carry no "id" are placed after all entries that have one.

        For example:
        >>> observation = {'valve_states_all': 5,
        ...                'valve_state_VRD': {'id': 2, 'state': True},
        ...                'valve_state_VRF': {'id': 0, 'state': True},
        ...                'valve_state_VRI': {'id': 1, 'state': False}}
        >>> DGValves.sort_by_id(observation)
        [('valve_state_VRF', 0, True), ('valve_state_VRI', 1, False), ('valve_state_VRD', 2, True)]

        @param observation: dictionary of the observed valve states
        @return: a list of tuples of the valve states
        """
        entries = [
            (key, value.get("id", None), value.get("state", None))
            for key, value in observation.items()
            if isinstance(value, dict)
        ]
        # None cannot be ordered against an integer ID, so a missing ID is
        # mapped to a key that sorts after every real ID instead of raising.
        return sorted(entries, key=lambda each: (each[1] is None, 0 if each[1] is None else each[1]))

    def _binary_to_valve_state(self, binary):
        """
        Maps a masked bit value onto a valve state.

        @param binary: binary value
        @return: ENERGIZED for any non-zero value, otherwise DEENERGIZED
        """
        return ENERGIZED if binary != 0 else DEENERGIZED

    @_publish([
        "valve_states_all",
        "valve_state_VRF",
        "valve_state_VRI",
        "valve_state_VRD",
        "valve_state_VRO",
        "valve_state_VPO",
        "valve_state_VBF",
        "valve_state_VRC",
        "valve_state_VDR",
        "valve_state_VPI",
        "valve_state_VSP",
        "valve_state_VR1",
        "valve_state_VR2",
        "valve_state_VPD"
    ])
    def _handler_valves_sync(self, message):
        """
        Handles published valves states message.

        Stores the raw U16 in valve_states_all and updates the "state" entry of
        every valve dictionary in place.

        @param message: published valves states message
        @return: none
        """
        # 'H' has no byte-order prefix, so the host's native byte order is used.
        # NOTE(review): confirm the wire byte order - if it is fixed, an explicit
        # prefix (e.g. '<H') would make the decoding host independent.
        vst = struct.unpack('H', bytearray(
            message['message'][self.START_POS_VALVES_STATES:self.END_POS_VALVES_STATES]))
        self.valve_states_all = vst[0]

        # Extract each valve state from U16 valves states using bit-masking:
        # bit N of the value carries the state of the valve whose ID is N.
        for valve in self._valve_records():
            valve["state"] = self._binary_to_valve_state(vst[0] & (1 << valve["id"]))

    def _send_override(self, message_id, payload, description):
        """
        Builds an override message for the DG, sends it and evaluates the reply.

        @param message_id: ID of the override message to send
        @param payload: byte array payload of the message
        @param description: text written to the debug log before sending
        @return: first payload byte of the response, False if send() returned None
        """
        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id,
                                              message_id=message_id,
                                              payload=payload)

        self.logger.debug(description)

        # Send message
        received_message = self.can_interface.send(message)

        # No content is reported as a timeout
        if received_message is None:
            self.logger.debug("Timeout!!!!")
            return False

        # response payload is OK or not OK
        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def cmd_valve_override(self, state, valve, reset=NO_RESET):
        """
        Constructs and sends the valve state override command.
        Constraints:
        Must be logged into DG.
        Given valve ID must be one of the valve IDs listed below.

        @param state: bool - valve state
        @param valve: unsigned int - valve ID
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False when no response was received)

        valve IDs: \n
        0 = Valve Reservoir Fill \n
        1 = Valve Reservoir Inlet \n
        2 = Valve Reservoir Drain \n
        3 = Valve Reservoir Outlet \n
        4 = Valve Pressure Outlet \n
        5 = Valve Bypass Filter \n
        6 = Valve Recirculate \n
        7 = Valve Drain \n
        8 = Valve Pressure Inlet \n
        9 = Valve Sampling Port \n
        10 = Valve Reservoir 1 (spare for now including DG FPGA, as valve is of passive air relief type) \n
        11 = Valve Reservoir 2 (spare for now including DG FPGA, as valve is of passive air relief type) \n
        12 = Valve Production Drain \n
        """
        rst = integer_to_bytearray(reset)
        ste = integer_to_bytearray(state)
        vlv = integer_to_bytearray(valve)
        payload = rst + ste + vlv

        return self._send_override(self.MSG_ID_DG_VALVE_STATE_OVERRIDE,
                                   payload,
                                   "override valve state")

    def cmd_valve_broadcast_interval_override(self, ms, reset=NO_RESET):
        """
        Constructs and sends the valves states broadcast interval override command.
        Constraints:
        Must be logged into DG.
        Given interval must be non-zero and a multiple of the DG priority task interval (10 ms).

        @param ms: unsigned int - broadcast interval (in ms)
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False when no response was received)
        """
        rst = integer_to_bytearray(reset)
        ivl = integer_to_bytearray(ms)
        payload = rst + ivl

        return self._send_override(self.MSG_ID_DG_VALVES_STATES_PUBLISH_INTERVAL_OVERRIDE,
                                   payload,
                                   "override valves states publish interval")