########################################################################### # # Copyright (c) 2020-2023 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file valves.py # # @author (last) Dara Navaei # @date (last) 10-Nov-2022 # @author (original) Peman Montazemi # @date (original) 19-May-2020 # ############################################################################ import struct from enum import unique from logging import Logger from collections import OrderedDict from .constants import NO_RESET from ..common.msg_defs import MsgIds from ..protocols.CAN import DenaliMessage, DenaliChannels from ..utils.base import AbstractSubSystem, publish, DialinEnum from ..utils.checks import check_broadcast_interval_override_ms from ..utils.conversions import integer_to_bytearray # Valve states ENERGIZED = True DEENERGIZED = False @unique class VPiVSPVBfVRD1VRD2States(DialinEnum): VALVE_STATE_CLOSED = 0 VALVE_STATE_OPEN = 1 @unique class VPdStates(DialinEnum): VALVE_STATE_DRAIN_C_TO_NO = 0 VALVE_STATE_OPEN_C_TO_NC = 1 @unique class VPoStates(DialinEnum): VALVE_STATE_NOFILL_C_TO_NO = 0 VALVE_STATE_FILL_C_TO_NC = 1 @unique class VDrVRcStates(DialinEnum): VALVE_STATE_DRAIN_C_TO_NO = 0 VALVE_STATE_RECIRC_C_TO_NC = 1 @unique class VRoVRiStates(DialinEnum): VALVE_STATE_R1_C_TO_NO = 0 VALVE_STATE_R2_C_TO_NC = 1 @unique class VRdVRfStates(DialinEnum): VALVE_STATE_R2_C_TO_NO = 0 VALVE_STATE_R1_C_TO_NC = 1 @unique class DGValvesSensedStates(DialinEnum): VALVE_STATE_OPEN = 0 # Open valve state, used only for VPi, VBf, VRD1, VRD2, and VSP VALVE_STATE_CLOSED = 1 # Closed valve state, used only for VPi, VBf, VRD1, VRD2, and VSP VALVE_STATE_OPEN_C_TO_NC = 2 # Open Common to Normally Closed valve state, used only for VPd VALVE_STATE_NOFILL_C_TO_NO = 3 # No Fill Common to Normally Open valve state, used only for VPo VALVE_STATE_FILL_C_TO_NC = 4 # Fill Common to Normally Closed valve state, used only for VPo VALVE_STATE_DRAIN_C_TO_NO = 5 # Drain Common to Normally Open valve state, used only for VDr and VRc. It is also used for VPd in V3 VALVE_STATE_RECIRC_C_TO_NC = 6 # Recirculate Common to Normally Closed valve state, used only for VDr and VRc VALVE_STATE_R1_C_TO_NO = 7 # Reservoir 1 Common to Normally Open valve state, used only for VRo and VRi VALVE_STATE_R1_C_TO_NC = 8 # Reservoir 1 Common to Normally Closed valve state, used only for VRf VALVE_STATE_R2_C_TO_NO = 9 # Reservoir 2 Common to Normally Open valve state, used only for VRf VALVE_STATE_R2_C_TO_NC = 10 # Reservoir 2 Common to Normally Closed valve state, used only for VRo and VRi NUM_OF_VALVE_STATES = 11 # number of valve states @unique class DGValveNames(DialinEnum): # NOTE: NUM_OF enum has been removed because it should be a part of the software configuration # structure since the members of this class is for looped to create the dictionary automatically VALVE_RESERVOIR_FILL = 0 # VRF VALVE_RESERVOIR_INLET = 1 # VRI RESERVED_SPACE = 2 # RESERVED SPACE VALVE_RESERVOIR_OUTLET = 3 # VRO VALVE_PRESSURE_OUTLET = 4 # VPO VALVE_BYPASS_FILTER = 5 # VBF VALVE_RECIRCULATE = 6 # VRC VALVE_DRAIN = 7 # VDR VALVE_PRESSURE_INLET = 8 # VPI VALVE_SAMPLING_PORT = 9 # VSP VALVE_RESERVOIR_DRAIN_1 = 10 # VRD1 VALVE_RESERVOIR_DRAIN_2 = 11 # VRD2 VALVE_PRODUCTION_DRAIN = 12 # VPD class DGValves(AbstractSubSystem): """ Dialysate Generation (DG) interface for valve related commands. """ # Valves states publish message field positions START_POS_VALVES_STATES = DenaliMessage.PAYLOAD_START_INDEX END_POS_VALVES_STATES = START_POS_VALVES_STATES + 2 # Valves States come in as a U16 value (2 bytes) def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali CAN Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger self.valves_sensed_states = OrderedDict() if self.can_interface is not None: channel_id = DenaliChannels.dg_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_DG_VALVES_STATES.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_valves_sync) self.valve_states_all = 0x0000 self.valve_state_VRF = {"id": DGValveNames.VALVE_RESERVOIR_FILL.value, "state": DEENERGIZED} self.valve_state_VRI = {"id": DGValveNames.VALVE_RESERVOIR_INLET.value, "state": DEENERGIZED} self.valve_state_VRO = {"id": DGValveNames.VALVE_RESERVOIR_OUTLET.value, "state": DEENERGIZED} self.valve_state_VPO = {"id": DGValveNames.VALVE_PRESSURE_OUTLET.value, "state": DEENERGIZED} self.valve_state_VBF = {"id": DGValveNames.VALVE_BYPASS_FILTER.value, "state": DEENERGIZED} self.valve_state_VRC = {"id": DGValveNames.VALVE_RECIRCULATE.value, "state": DEENERGIZED} self.valve_state_VDR = {"id": DGValveNames.VALVE_DRAIN.value, "state": DEENERGIZED} self.valve_state_VPI = {"id": DGValveNames.VALVE_PRESSURE_INLET.value, "state": DEENERGIZED} self.valve_state_VSP = {"id": DGValveNames.VALVE_SAMPLING_PORT.value, "state": DEENERGIZED} self.valve_state_VRD1 = {"id": DGValveNames.VALVE_RESERVOIR_DRAIN_1.value, "state": DEENERGIZED} self.valve_state_VRD2 = {"id": DGValveNames.VALVE_RESERVOIR_DRAIN_2.value, "state": DEENERGIZED} self.valve_state_VPD = {"id": DGValveNames.VALVE_PRODUCTION_DRAIN.value, "state": DEENERGIZED} # NOTE: The len function counts the enums with the same number only once. This is not the case in the DG valves # class because each valve must have a unique ID. self.valve_states_enum = [0 for _ in range(len(DGValveNames))] for valve in DGValveNames.__members__: self.valves_sensed_states[valve] = '' def get_valve_states(self): """ Gets the valve states @return: All valve states: \n [\n Valve Reservoir Fill \n Valve Reservoir Inlet \n Valve Reservoir Drain \n Valve Reservoir Outlet \n Valve Pressure Outlet \n Valve Bypass Filter \n Valve Recirculate \n Valve Drain \n Valve Pressure Inlet \n Valve Sampling Port \n Valve Reservoir 1 Drain \n Valve Reservoir 2 Drain \n Valve Production Drain \n ]\n """ return [ self.valve_state_VRF.get("state", None), self.valve_state_VRI.get("state", None), self.valve_state_VRO.get("state", None), self.valve_state_VPO.get("state", None), self.valve_state_VBF.get("state", None), self.valve_state_VRC.get("state", None), self.valve_state_VDR.get("state", None), self.valve_state_VPI.get("state", None), self.valve_state_VSP.get("state", None), self.valve_state_VRD1.get("state", None), self.valve_state_VRD2.get("state", None), self.valve_state_VPD.get("state", None) ] @staticmethod def sort_by_id(observation): """ Converts a published dictionary of valve state information to an ordered list of tuples. For example: hd = DG() observation = {'datetime': datetime.datetime(2020, 7, 13, 10, 43, 27, 433357), 'valve_state_VBF': {'id': 5, 'state': True}, 'valve_state_VDR': {'id': 7, 'state': True}, 'valve_state_VPD': {'id': 12, 'state': True}, 'valve_state_VPI': {'id': 8, 'state': True}, 'valve_state_VPO': {'id': 4, 'state': True}, 'valve_state_VR1': {'id': 10, 'state': True}, 'valve_state_VR2': {'id': 11, 'state': True}, 'valve_state_VRC': {'id': 6, 'state': True}, 'valve_state_VRF': {'id': 0, 'state': True}, 'valve_state_VRI': {'id': 1, 'state': True}, 'valve_state_VRO': {'id': 3, 'state': True}, 'valve_state_VSP': {'id': 9, 'state': True}, 'valve_states_all': 8191} self.logger.debug(hd.valves.sort_by_id(observation)) ('valve_state_VRF', 0, True) ('valve_state_VRI', 1, True) ('valve_state_VRO', 3, True) ('valve_state_VPO', 4, True) ('valve_state_VBF', 5, True) ('valve_state_VRC', 6, True) ('valve_state_VDR', 7, True) ('valve_state_VPI', 8, True) ('valve_state_VSP', 9, True) ('valve_state_VR1', 10, True) ('valve_state_VR2', 11, True) ('valve_state_VPD', 12, True) @param observation: dictionary of the observed valve states @return: a list of tuples of the valve states """ result = [] for key, value in observation.items(): if isinstance(value, dict): result.append((key, value.get("id", None), value.get("state", None))) result = sorted(result, key=lambda each: each[1]) return result @staticmethod def _binary_to_valve_state(binary) -> bool: """ @param binary: binary value @return: 1 = energized, otherwise de-energized """ if binary != 0: return ENERGIZED else: return DEENERGIZED @publish([ "valve_states_all", "valve_state_VRF", "valve_state_VRI", "valve_state_VRO", "valve_state_VPO", "valve_state_VBF", "valve_state_VRC", "valve_state_VDR", "valve_state_VPI", "valve_state_VSP", "valve_state_VRD1", "valve_state_VRD2", "valve_state_VPD", "valve_states_enum" ]) def _handler_valves_sync(self, message): """ Handles published valves states message. @param message: published valves states message @return: none """ vst = struct.unpack('H', bytearray(message['message'][self.START_POS_VALVES_STATES:self.END_POS_VALVES_STATES])) self.valve_states_all = vst[0] # Extract each valve state from U16 valves states using bit-masking self.valve_state_VRF["state"] = self._binary_to_valve_state(vst[0] & 1) self.valve_state_VRI["state"] = self._binary_to_valve_state(vst[0] & 2) self.valve_state_VRO["state"] = self._binary_to_valve_state(vst[0] & 8) self.valve_state_VPO["state"] = self._binary_to_valve_state(vst[0] & 16) self.valve_state_VBF["state"] = self._binary_to_valve_state(vst[0] & 32) self.valve_state_VRC["state"] = self._binary_to_valve_state(vst[0] & 64) self.valve_state_VDR["state"] = self._binary_to_valve_state(vst[0] & 128) self.valve_state_VPI["state"] = self._binary_to_valve_state(vst[0] & 256) self.valve_state_VSP["state"] = self._binary_to_valve_state(vst[0] & 512) self.valve_state_VRD1["state"] = self._binary_to_valve_state(vst[0] & 1024) self.valve_state_VRD2["state"] = self._binary_to_valve_state(vst[0] & 2048) self.valve_state_VPD["state"] = self._binary_to_valve_state(vst[0] & 4096) self.valve_states_enum[DGValveNames.VALVE_RESERVOIR_FILL.value] = VRdVRfStates(self._binary_to_valve_state(vst[0] & 1)).name # VRF self.valve_states_enum[DGValveNames.VALVE_RESERVOIR_INLET.value] = VRoVRiStates(self._binary_to_valve_state(vst[0] & 2)).name # VRI self.valve_states_enum[DGValveNames.VALVE_RESERVOIR_OUTLET.value] = VRoVRiStates(self._binary_to_valve_state(vst[0] & 8)).name # VRO self.valve_states_enum[DGValveNames.VALVE_PRESSURE_OUTLET.value] = VPoStates(self._binary_to_valve_state(vst[0] & 16)).name # VPO self.valve_states_enum[DGValveNames.VALVE_BYPASS_FILTER.value] = VPiVSPVBfVRD1VRD2States(self._binary_to_valve_state(vst[0] & 32)).name # VBF self.valve_states_enum[DGValveNames.VALVE_RECIRCULATE.value] = VDrVRcStates(self._binary_to_valve_state(vst[0] & 64)).name # VRC self.valve_states_enum[DGValveNames.VALVE_DRAIN.value] = VDrVRcStates(self._binary_to_valve_state(vst[0] & 128)).name # VDR self.valve_states_enum[DGValveNames.VALVE_PRESSURE_INLET.value] = VPiVSPVBfVRD1VRD2States(self._binary_to_valve_state(vst[0] & 256)).name # VPI self.valve_states_enum[DGValveNames.VALVE_SAMPLING_PORT.value] = VPiVSPVBfVRD1VRD2States(self._binary_to_valve_state(vst[0] & 512)).name # VSP self.valve_states_enum[DGValveNames.VALVE_RESERVOIR_DRAIN_1.value] = VPiVSPVBfVRD1VRD2States(self._binary_to_valve_state(vst[0] & 1024)).name # VRD1 self.valve_states_enum[DGValveNames.VALVE_RESERVOIR_DRAIN_2.value] = VPiVSPVBfVRD1VRD2States(self._binary_to_valve_state(vst[0] & 2048)).name # VRD2 self.valve_states_enum[DGValveNames.VALVE_PRODUCTION_DRAIN.value] = VPdStates(self._binary_to_valve_state(vst[0] & 4096)).name # VPD start = self.END_POS_VALVES_STATES end = start + 1 for valve_id in self.valves_sensed_states: valve_state_number = struct.unpack('B', bytearray(message['message'][start:end]))[0] self.valves_sensed_states[valve_id] = DGValvesSensedStates(valve_state_number).name start = end end += 1 def cmd_valve_sensed_state_override(self, valve: int, state: bool, reset: int = NO_RESET) -> int: """ Constructs and sends the valve sensed state override command. Constraints: Must be logged into DG. Given valve ID must be one of the valve IDs listed below. @param valve: unsigned int - valve ID @param state: bool - valve state @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise valve IDs: \n 0 = Valve Reservoir Fill \n 1 = Valve Reservoir Inlet \n 3 = Valve Reservoir Outlet \n 4 = Valve Pressure Outlet \n 5 = Valve Bypass Filter \n 6 = Valve Recirculate \n 7 = Valve Drain \n 8 = Valve Pressure Inlet \n 9 = Valve Sampling Port \n 10 = Valve Reservoir 1 Drain \n 11 = Valve Reservoir 2 Drain \n 12 = Valve Production Drain \n """ rst = integer_to_bytearray(reset) ste = integer_to_bytearray(int(state)) vlv = integer_to_bytearray(valve) payload = rst + ste + vlv message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_DG_VALVES_SENSED_STATE_OVERRIDE.value, payload=payload) self.logger.debug("Override valve sensed state") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_valve_override(self, valve: int, state: bool, reset: int = NO_RESET) -> int: """ Constructs and sends the valve state override command. Constraints: Must be logged into DG. Given valve ID must be one of the valve IDs listed below. @param valve: unsigned int - valve ID @param state: bool - valve state @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise valve IDs: \n 0 = Valve Reservoir Fill \n 1 = Valve Reservoir Inlet \n 3 = Valve Reservoir Outlet \n 4 = Valve Pressure Outlet \n 5 = Valve Bypass Filter \n 6 = Valve Recirculate \n 7 = Valve Drain \n 8 = Valve Pressure Inlet \n 9 = Valve Sampling Port \n 10 = Valve Reservoir 1 Drain \n 11 = Valve Reservoir 2 Drain \n 12 = Valve Production Drain \n """ rst = integer_to_bytearray(reset) ste = integer_to_bytearray(int(state)) vlv = integer_to_bytearray(valve) payload = rst + ste + vlv message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_VALVE_STATE_OVERRIDE.value, payload=payload) self.logger.debug("Override valve state") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_valve_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the valve state override command. Constraints: Must be logged into DG. Given interval must be non-zero and a multiple of the DG general task interval (50 ms). @param ms: unsigned int - broadcast interval (in ms) @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ if not check_broadcast_interval_override_ms(ms): return False rst = integer_to_bytearray(reset) ivl = integer_to_bytearray(ms) payload = rst + ivl message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dg_ch_id, message_id=MsgIds.MSG_ID_VALVES_STATES_PUBLISH_INTERVAL_OVERRIDE.value, payload=payload) self.logger.debug("override valves states publish interval") # Send message received_message = self.can_interface.send(message) # If there is content in message if received_message is not None: # Response payload is OK or not return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False