###########################################################################
#
#    Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file valves.py
#
#    @author (last)      Micahel Garthwaite
#    @date (last)        17-Aug-2023
#    @author (original)  Peman Montazemi
#    @date (original)    19-May-2020
#
############################################################################
import struct
from enum import unique
from logging import Logger
from collections import OrderedDict

from .constants import NO_RESET
from leahi_dialin.common.msg_defs import MsgIds
from leahi_dialin.protocols.CAN import DenaliMessage, DenaliChannels
from leahi_dialin.utils.base import AbstractSubSystem, publish, DialinEnum
from leahi_dialin.utils.checks import check_broadcast_interval_override_ms
from leahi_dialin.utils.conversions import integer_to_bytearray

# Valve states
ENERGIZED = True
DEENERGIZED = False


@unique
class DDValveStates(DialinEnum):
    VALVE_STATE_CLOSED = 0
    VALVE_STATE_OPEN = 1


@unique
class DDValveNames(DialinEnum):
    VDR = 0             # Valve Drain (D53)
    VTD = 1             # Valve Thermal Disinfect (D52)
    VHB = 2             # Valve Hydraulics Bypass (D8)
    VRP = 3             # Valve Rinse Port (D54)
    VHO = 4             # Valve Hydraulics Outlet (D14)
    VDB1 = 5            # Valve DryBcarb Inlet (D65)
    VP1 = 6             # Valve Purge 1 (D64)
    VPT = 7             # Valve Pressure Test (D31)
    VDB2 = 8            # Valve Dialyzer Bypass (D34)
    VDI = 9             # Valve Dialyzer Inlet (D35)
    VDO = 10            # Valve Dialyzer Outlet (D40)
    VP2 = 11            # Valve Dialysate Out Purge 2 (D47)
    VHI = 12            # Valve Hydraulics Inlet (D3)
    VWI = 13            # Valve Water Inlet (M4)
    RSRVD_SPACE1 = 14   # This space has been reserved
    RSRVD_SPACE2 = 15   # This space has been reserved
    BCV1 = 16           # Balancing chamber Valve 1 (D23)
    BCV2 = 17           # Balancing chamber Valve 2 (D19)
    BCV3 = 18           # Balancing chamber Valve 3 (D25)
    BCV4 = 19           # Balancing chamber Valve 4 (D21)
    BCV5 = 20           # Balancing chamber Valve 5 (D24)
    BCV6 = 21           # Balancing chamber Valve 6 (D20)
    BCV7 = 22           # Balancing chamber Valve 7 (D26)
    BCV8 = 23           # Balancing chamber Valve 8 (D22)
    UFI1 = 24           # Ultrafiltration Valve 1 Inlet (D69)
    UFI2 = 25           # Ultrafiltration Valve 2 Inlet (D71)
    UFO1 = 26           # Ultrafiltration Valve 1 Outlet (D70)
    UFO2 = 27           # Ultrafiltration Valve 2 Outlet (D72)


# Valves that get their own "valve_state_<NAME>" attribute. The reserved slots are skipped because they do not
# correspond to a physical valve.
_PUBLISHED_VALVES = tuple(valve for valve in DDValveNames
                          if valve not in (DDValveNames.RSRVD_SPACE1, DDValveNames.RSRVD_SPACE2))


def _state_attr(valve):
    """
    Builds the name of the per-valve state attribute held by DDValves.

    @param valve: a DDValveNames member
    @return: the attribute name, e.g. "valve_state_VDR"
    """
    return "valve_state_" + valve.name


class DDValves(AbstractSubSystem):
    """
    Dialysate Delivery (DD) interface for valve related commands.

    Each valve listed in DDValveNames (except the reserved slots) is exposed as an attribute named
    "valve_state_<NAME>" holding a dictionary {"id": <valve ID>, "state": <ENERGIZED | DEENERGIZED>}.
    Valve IDs accepted by the override commands are the values of DDValveNames.
    """

    # Valves states publish message field positions
    START_POS_VALVES_STATES = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_VALVES_STATES = START_POS_VALVES_STATES + 2  # Valves States come in as a U16 value (2 bytes)
    # Number of valve state bits carried by the U16 field above
    NUM_VALVES_STATES_BITS = (END_POS_VALVES_STATES - START_POS_VALVES_STATES) * 8

    def __init__(self, can_interface, logger: Logger):
        """
        @param can_interface: Denali CAN Messenger object
        @param logger: logger used for debug output
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger
        self.valves_sensed_states = OrderedDict()
        self.dg_valves_states_timestamp = 0.0
        self.valve_states_all = 0x0000

        # NOTE: The len function counts the enums with the same number only once. DDValveNames is decorated with
        # @unique, so every valve has its own ID and therefore its own slot in this list.
        self.valve_states_enum = [0 for _ in range(len(DDValveNames))]

        # One state dictionary per physical valve, so that the getters and the sync handler always find them.
        for valve in _PUBLISHED_VALVES:
            setattr(self, _state_attr(valve), {"id": valve.value, "state": DEENERGIZED})

        for valve in DDValveNames.__members__:
            self.valves_sensed_states[valve] = ''

        # Register the handler last so that it can never run against a partially initialized object.
        if self.can_interface is not None:
            channel_id = DenaliChannels.dd_sync_broadcast_ch_id
            msg_id = MsgIds.MSG_ID_DD_VALVES_STATES_DATA.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_valves_sync)

    def get_valve_states(self):
        """
        Gets the valve states

        @return: list of the last known state (ENERGIZED / DEENERGIZED) of every valve in DDValveNames, in valve ID
                 order, with the reserved slots left out
        """
        return [getattr(self, _state_attr(valve)).get("state", None) for valve in _PUBLISHED_VALVES]

    @staticmethod
    def sort_by_id(observation):
        """
        Converts a published dictionary of valve state information to an ordered list of tuples.

        For example:
            observation = {'valve_state_VTD': {'id': 1, 'state': True},
                           'valve_state_VDR': {'id': 0, 'state': False},
                           'valve_states_all': 2}

            DDValves.sort_by_id(observation)

            [('valve_state_VDR', 0, False), ('valve_state_VTD', 1, True)]

        Entries whose value is not a dictionary (e.g. 'valve_states_all') are ignored.

        @param observation: dictionary of the observed valve states
        @return: a list of (name, id, state) tuples sorted by valve ID
        """
        result = [(key, value.get("id", None), value.get("state", None))
                  for key, value in observation.items()
                  if isinstance(value, dict)]
        return sorted(result, key=lambda each: each[1])

    @staticmethod
    def _binary_to_valve_state(binary) -> bool:
        """
        @param binary: binary value
        @return: ENERGIZED if the value is non-zero, otherwise DEENERGIZED
        """
        return ENERGIZED if binary != 0 else DEENERGIZED

    @staticmethod
    def _sensed_state_name(state_number):
        """
        Converts a sensed state number to a readable name.

        NOTE(review): no DD specific sensed-state enumeration is defined in this file, so DDValveStates is used.
        Confirm against the firmware definition of the valve sensed states.

        @param state_number: sensed state value received from the DD
        @return: the DDValveStates name, or the number as a string when it is not a DDValveStates value
        """
        try:
            return DDValveStates(state_number).name
        except ValueError:
            return str(state_number)

    @publish(["dg_valves_states_timestamp", "valve_states_all"]
             + [_state_attr(valve) for valve in _PUBLISHED_VALVES]
             + ["valve_states_enum"])
    def _handler_valves_sync(self, message, timestamp=0.0):
        """
        Handles published valves states message.

        The first two payload bytes are a U16 bit field; bit n holds the commanded state of the valve whose ID is
        n. The following bytes are read as one sensed state per valve, in DDValveNames order.

        NOTE(review): valves with an ID of 16 or above (balancing chamber and ultrafiltration valves) have no bit
        in the U16 field, so their "valve_state_<NAME>" entries are not updated here. The position of the sensed
        state bytes is likewise taken over unchanged from the previous implementation. Both need to be confirmed
        against the DD firmware broadcast layout.

        @param message: published valves states message
        @param timestamp: time at which the message was received
        @return: none
        """
        payload = message['message']
        vst = struct.unpack('H', bytearray(payload[self.START_POS_VALVES_STATES:self.END_POS_VALVES_STATES]))
        self.valve_states_all = vst[0]

        # Extract each valve state from U16 valves states using bit-masking
        for valve in _PUBLISHED_VALVES:
            if valve.value >= self.NUM_VALVES_STATES_BITS:
                continue
            state = self._binary_to_valve_state(vst[0] & (1 << valve.value))
            getattr(self, _state_attr(valve))["state"] = state
            self.valve_states_enum[valve.value] = DDValveStates(int(state)).name

        # One sensed state byte per valve; stop quietly if the payload is shorter than expected.
        position = self.END_POS_VALVES_STATES
        for valve_id in self.valves_sensed_states:
            raw = bytearray(payload[position:position + 1])
            if not raw:
                break
            self.valves_sensed_states[valve_id] = self._sensed_state_name(raw[0])
            position += 1

        self.dg_valves_states_timestamp = timestamp

    def _send_override_request(self, message_id, payload, description):
        """
        Builds a Dialin to DD message, sends it and evaluates the response.

        @param message_id: ID of the message to send
        @param payload: byte array payload of the message
        @param description: text written to the debug log before sending
        @return: first payload byte of the response (1 if successful, zero otherwise), False on timeout
        """
        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dd_ch_id,
                                              message_id=message_id,
                                              payload=payload)

        self.logger.debug(description)

        # Send message
        received_message = self.can_interface.send(message)

        # If there is content...
        if received_message is not None:
            # response payload is OK or not OK
            return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

        self.logger.debug("Timeout!!!!")
        return False

    def cmd_valve_sensed_state_override(self, valve: int, state: bool, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the valve sensed state override command.
        Constraints:
            Must be logged into DD.
            Given valve ID must be one of the values of DDValveNames.

        @param valve: unsigned int - valve ID
        @param state: bool - valve state
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(reset) + integer_to_bytearray(int(state)) + integer_to_bytearray(valve)
        return self._send_override_request(MsgIds.MSG_ID_DD_VALVE_SENSED_STATE_OVERRIDE_REQUEST.value,
                                           payload,
                                           "Override valve sensed state")

    def cmd_valve_override(self, valve: int, state: bool, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the valve state override command.
        Constraints:
            Must be logged into DD.
            Given valve ID must be one of the values of DDValveNames.

        @param valve: unsigned int - valve ID
        @param state: bool - valve state
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(reset) + integer_to_bytearray(int(state)) + integer_to_bytearray(valve)
        return self._send_override_request(MsgIds.MSG_ID_DD_VALVE_STATE_OVERRIDE_REQUEST.value,
                                           payload,
                                           "Override valve state")

    def cmd_valve_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the valves states broadcast interval override command.
        Constraints:
            Must be logged into DD.
            Given interval must be accepted by check_broadcast_interval_override_ms (previously documented as
            non-zero and a multiple of the general task interval of 50 ms - TODO confirm for DD).

        @param ms: unsigned int - broadcast interval (in ms)
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        if not check_broadcast_interval_override_ms(ms):
            return False

        payload = integer_to_bytearray(reset) + integer_to_bytearray(ms)
        return self._send_override_request(MsgIds.MSG_ID_DD_VALVE_PUBLISH_INTERVAL_OVERRIDE_REQUEST.value,
                                           payload,
                                           "override valves states publish interval")