########################################################################### # # Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file valves.py # # @author (last) James Walter Taylor # @date (last) 03-Aug-2023 # @author (original) Dara Navaei # @date (original) 19-Aug-2020 # ############################################################################ import struct from enum import unique from logging import Logger from .constants import NO_RESET from leahi_dialin.common import MsgIds from leahi_dialin.protocols.CAN import DenaliMessage, DenaliChannels from leahi_dialin.utils.base import AbstractSubSystem, publish, DialinEnum from leahi_dialin.utils.checks import check_broadcast_interval_override_ms from leahi_dialin.utils.conversions import integer_to_bytearray @unique class ValvesEnum(DialinEnum): VBA = 0 VBV = 1 @unique class ValvesPositions(DialinEnum): VALVE_POSITION_NOT_IN_POSITION = 0 VALVE_POSITION_A_INSERT_EJECT = 1 VALVE_POSITION_B_OPEN = 2 VALVE_POSITION_C_CLOSE = 3 @unique class ValvesStates(DialinEnum): VALVE_STATE_WAIT_FOR_POST = 0 VALVE_STATE_HOMING_NOT_STARTED = 1 VALVE_STATE_HOMING_FIND_ENERGIZED_EDGE = 2 VALVE_STATE_HOMING_FIND_DEENERGIZED_EDGE = 3 VALVE_STATE_IDLE = 4 VALVE_STATE_IN_TRANSITION = 5 class TDValves(AbstractSubSystem): """ Treatment Delivery (TD) Dialin API sub-class for valves related commands. """ # Valves states publish message field positions # Note the MsgFieldPosition was not used since some of the published data are S16 START_POS_VALVES_ID = DenaliMessage.PAYLOAD_START_INDEX END_POS_VALVES_ID = START_POS_VALVES_ID + 4 START_VALVES_STATE = END_POS_VALVES_ID END_VALVES_STATE = START_VALVES_STATE + 4 START_POS_VALVES_CURR_POS_ID = END_VALVES_STATE END_POS_VALVES_CURR_POS_ID = START_POS_VALVES_CURR_POS_ID + 4 START_POS_VALVES_CURR_POS = END_POS_VALVES_CURR_POS_ID END_POS_VALVES_CURR_POS = START_POS_VALVES_CURR_POS + 2 START_POS_VALVES_NEXT_POS = END_POS_VALVES_CURR_POS END_POS_VALVES_NEXT_POS = START_POS_VALVES_NEXT_POS + 2 def __init__(self, can_interface, logger: Logger): """ TDValves constructor @param can_interface: (DenaliCanMessenger) - Denali CAN messenger object. @param logger: (Logger) - Dialin logger """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.td_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_TD_VALVES_DATA.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_valves_sync) self.td_valves_timestamp = 0.0 # A dictionary of the valves with the status self.valves_status = {ValvesEnum.VBA.name: {}, ValvesEnum.VBV.name: {}} @publish(["td_valves_timestamp","valves_status"]) def _handler_valves_sync(self, message: dict, timestamp=0.0) -> None: """ Handles published valves data messages. TD valves data are captured for reference. @param message: published TD valves data message @returns none """ vlv_id = struct.unpack('i', bytearray( message['message'][self.START_POS_VALVES_ID:self.END_POS_VALVES_ID]))[0] state_id = struct.unpack('i', bytearray( message['message'][self.START_VALVES_STATE:self.END_VALVES_STATE]))[0] pos_id = struct.unpack('i', bytearray( message['message'][self.START_POS_VALVES_CURR_POS_ID:self.END_POS_VALVES_CURR_POS_ID]))[0] pos_cnt = struct.unpack('h', bytearray( message['message'][self.START_POS_VALVES_CURR_POS:self.END_POS_VALVES_CURR_POS]))[0] next_pos = struct.unpack('h', bytearray( message['message'][self.START_POS_VALVES_NEXT_POS:self.END_POS_VALVES_NEXT_POS]))[0] # To make sure values of the enums are not out of range if ValvesEnum.has_value(vlv_id) and ValvesPositions.has_value(pos_id) and ValvesStates.has_value(pos_id): vlv_name = ValvesEnum(vlv_id).name # Update the valves dictionary self.valves_status[vlv_name] = {'Valve': vlv_name, 'PosID': ValvesPositions(pos_id).name, 'PosCnt': pos_cnt, 'Cmd': next_pos, 'State': ValvesStates(state_id).name} self.td_valves_timestamp = timestamp def cmd_valves_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends broadcast time interval Constraints: Must be logged into TD. Given interval must be non-zero and a multiple of the TD general task interval (50 ms). @param ms: Publish time interval in ms @param reset: integer - 1 to reset a previous override, 0 to override @returns 1 if successful, zero otherwise """ if not check_broadcast_interval_override_ms(ms): return False reset_value = integer_to_bytearray(reset) interval_value = integer_to_bytearray(ms) payload = reset_value + interval_value message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, message_id=MsgIds.MSG_ID_TD_VALVES_PUBLISH_INTERVAL_OVERRIDE_REQUEST.value, payload=payload) self.logger.debug("Sending {} ms publish interval to the HD valves module".format(ms)) # Send message received_message = self.can_interface.send(message) # If there is content in message if received_message is not None: # Response payload is OK or not return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_set_valve_position(self, valve: int, position: int, reset: int = NO_RESET) -> int: """ Constructs and sends the TD valves set position for a valve @param valve: integer - Valve number. Defined in ValvesEnum class @param position: integer - Position number: Defined in ValvesPositions class @param reset: integer - 1 to reset a previous override, 0 to override @returns 1 if successful, zero otherwise """ reset_value = integer_to_bytearray(reset) vlv = integer_to_bytearray(valve) pos = integer_to_bytearray(position) payload = reset_value + pos + vlv message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, message_id=MsgIds.MSG_ID_TD_PINCH_VALVE_SET_POSITION_REQUEST.value, payload=payload) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("TD cmd_valve_override Timeout!!!") return False def cmd_home_valve(self, valve: int) -> int: """ Constructs and sends the TD valves home command @param valve: integer - Valve number. Defined in ValvesEnum class @returns 1 if successful, zero otherwise """ payload = integer_to_bytearray(valve) message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, message_id=MsgIds.MSG_ID_TD_PINCH_VALVE_HOME_REQUEST.value, payload=payload) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("TD Homing Valve Timeout!!!") return False def cmd_valve_encoder_position_override(self, valve: int, position_count: int, reset: int = NO_RESET) -> int: """ Constructs and sends the TD valves set position for a valve @@param valve: integer - Valve number. Defined in ValvesEnum class @param position_count: integer value @param reset: integer - 1 to reset a previous override, 0 to override @returns 1 if successful, zero otherwise """ reset_value = integer_to_bytearray(reset) vlv = integer_to_bytearray(valve) pos = integer_to_bytearray(position_count) payload = reset_value + pos + vlv message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, message_id=MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_POSITION_OVERRIDE_REQUEST.value, payload=payload) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: self.logger.debug("Setting {} position to {} ".format(str(ValvesEnum(valve).name), position_count)) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("TD valve position override Timeout!!!") return False def cmd_valve_status_override(self, valve: int, status: int, reset: int = NO_RESET) -> int: """ Constructs and sends the TD valves set position for a valve @@param valve: integer - Valve number. Defined in ValvesEnum class @param status: integer value @param status: integer value @param reset: integer - 1 to reset a previous override, 0 to override @returns 1 if successful, zero otherwise """ reset_value = integer_to_bytearray(reset) vlv = integer_to_bytearray(valve) sts = integer_to_bytearray(status) payload = reset_value + sts + vlv message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, message_id=MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_STATUS_OVERRIDE_REQUEST.value, payload=payload) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: self.logger.debug("Setting {} status to {} ".format(str(ValvesEnum(valve).name), status)) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("TD valve status override Timeout!!!") return False def cmd_valve_modify_encoder_position_by_offset(self, valve: int, counts: int) -> int: """ Constructs and sends a given valve to change position by a given number of encoder counts. @@param valve: integer - Valve number. Defined in ValvesEnum class @param count: integer value @param reset: integer - 1 to reset a previous override, 0 to override @returns 1 if successful, zero otherwise """ vlv = integer_to_bytearray(valve) pos = integer_to_bytearray(counts) payload = pos + vlv message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, message_id=MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_POSITION_OVERRIDE_REQUEST.value, payload=payload) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: self.logger.debug("Setting {} position by {} ".format(str(ValvesEnum(valve).name), counts)) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("TD valve position override Timeout!!!") return False