###########################################################################
#
#    Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   valves.py
#
#    @author (last)      Quang Nguyen
#    @date   (last)      05-Aug-2021
#    @author (original)  Dara Navaei
#    @date   (original)  19-Aug-2020
#
############################################################################
import struct
from enum import unique
from logging import Logger

from .constants import NO_RESET
from ..common import MsgIds
from ..protocols.CAN import DenaliMessage, DenaliChannels
from ..utils.base import AbstractSubSystem, publish, DialinEnum
from ..utils.checks import check_broadcast_interval_override_ms
from ..utils.conversions import integer_to_bytearray, float_to_bytearray


@unique
class ValvesEnum(DialinEnum):
    VDI = 0
    VDO = 1
    VBA = 2
    VBV = 3


@unique
class ValvesPositions(DialinEnum):
    VALVE_POSITION_A_INSERT_EJECT = 1
    VALVE_POSITION_B_OPEN = 2
    VALVE_POSITION_C_CLOSE = 3


@unique
class ValvesStates(DialinEnum):
    VALVE_STATE_WAIT_FOR_POST = 0
    VALVE_STATE_HOMING_NOT_STARTED = 1
    VALVE_STATE_HOMING_FIND_ENERGIZED_EDGE = 2
    VALVE_STATE_HOMING_FIND_DEENERGIZED_EDGE = 3
    VALVE_STATE_IDLE = 4
    VALVE_STATE_IN_TRANSITION = 5
    VALVE_STATE_IN_BYPASS_MODE = 6


@unique
class AirTrapState(DialinEnum):
    STATE_CLOSED = 0
    STATE_OPEN = 1


class HDValves(AbstractSubSystem):
    """
    Hemodialysis Device (HD) Dialin API sub-class for valves related commands.
    """

    # Valves states publish message field positions
    # Note the MsgFieldPosition was not used since some of the published data are S16
    # Each END_* is the exclusive slice bound of its field and the START_* of the next one, so the
    # fields are laid out back to back: 4, 4, 4, 2, 2, 4, 2, 2, 2, 4, 4 bytes.
    START_POS_VALVES_ID = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_VALVES_ID = START_POS_VALVES_ID + 4

    START_VALVES_STATE = END_POS_VALVES_ID
    END_VALVES_STATE = START_VALVES_STATE + 4

    START_POS_VALVES_CURR_POS = END_VALVES_STATE
    END_POS_VALVES_CURR_POS = START_POS_VALVES_CURR_POS + 4

    START_POS_VALVES_CURR_POS_CNT = END_POS_VALVES_CURR_POS
    END_POS_VALVES_CURR_POS_CNT = START_POS_VALVES_CURR_POS_CNT + 2

    START_POS_VALVES_NEXT_POS_CNT = END_POS_VALVES_CURR_POS_CNT
    END_POS_VALVES_NEXT_POS_CNT = START_POS_VALVES_NEXT_POS_CNT + 2

    START_POS_VALVES_CURRENT = END_POS_VALVES_NEXT_POS_CNT
    END_POS_VALVES_CURRENT = START_POS_VALVES_CURRENT + 4

    START_VALVES_POS_C = END_POS_VALVES_CURRENT
    END_VALVES_POS_C = START_VALVES_POS_C + 2

    START_VALVES_POS_A = END_VALVES_POS_C
    END_VALVES_POS_A = START_VALVES_POS_A + 2

    START_VALVES_POS_B = END_VALVES_POS_A
    END_VALVES_POS_B = START_VALVES_POS_B + 2

    START_VALVES_PWM = END_VALVES_POS_B
    END_VALVES_PWM = START_VALVES_PWM + 4

    START_AIR_TRAP_VALVE_STATUS = END_VALVES_PWM
    END_AIR_TRAP_VALVE_STATUS = START_AIR_TRAP_VALVE_STATUS + 4

    def __init__(self, can_interface, logger: Logger):
        """
        HDValves constructor

        @param can_interface: (DenaliCanMessenger) - Denali CAN messenger object.
        @param logger: (Logger) - Dialin logger
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger

        # Initialise the published state BEFORE registering the handler, so a broadcast that
        # arrives right after registration never sees a half-constructed object.
        # A dictionary of the valves with the status
        self.valves_status = {ValvesEnum.VDI.name: {}, ValvesEnum.VDO.name: {}, ValvesEnum.VBA.name: {},
                              ValvesEnum.VBV.name: {}}
        self.hd_air_trap_status = 0

        if self.can_interface is not None:
            channel_id = DenaliChannels.hd_sync_broadcast_ch_id
            msg_id = MsgIds.MSG_ID_HD_VALVES_DATA.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_hd_valves_sync)

    def get_hd_air_trap_status(self) -> int:
        """
        Returns the hd air trap status

        @return: (int) the HD air trap status as last received in the valves broadcast (0 until the first
                 broadcast is handled)
        """
        return self.hd_air_trap_status

    def get_hd_valves_status(self) -> dict:
        """
        Gets the hd valves status

        @return: (dict) the hd valves status, keyed by valve name (VDI, VDO, VBA, VBV); each entry is an empty
                 dict until a broadcast for that valve has been handled
        """
        return self.valves_status

    def cmd_hd_valves_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends broadcast time interval
        Constraints:
            Must be logged into HD.
            Given interval must be non-zero and a multiple of the HD general task interval (50 ms).

        @param ms: Publish time interval in ms
        @param reset: integer - 1 to reset a previous override, 0 to override
        @returns 1 if successful, zero otherwise (False when the interval is rejected locally or on timeout)
        """
        if not check_broadcast_interval_override_ms(ms):
            return False

        reset_value = integer_to_bytearray(reset)
        interval_value = integer_to_bytearray(ms)
        payload = reset_value + interval_value

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_VALVES_STATES_PUBLISH_INTERVAL_OVERRIDE.value,
                                              payload=payload)

        self.logger.debug("Sending {} ms publish interval to the HD valves module".format(ms))
        # Send message
        received_message = self.can_interface.send(message)

        # If there is content in message
        if received_message is not None:
            # Response payload is OK or not
            return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        else:
            self.logger.debug("Timeout!!!!")
            return False

    def cmd_set_hd_valve_position(self, valve: int, position: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the HD valves set position for a valve

        @param valve: integer - Valve number:
            VDI = 0
            VDO = 1
            VBA = 2
            VBV = 3
        @param position: integer - Position number:
            VALVE_POSITION_A_INSERT_EJECT = 1
            VALVE_POSITION_B_OPEN = 2
            VALVE_POSITION_C_CLOSE = 3
        @param reset: integer - 1 to reset a previous override, 0 to override
        @returns 1 if successful, zero otherwise
        """
        reset_value = integer_to_bytearray(reset)
        vlv = integer_to_bytearray(valve)
        pos = integer_to_bytearray(position)
        # Payload order is: reset, override value, valve index
        payload = reset_value + pos + vlv

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_VALVES_POSITION_OVERRIDE.value,
                                              payload=payload)
        # Send message
        received_message = self.can_interface.send(message)

        # If there is content...
        if received_message is not None:
            # response payload is OK or not OK
            return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        else:
            self.logger.debug("HD cmd_valve_override Timeout!!!")
            return False

    def cmd_set_hd_valve_current_override(self, valve: int, current: float, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the HD valves current override for a valve

        @param valve: integer - Valve number:
            VDI = 0
            VDO = 1
            VBA = 2
            VBV = 3
        @param current: float value to override current
        @param reset: integer - 1 to reset a previous override, 0 to override
        @returns 1 if successful, zero otherwise
        """
        reset_value = integer_to_bytearray(reset)
        vlv = integer_to_bytearray(valve)
        cur = float_to_bytearray(current)
        # Payload order is: reset, override value, valve index
        payload = reset_value + cur + vlv

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_VALVES_CURRENT_OVERRIDE.value,
                                              payload=payload)
        # Send message
        received_message = self.can_interface.send(message)

        # If there is content...
        if received_message is not None:
            self.logger.debug("Setting {} current to {:5.3f} A".format(str(ValvesEnum(valve).name), current))
            # response payload is OK or not OK
            return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        else:
            self.logger.debug("HD current override Timeout!!!")
            return False

    def cmd_set_hd_valve_position_count_override(self, valve: int, position_count: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the HD valves position count override for a valve

        @param valve: integer - Valve number:
            VDI = 0
            VDO = 1
            VBA = 2
            VBV = 3
        @param position_count: integer value
        @param reset: integer - 1 to reset a previous override, 0 to override
        @returns 1 if successful, zero otherwise
        """
        reset_value = integer_to_bytearray(reset)
        vlv = integer_to_bytearray(valve)
        pos = integer_to_bytearray(position_count)
        # Payload order is: reset, override value, valve index
        payload = reset_value + pos + vlv

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_VALVES_POSITION_COUNT_OVERRIDE.value,
                                              payload=payload)
        # Send message
        received_message = self.can_interface.send(message)

        # If there is content...
        if received_message is not None:
            self.logger.debug("Setting {} position to {} ".format(str(ValvesEnum(valve).name), position_count))
            # response payload is OK or not OK
            return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        else:
            # Was a copy/paste of the current-override message; name the command that actually timed out
            self.logger.debug("HD position count override Timeout!!!")
            return False

    def cmd_set_hd_valve_pwm(self, valve: int, pwm: int, direction: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the HD valves PWM command

        @param valve: integer - Valve number:
            VDI = 0
            VDO = 1
            VBA = 2
            VBV = 3
        @param pwm: integer - sets the pwm value
        @param direction: integer - Direction number:
            0 = Clockwise
            1 = Counter clockwise
        @param reset: integer - 1 to reset a previous override, 0 to override
        @returns 1 if successful, zero otherwise
        """
        reset_value = integer_to_bytearray(reset)
        vlv = integer_to_bytearray(valve)
        pwm_value = integer_to_bytearray(pwm)
        dir_value = integer_to_bytearray(direction)
        # Unlike the other overrides, the valve index comes right after the reset flag here
        payload = reset_value + vlv + pwm_value + dir_value

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_VALVES_SET_PWM_OVERRIDE.value,
                                              payload=payload)
        # Send message
        received_message = self.can_interface.send(message)

        # If there is content...
        if received_message is not None:
            # response payload is OK or not OK
            return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        else:
            self.logger.debug("HD cmd_valve_override Timeout!!!")
            return False

    def cmd_home_hd_valve(self, valve: int) -> int:
        """
        Constructs and sends the HD valves home command

        @param valve: integer - Valve number:
            VDI = 0
            VDO = 1
            VBA = 2
            VBV = 3
        @returns 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(valve)

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_VALVES_HOME.value,
                                              payload=payload)
        # Send message
        received_message = self.can_interface.send(message)

        # If there is content...
        if received_message is not None:
            # response payload is OK or not OK
            return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        else:
            self.logger.debug("HD Homing Valve Timeout!!!")
            return False

    def cmd_set_hd_air_trap_valve(self, valve_state: int = AirTrapState.STATE_CLOSED.value) -> int:
        """
        Constructs and sends an open/close command to the HD air trap valve

        @param valve_state: air trap valve state (open or close). Either the integer value
            (STATE_CLOSED = 0, STATE_OPEN = 1) or the AirTrapState member itself. Anything that is not
            the open state is sent as a close command.
        @returns 1 if successful, zero otherwise
        """
        # Accept both the raw value and the enum member; everything else closes the valve
        open_requested = valve_state in (AirTrapState.STATE_OPEN, AirTrapState.STATE_OPEN.value)
        payload = integer_to_bytearray(1 if open_requested else 0)
        action = "Opening" if open_requested else "Closing"

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_VALVES_SET_AIR_TRAP_VALVE.value,
                                              payload=payload)
        # Send message
        received_message = self.can_interface.send(message)

        # If there is content...
        if received_message is not None:
            self.logger.debug("{} air trap valve".format(action))
            # response payload is OK or not OK
            return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        else:
            self.logger.debug("{} air trap valve timeout!!".format(action))
            return False

    @publish(["valves_status", "hd_air_trap_status"])
    def _handler_hd_valves_sync(self, message: dict) -> None:
        """
        Handles published HD valves data messages. HD valves data are captured
        for reference.

        The payload is decoded field by field using the START_*/END_* class constants: three 32-bit
        integers (valve ID, state, current position ID), two 16-bit integers (current and next position
        counts), a 32-bit float (current), three 16-bit integers (position C, A and B counts) and two
        32-bit integers (PWM and air trap valve status).

        @param message: published HD valves data message
        @returns none
        """
        data = message['message']

        vlv_id = struct.unpack('i', bytearray(
            data[self.START_POS_VALVES_ID:self.END_POS_VALVES_ID]))[0]
        state_id = struct.unpack('i', bytearray(
            data[self.START_VALVES_STATE:self.END_VALVES_STATE]))[0]
        pos_id = struct.unpack('i', bytearray(
            data[self.START_POS_VALVES_CURR_POS:self.END_POS_VALVES_CURR_POS]))[0]
        pos_cnt = struct.unpack('h', bytearray(
            data[self.START_POS_VALVES_CURR_POS_CNT:self.END_POS_VALVES_CURR_POS_CNT]))[0]
        next_pos = struct.unpack('h', bytearray(
            data[self.START_POS_VALVES_NEXT_POS_CNT:self.END_POS_VALVES_NEXT_POS_CNT]))[0]
        current = struct.unpack('f', bytearray(
            data[self.START_POS_VALVES_CURRENT:self.END_POS_VALVES_CURRENT]))[0]
        pos_c = struct.unpack('h', bytearray(
            data[self.START_VALVES_POS_C:self.END_VALVES_POS_C]))[0]
        pos_a = struct.unpack('h', bytearray(
            data[self.START_VALVES_POS_A:self.END_VALVES_POS_A]))[0]
        pos_b = struct.unpack('h', bytearray(
            data[self.START_VALVES_POS_B:self.END_VALVES_POS_B]))[0]
        pwm = struct.unpack('i', bytearray(
            data[self.START_VALVES_PWM:self.END_VALVES_PWM]))[0]
        air_trap = struct.unpack('i', bytearray(
            data[self.START_AIR_TRAP_VALVE_STATUS:self.END_AIR_TRAP_VALVE_STATUS]))[0]

        # To make sure values of the enums are not out of range.
        # The state must be validated with state_id (not pos_id), otherwise ValvesStates(state_id) below
        # can raise ValueError on an unexpected state value.
        if ValvesEnum.has_value(vlv_id) and ValvesPositions.has_value(pos_id) and ValvesStates.has_value(state_id):
            vlv_name = ValvesEnum(vlv_id).name
            # Update the valves dictionary
            self.valves_status[vlv_name] = {'Valve': vlv_name, 'PosID': ValvesPositions(pos_id).name,
                                            'PosCnt': pos_cnt,
                                            'Cmd': next_pos, 'State': ValvesStates(state_id).name,
                                            'Current': current,
                                            'PosA': pos_a, 'PosB': pos_b, 'PosC': pos_c, 'PWM': pwm}

        # Update the air trap valve's status
        self.hd_air_trap_status = air_trap