###########################################################################
#
#    Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file valves.py
#
#    @author (last)      Peter Lucia
#    @date   (last)      10-Nov-2020
#    @author (original)  Dara Navaei
#    @date   (original)  19-Aug-2020
#
############################################################################
import struct
from enum import unique
from logging import Logger
from typing import Union

from ..utils.conversions import integer_to_bytearray
from .constants import NO_RESET
from ..protocols.CAN import (DenaliMessage, DenaliChannels)
from ..utils.base import _AbstractSubSystem, _publish, DialinEnum
from ..common import MsgIds


@unique
class ValvesEnum(DialinEnum):
    """Identifiers of the HD valves as reported in the valves data message."""
    VDI = 0
    VDO = 1
    VBA = 2
    VBV = 3


@unique
class ValvesPositions(DialinEnum):
    """Position identifiers a valve can report or be commanded to."""
    VALVE_POSITION_NOT_IN_POSITION = 0
    VALVE_POSITION_A_INSERT_EJECT = 1
    VALVE_POSITION_B_OPEN = 2
    VALVE_POSITION_C_CLOSE = 3


@unique
class ValvesStates(DialinEnum):
    """Execution states a valve can report."""
    VALVE_STATE_WAIT_FOR_POST = 0
    VALVE_STATE_HOMING_NOT_STARTED = 1
    VALVE_STATE_HOMING_FIND_ENERGIZED_EDGE = 2
    VALVE_STATE_HOMING_FIND_DEENERGIZED_EDGE = 3
    VALVE_STATE_IDLE = 4
    VALVE_STATE_IN_TRANSITION = 5
    VALVE_STATE_IN_BYPASS_MODE = 6


@unique
class AirTrapState(DialinEnum):
    """States of the HD air trap valve."""
    STATE_CLOSED = 0
    STATE_OPEN = 1


class HDValves(_AbstractSubSystem):
    """
    HDValves

    @brief Hemodialysis Device (HD) Dialin API sub-class for valves related commands.
    """

    # Valves states publish message field positions
    # Note the MsgFieldPosition was not used since some of the published data are S16
    # Layout (byte widths): valve id (4), state (4), current position id (4),
    # current position count (2), next position count (2), current (4),
    # position C (2), position A (2), position B (2), PWM (4), air trap status (4)
    START_POS_VALVES_ID = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_VALVES_ID = START_POS_VALVES_ID + 4

    START_VALVES_STATE = END_POS_VALVES_ID
    END_VALVES_STATE = START_VALVES_STATE + 4

    START_POS_VALVES_CURR_POS = END_VALVES_STATE
    END_POS_VALVES_CURR_POS = START_POS_VALVES_CURR_POS + 4

    START_POS_VALVES_CURR_POS_CNT = END_POS_VALVES_CURR_POS
    END_POS_VALVES_CURR_POS_CNT = START_POS_VALVES_CURR_POS_CNT + 2

    START_POS_VALVES_NEXT_POS_CNT = END_POS_VALVES_CURR_POS_CNT
    END_POS_VALVES_NEXT_POS_CNT = START_POS_VALVES_NEXT_POS_CNT + 2

    START_POS_VALVES_CURRENT = END_POS_VALVES_NEXT_POS_CNT
    END_POS_VALVES_CURRENT = START_POS_VALVES_CURRENT + 4

    START_VALVES_POS_C = END_POS_VALVES_CURRENT
    END_VALVES_POS_C = START_VALVES_POS_C + 2

    START_VALVES_POS_A = END_VALVES_POS_C
    END_VALVES_POS_A = START_VALVES_POS_A + 2

    START_VALVES_POS_B = END_VALVES_POS_A
    END_VALVES_POS_B = START_VALVES_POS_B + 2

    START_VALVES_PWM = END_VALVES_POS_B
    END_VALVES_PWM = START_VALVES_PWM + 4

    START_AIR_TRAP_VALVE_STATUS = END_VALVES_PWM
    END_AIR_TRAP_VALVE_STATUS = START_AIR_TRAP_VALVE_STATUS + 4

    def __init__(self, can_interface, logger: Logger):
        """
        HDValves constructor

        @param can_interface: (DenaliCanMessenger) - Denali CAN messenger object.
        @param logger: (Logger) - Dialin logger
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger

        # Subscribe to the HD valves broadcast only when a CAN interface is available
        if self.can_interface is not None:
            channel_id = DenaliChannels.hd_sync_broadcast_ch_id
            msg_id = MsgIds.MSG_ID_HD_VALVES_DATA.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_hd_valves_sync)

        # A dictionary of the valves with the status, keyed by valve name.
        # Each entry stays empty until the first data message for that valve arrives.
        self.valves_status = {ValvesEnum.VDI.name: {},
                              ValvesEnum.VDO.name: {},
                              ValvesEnum.VBA.name: {},
                              ValvesEnum.VBV.name: {}}
        # Holds 0 until a valid valves data message is handled, then the AirTrapState name
        self.hd_air_trap_status = 0

    def get_hd_air_trap_status(self):
        """
        Returns the hd air trap status

        @return: the HD air trap status: the AirTrapState member name (str) once a valid
                 valves data message has been handled, the initial value 0 before that
        """
        return self.hd_air_trap_status

    def get_hd_valves_status(self):
        """
        Gets the hd valves status

        @return: (dict) the hd valves status, keyed by valve name
        """
        return self.valves_status

    def _send_and_get_response(self, message, timeout_log: str):
        """
        Sends a command message and extracts the first payload byte of the response.

        @param message: the message built with DenaliMessage.build_message
        @param timeout_log: text written to the debug log when no response is received
        @returns the first response payload byte, or False when no response is received
        """
        received_message = self.can_interface.send(message)

        # No content means the request was not answered
        if received_message is None:
            self.logger.debug(timeout_log)
            return False

        # Response payload is OK or not OK
        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def cmd_hd_valves_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends broadcast time interval

        @param ms: Publish time interval in ms
        @param reset: integer - 1 to reset a previous override, 0 to override
        @returns 1 if successful, zero otherwise
        """
        # Payload layout: reset flag followed by the interval
        payload = integer_to_bytearray(reset) + integer_to_bytearray(ms)

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_hd_ch_id,
            message_id=MsgIds.MSG_ID_VALVES_STATES_PUBLISH_INTERVAL_OVERRIDE.value,
            payload=payload)

        self.logger.debug("Sending {} ms publish interval to the HD valves module".format(ms))

        return self._send_and_get_response(message, "Timeout!!!!")

    def cmd_set_hd_valve_position(self, valve: int, position: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the HD valves set position for a valve

        @param valve: integer - Valve number:
               VDI = 0
               VDO = 1
               VBA = 2
               VBV = 3
        @param position: integer - Position number:
               VALVE_POSITION_A_INSERT_EJECT = 1
               VALVE_POSITION_B_OPEN = 2
               VALVE_POSITION_C_CLOSE = 3
        @param reset: integer - 1 to reset a previous override, 0 to override
        @returns 1 if successful, zero otherwise
        """
        reset_value = integer_to_bytearray(reset)
        valve_value = integer_to_bytearray(valve)
        position_value = integer_to_bytearray(position)

        # Payload layout: reset flag, then position, then valve number
        payload = reset_value + position_value + valve_value

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_hd_ch_id,
            message_id=MsgIds.MSG_ID_HD_VALVES_POSITION_OVERRIDE.value,
            payload=payload)

        return self._send_and_get_response(message, "HD cmd_valve_override Timeout!!!")

    def cmd_set_hd_valve_pwm(self, valve: int, pwm: int, direction: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the HD valves PWM command

        @param valve: integer - Valve number:
               VDI = 0
               VDO = 1
               VBA = 2
               VBV = 3
        @param pwm: integer - sets the pwm value
        @param direction: integer - Direction number:
               0 = Clockwise
               1 = Counter clockwise
        @param reset: integer - 1 to reset a previous override, 0 to override
        @returns 1 if successful, zero otherwise
        """
        reset_value = integer_to_bytearray(reset)
        valve_value = integer_to_bytearray(valve)
        pwm_value = integer_to_bytearray(pwm)
        direction_value = integer_to_bytearray(direction)

        # Payload layout: reset flag, valve number, pwm, direction
        payload = reset_value + valve_value + pwm_value + direction_value

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_hd_ch_id,
            message_id=MsgIds.MSG_ID_HD_VALVES_SET_PWM_OVERRIDE.value,
            payload=payload)

        return self._send_and_get_response(message, "HD cmd_valve_override Timeout!!!")

    def cmd_home_hd_valve(self, valve: int) -> int:
        """
        Constructs and sends the HD valves home command

        @param valve: integer - Valve number:
               VDI = 0
               VDO = 1
               VBA = 2
               VBV = 3
        @returns 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(valve)

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_hd_ch_id,
            message_id=MsgIds.MSG_ID_HD_VALVES_HOME.value,
            payload=payload)

        return self._send_and_get_response(message, "HD Homing Valve Timeout!!!")

    def cmd_set_hd_air_trap_valve(self,
                                  valve_state: Union[AirTrapState, int, str] = AirTrapState.STATE_CLOSED.name
                                  ) -> int:
        """
        Constructs and sends an open/close command to the HD air trap valve

        @param valve_state: air trap valve state (open or close). The open state may be given as
               AirTrapState.STATE_OPEN, its name ('STATE_OPEN') or its value (1); any other
               input is sent as a close command.
        @returns 1 if successful, zero otherwise
        """
        # Accept the enum member, its name (the type of the default argument) or its value,
        # so that 'STATE_OPEN' and 1 are no longer silently sent as a close command.
        open_state = AirTrapState.STATE_OPEN
        open_requested = valve_state in (open_state, open_state.name, open_state.value)

        action = "Opening" if open_requested else "Closing"
        payload = integer_to_bytearray(1 if open_requested else 0)

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_hd_ch_id,
            message_id=MsgIds.MSG_ID_HD_VALVES_SET_AIR_TRAP_VALVE.value,
            payload=payload)

        # Send message
        received_message = self.can_interface.send(message)

        # If there is content...
        if received_message is not None:
            self.logger.debug("{} air trap valve".format(action))
            # response payload is OK or not OK
            return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

        self.logger.debug("{} air trap valve timeout!!".format(action))
        return False

    @_publish(["valves_status"])
    def _handler_hd_valves_sync(self, message):
        """
        Handles published HD valves data messages. HD valves data are captured
        for reference.

        @param message: published HD valves data message
        @returns none
        """
        raw = message['message']

        def unpack(fmt, start, end):
            # Decode a single field located at raw[start:end]
            return struct.unpack(fmt, bytearray(raw[start:end]))[0]

        valve_id = unpack('i', self.START_POS_VALVES_ID, self.END_POS_VALVES_ID)
        state_id = unpack('i', self.START_VALVES_STATE, self.END_VALVES_STATE)
        position_id = unpack('i', self.START_POS_VALVES_CURR_POS, self.END_POS_VALVES_CURR_POS)
        position_count = unpack('h', self.START_POS_VALVES_CURR_POS_CNT, self.END_POS_VALVES_CURR_POS_CNT)
        next_position = unpack('h', self.START_POS_VALVES_NEXT_POS_CNT, self.END_POS_VALVES_NEXT_POS_CNT)
        current = unpack('f', self.START_POS_VALVES_CURRENT, self.END_POS_VALVES_CURRENT)
        pos_c = unpack('h', self.START_VALVES_POS_C, self.END_VALVES_POS_C)
        pos_a = unpack('h', self.START_VALVES_POS_A, self.END_VALVES_POS_A)
        pos_b = unpack('h', self.START_VALVES_POS_B, self.END_VALVES_POS_B)
        pwm = unpack('i', self.START_VALVES_PWM, self.END_VALVES_PWM)
        air_trap = unpack('i', self.START_AIR_TRAP_VALVE_STATUS, self.END_AIR_TRAP_VALVE_STATUS)

        # To make sure values of the enums are not out of range. The state is validated
        # with its own id so that ValvesStates(state_id) below cannot fail on a bad value.
        if (ValvesEnum.has_value(valve_id)
                and ValvesPositions.has_value(position_id)
                and ValvesStates.has_value(state_id)):
            valve_name = ValvesEnum(valve_id).name
            # Update the valves dictionary
            self.valves_status[valve_name] = {'Valve': valve_name,
                                              'PosID': ValvesPositions(position_id).name,
                                              'PosCnt': position_count,
                                              'Cmd': next_position,
                                              'State': ValvesStates(state_id).name,
                                              'Current': current,
                                              'PosA': pos_a,
                                              'PosB': pos_b,
                                              'PosC': pos_c,
                                              'PWM': pwm}

        # NOTE(review): the air trap status is updated independently of the valve fields;
        # confirm this matches the intended nesting of the original code.
        if AirTrapState.has_value(air_trap):
            self.hd_air_trap_status = AirTrapState(air_trap).name