import struct
from logging import Logger

from ..utils.conversions import integer_to_bytearray
from .constants import NO_RESET
from ..protocols.CAN import (DenaliMessage, DenaliChannels)
from ..utils.base import _AbstractSubSystem, _publish, DialinEnum
from ..common import MsgIds


class ValvesEnum(DialinEnum):
    """Identifiers of the HD valves, as carried in the valves data message."""
    VDI = 0
    VDO = 1
    VBA = 2
    VBV = 3


class ValvesPositions(DialinEnum):
    """Position identifiers a valve can report or be commanded to."""
    VALVE_POSITION_NOT_IN_POSITION = 0
    VALVE_POSITION_A_INSERT_EJECT = 1
    VALVE_POSITION_B_OPEN = 2
    VALVE_POSITION_C_CLOSE = 3


class ValvesStates(DialinEnum):
    """Valve state identifiers reported in the valves data message."""
    VALVE_STATE_WAIT_FOR_POST = 0
    VALVE_STATE_HOMING_NOT_STARTED = 1
    VALVE_STATE_HOMING_FIND_ENERGIZED_EDGE = 2
    VALVE_STATE_HOMING_FIND_DEENERGIZED_EDGE = 3
    VALVE_STATE_IDLE = 4
    VALVE_STATE_IN_TRANSITION = 5
    VALVE_STATE_IN_BYPASS_MODE = 6


class AirTrapState(DialinEnum):
    """Air trap valve states."""
    STATE_CLOSED = 0
    STATE_OPEN = 1


class HDValves(_AbstractSubSystem):
    """
    Hemodialysis Device (HD) Dialin API sub-class for valves related commands.

    Decoded valve data is kept in ``valves_status`` (one dictionary per valve,
    keyed by the valve name) and ``hd_air_trap_status``.
    """

    # Valves states publish message field positions. Each field starts where
    # the previous one ends; the increment is the field width in bytes.
    START_POS_VALVES_ID = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_VALVES_ID = START_POS_VALVES_ID + 4

    START_VALVES_STATE = END_POS_VALVES_ID
    END_VALVES_STATE = START_VALVES_STATE + 4

    START_POS_VALVES_CURR_POS = END_VALVES_STATE
    END_POS_VALVES_CURR_POS = START_POS_VALVES_CURR_POS + 4

    START_POS_VALVES_CURR_POS_CNT = END_POS_VALVES_CURR_POS
    END_POS_VALVES_CURR_POS_CNT = START_POS_VALVES_CURR_POS_CNT + 2

    START_POS_VALVES_NEXT_POS_CNT = END_POS_VALVES_CURR_POS_CNT
    END_POS_VALVES_NEXT_POS_CNT = START_POS_VALVES_NEXT_POS_CNT + 2

    START_POS_VALVES_CURRENT = END_POS_VALVES_NEXT_POS_CNT
    END_POS_VALVES_CURRENT = START_POS_VALVES_CURRENT + 4

    START_VALVES_POS_C = END_POS_VALVES_CURRENT
    END_VALVES_POS_C = START_VALVES_POS_C + 2

    START_VALVES_POS_A = END_VALVES_POS_C
    END_VALVES_POS_A = START_VALVES_POS_A + 2

    START_VALVES_POS_B = END_VALVES_POS_A
    END_VALVES_POS_B = START_VALVES_POS_B + 2

    START_VALVES_PWM = END_VALVES_POS_B
    END_VALVES_PWM = START_VALVES_PWM + 4

    START_AIR_TRAP_VALVE_STATUS = END_VALVES_PWM
    END_AIR_TRAP_VALVE_STATUS = START_AIR_TRAP_VALVE_STATUS + 4

    def __init__(self, can_interface, logger: Logger):
        """
        HDValves constructor

        @param can_interface: CAN interface used to send commands and to
               register the valves data handler. When None, no handler is
               registered.
        @param logger: Logger used for debug output
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        # Create the state before registering the handler so the handler can
        # never run against an instance that lacks these attributes.
        # A dictionary of the valves with the status
        self.valves_status = {ValvesEnum.VDI.name: {},
                              ValvesEnum.VDO.name: {},
                              ValvesEnum.VBA.name: {},
                              ValvesEnum.VBV.name: {}}
        # NOTE(review): starts as the integer 0 but the handler stores the
        # AirTrapState member *name* (a string) once data arrives.
        self.hd_air_trap_status = 0

        if self.can_interface is not None:
            channel_id = DenaliChannels.hd_sync_broadcast_ch_id
            msg_id = MsgIds.MSG_ID_HD_VALVES_DATA.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_hd_valves_sync)

    def _send_command(self, message, timeout_log, success_log=None):
        """
        Sends a built message and returns the first payload byte of the reply

        @param message: message built with DenaliMessage.build_message
        @param timeout_log: text logged (debug) when no response is received
        @param success_log: optional text logged (debug) when a response is received
        @return: the first payload byte of the response, False when there was no response
        """
        received_message = self.can_interface.send(message)

        if received_message is None:
            self.logger.debug(timeout_log)
            return False

        if success_log is not None:
            self.logger.debug(success_log)
        # Response payload is OK or not OK
        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    @staticmethod
    def _unpack_field(fmt, data, start, end):
        """
        Decodes a single field of a received message

        @param fmt: struct format character of the field ('i', 'h' or 'f')
        @param data: the received message content (indexable sequence of byte values)
        @param start: index of the first byte of the field
        @param end: index one past the last byte of the field
        @return: the decoded value
        """
        return struct.unpack(fmt, bytearray(data[start:end]))[0]

    def cmd_hd_valves_broadcast_interval_override(self, ms, reset=NO_RESET):
        """
        Constructs and sends broadcast time interval

        @param ms: Publish time interval in ms
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        reset_value = integer_to_bytearray(reset)
        interval_value = integer_to_bytearray(ms)
        payload = reset_value + interval_value

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_VALVES_STATES_PUBLISH_INTERVAL_OVERRIDE.value,
                                              payload=payload)

        self.logger.debug("Sending {} ms publish interval to the HD valves module".format(ms))
        return self._send_command(message, "Timeout!!!!")

    def cmd_set_hd_valve_position(self, valve, position, reset=NO_RESET):
        """
        Constructs and sends the HD valves set position for a valve

        @param valve: integer - Valve number:
               VDI = 0
               VDO = 1
               VBA = 2
               VBV = 3
        @param position: integer - Position number:
               VALVE_POSITION_A_INSERT_EJECT = 1
               VALVE_POSITION_B_OPEN = 2
               VALVE_POSITION_C_CLOSE = 3
        @param reset: integer - reset flag placed first in the payload
               (defaults to NO_RESET)
        @return: 1 if successful, zero otherwise
        """
        reset_value = integer_to_bytearray(reset)
        vlv = integer_to_bytearray(valve)
        pos = integer_to_bytearray(position)
        # Field order for this message is reset, position, valve.
        payload = reset_value + pos + vlv

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_VALVES_POSITION_OVERRIDE.value,
                                              payload=payload)

        return self._send_command(message, "HD cmd_valve_override Timeout!!!")

    def cmd_set_hd_valve_pwm(self, valve, pwm, direction, reset=NO_RESET):
        """
        Constructs and sends the HD valves PWM command

        @param valve: integer - Valve number:
               VDI = 0
               VDO = 1
               VBA = 2
               VBV = 3
        @param pwm: integer - PWM value to command (units and valid range are
               not defined in this module)
        @param direction: integer - Direction number:
               0 = Clockwise
               1 = Counter clockwise
        @param reset: integer - reset flag placed first in the payload
               (defaults to NO_RESET)
        @return: 1 if successful, zero otherwise
        """
        reset_value = integer_to_bytearray(reset)
        vlv = integer_to_bytearray(valve)
        pwm_value = integer_to_bytearray(pwm)
        direction_value = integer_to_bytearray(direction)
        # Field order for this message is reset, valve, PWM, direction.
        payload = reset_value + vlv + pwm_value + direction_value

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_VALVES_SET_PWM_OVERRIDE.value,
                                              payload=payload)

        return self._send_command(message, "HD cmd_valve_override Timeout!!!")

    def cmd_home_hd_valve(self, valve):
        """
        Constructs and sends the HD valves home command

        @param valve: integer - Valve number:
               VDI = 0
               VDO = 1
               VBA = 2
               VBV = 3
        @return: 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(valve)

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_VALVES_HOME.value,
                                              payload=payload)

        return self._send_command(message, "HD Homing Valve Timeout!!!")

    def cmd_set_hd_air_trap_valve(self, valve_state=AirTrapState.STATE_CLOSED.name):
        """
        Constructs and sends an open/close command to the HD air trap valve

        @param valve_state: requested state. The valve is opened when this is
               AirTrapState.STATE_OPEN, its name ("STATE_OPEN") or its value (1);
               anything else closes the valve.
        @return: 1 if successful, zero otherwise
        """
        open_state = AirTrapState.STATE_OPEN
        # The default is a member *name*, so names must be accepted as well as
        # the enum member itself; comparing only against the member meant that
        # passing "STATE_OPEN" silently closed the valve.
        is_open = valve_state in (open_state, open_state.name, open_state.value)
        payload = integer_to_bytearray(1 if is_open else 0)

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id,
                                              message_id=MsgIds.MSG_ID_HD_VALVES_SET_AIR_TRAP_VALVE.value,
                                              payload=payload)

        action = "Opening" if is_open else "Closing"
        return self._send_command(message,
                                  timeout_log="{} air trap valve timeout!!".format(action),
                                  success_log="{} air trap valve".format(action))

    @_publish(["valves_status"])
    def _handler_hd_valves_sync(self, message):
        """
        Handles published HD valves data messages. HD valves data are captured
        for reference.

        The entry of the reported valve in valves_status is replaced only when
        the valve ID, position ID and state ID are all known enum values.
        hd_air_trap_status is updated independently, whenever the reported air
        trap state is a known AirTrapState value.

        @param message: published HD valves data message
        @return: none
        """
        data = message['message']
        unpack = self._unpack_field

        # 'i' = 4-byte integer, 'h' = 2-byte integer, 'f' = 4-byte float.
        vlv_id = unpack('i', data, self.START_POS_VALVES_ID, self.END_POS_VALVES_ID)
        state_id = unpack('i', data, self.START_VALVES_STATE, self.END_VALVES_STATE)
        pos_id = unpack('i', data, self.START_POS_VALVES_CURR_POS, self.END_POS_VALVES_CURR_POS)
        pos_cnt = unpack('h', data, self.START_POS_VALVES_CURR_POS_CNT, self.END_POS_VALVES_CURR_POS_CNT)
        next_pos = unpack('h', data, self.START_POS_VALVES_NEXT_POS_CNT, self.END_POS_VALVES_NEXT_POS_CNT)
        current = unpack('f', data, self.START_POS_VALVES_CURRENT, self.END_POS_VALVES_CURRENT)
        pos_c = unpack('h', data, self.START_VALVES_POS_C, self.END_VALVES_POS_C)
        pos_a = unpack('h', data, self.START_VALVES_POS_A, self.END_VALVES_POS_A)
        pos_b = unpack('h', data, self.START_VALVES_POS_B, self.END_VALVES_POS_B)
        pwm = unpack('i', data, self.START_VALVES_PWM, self.END_VALVES_PWM)
        air_trap = unpack('i', data, self.START_AIR_TRAP_VALVE_STATUS, self.END_AIR_TRAP_VALVE_STATUS)

        # To make sure values of the enums are not out of range. The state
        # check must use the state ID (not the position ID), since that is the
        # value converted with ValvesStates below.
        if (ValvesEnum.has_value(vlv_id)
                and ValvesPositions.has_value(pos_id)
                and ValvesStates.has_value(state_id)):
            vlv_name = ValvesEnum(vlv_id).name
            # Update the valves dictionary
            self.valves_status[vlv_name] = {'Valve': vlv_name,
                                            'PosID': ValvesPositions(pos_id).name,
                                            'PosCnt': pos_cnt,
                                            'Cmd': next_pos,
                                            'State': ValvesStates(state_id).name,
                                            'Current': current,
                                            'PosA': pos_a,
                                            'PosB': pos_b,
                                            'PosC': pos_c,
                                            'PWM': pwm}

        if AirTrapState.has_value(air_trap):
            self.hd_air_trap_status = AirTrapState(air_trap).name