Index: leahi_dialin/td/modules/valves.py =================================================================== diff -u -r71216b08590dd4b556c10b3384dddc26c8d29a99 -r38e8dd31728056dbb7f9304c98ab16d7147b75a2 --- leahi_dialin/td/modules/valves.py (.../valves.py) (revision 71216b08590dd4b556c10b3384dddc26c8d29a99) +++ leahi_dialin/td/modules/valves.py (.../valves.py) (revision 38e8dd31728056dbb7f9304c98ab16d7147b75a2) @@ -7,52 +7,27 @@ # # @file valves.py # -# @author (last) James Walter Taylor -# @date (last) 03-Aug-2023 +# @author (last) Zoltan Miskolci +# @date (last) 08-Jan-2026 # @author (original) Dara Navaei # @date (original) 19-Aug-2020 # ############################################################################ - import struct -from enum import unique from logging import Logger -from .constants import NO_RESET -from leahi_dialin.common import MsgIds +from leahi_dialin.common.constants import NO_RESET +from leahi_dialin.common.msg_defs import MsgIds +from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override +from leahi_dialin.common.td_defs import td_enum_repository from leahi_dialin.protocols.CAN import DenaliMessage, DenaliChannels -from leahi_dialin.utils.base import AbstractSubSystem, publish, DialinEnum -from leahi_dialin.utils.checks import check_broadcast_interval_override_ms +from leahi_dialin.utils.base import AbstractSubSystem, publish from leahi_dialin.utils.conversions import integer_to_bytearray -@unique -class ValvesEnum(DialinEnum): - H1_VALV = 0 - H19_VALV = 1 - - -@unique -class ValvesPositions(DialinEnum): - VALVE_POSITION_NOT_IN_POSITION = 0 - VALVE_POSITION_A_INSERT_EJECT = 1 - VALVE_POSITION_B_OPEN = 2 - VALVE_POSITION_C_CLOSE = 3 - - -@unique -class ValvesStates(DialinEnum): - VALVE_STATE_WAIT_FOR_POST = 0 - VALVE_STATE_HOMING_NOT_STARTED = 1 - VALVE_STATE_HOMING_FIND_ENERGIZED_EDGE = 2 - VALVE_STATE_HOMING_FIND_DEENERGIZED_EDGE = 3 - VALVE_STATE_IDLE = 4 - VALVE_STATE_IN_TRANSITION = 5 - class TDValves(AbstractSubSystem): """ - Treatment Delivery (TD) Dialin API sub-class for valves related commands. """ # Valves states publish message field positions @@ -72,6 +47,16 @@ START_POS_VALVES_NEXT_POS = END_POS_VALVES_CURR_POS END_POS_VALVES_NEXT_POS = START_POS_VALVES_NEXT_POS + 2 + START_POS_A = END_POS_VALVES_NEXT_POS + END_POS_A = START_POS_A + 2 + START_POS_B = END_POS_A + END_POS_B = START_POS_B + 2 + START_POS_C = END_POS_B + END_POS_C = START_POS_C + 2 + START_POS_D = END_POS_C + END_POS_D = START_POS_D + 2 + START_MAX_HOMING_ENC = END_POS_D + END_MAX_HOMING_ENC = START_MAX_HOMING_ENC + 2 def __init__(self, can_interface, logger: Logger): """ @@ -86,16 +71,17 @@ if self.can_interface is not None: channel_id = DenaliChannels.td_sync_broadcast_ch_id - msg_id = MsgIds.MSG_ID_TD_VALVES_DATA.value - self.can_interface.register_receiving_publication_function(channel_id, msg_id, + self.msg_id_td_valves_data = MsgIds.MSG_ID_TD_VALVES_DATA.value + self.can_interface.register_receiving_publication_function(channel_id, self.msg_id_td_valves_data, self._handler_valves_sync) self.td_valves_timestamp = 0.0 # A dictionary of the valves with the status - self.valves_status = {ValvesEnum.H1_VALV.name: {}, - ValvesEnum.H19_VALV.name: {}} + self.valves_status = {td_enum_repository.TDValveNames.H1_VALV.name: {}, + td_enum_repository.TDValveNames.H19_VALV.name: {}} - @publish(["td_valves_timestamp","valves_status"]) + + @publish(["msg_id_td_valves_data", "valves_status", "td_valves_timestamp"]) def _handler_valves_sync(self, message: dict, timestamp=0.0) -> None: """ Handles published valves data messages. TD valves data are captured @@ -112,17 +98,26 @@ message['message'][self.START_POS_VALVES_CURR_POS_ID:self.END_POS_VALVES_CURR_POS_ID]))[0] pos_cnt = struct.unpack('h', bytearray( message['message'][self.START_POS_VALVES_CURR_POS:self.END_POS_VALVES_CURR_POS]))[0] - next_pos = struct.unpack('h', bytearray( + cmd_pos = struct.unpack('h', bytearray( message['message'][self.START_POS_VALVES_NEXT_POS:self.END_POS_VALVES_NEXT_POS]))[0] + pos_a = struct.unpack('h', bytearray(message['message'][self.START_POS_A:self.END_POS_A]))[0] + pos_b = struct.unpack('h', bytearray(message['message'][self.START_POS_B:self.END_POS_B]))[0] + pos_c = struct.unpack('h', bytearray(message['message'][self.START_POS_C:self.END_POS_C]))[0] + pos_d = struct.unpack('h', bytearray(message['message'][self.START_POS_D:self.END_POS_D]))[0] + max_homing_enc = struct.unpack('h', bytearray( + message['message'][self.START_MAX_HOMING_ENC:self.END_MAX_HOMING_ENC]))[0] + # To make sure values of the enums are not out of range - if ValvesEnum.has_value(vlv_id) and ValvesPositions.has_value(pos_id) and ValvesStates.has_value(pos_id): - vlv_name = ValvesEnum(vlv_id).name + if td_enum_repository.TDValveNames.has_value(vlv_id) and td_enum_repository.TDValvePositions.has_value(pos_id) and td_enum_repository.TDValveStates.has_value(pos_id): + vlv_name = td_enum_repository.TDValveNames(vlv_id).name # Update the valves dictionary - self.valves_status[vlv_name] = {'Valve': vlv_name, 'PosID': ValvesPositions(pos_id).name, 'PosCnt': pos_cnt, - 'Cmd': next_pos, 'State': ValvesStates(state_id).name} + self.valves_status[vlv_name] = {'Valve': vlv_name, 'PosID': td_enum_repository.TDValvePositions(pos_id).name, 'PosCnt': pos_cnt, + 'Cmd': cmd_pos, 'State': td_enum_repository.TDValveStates(state_id).name, 'PosA': pos_a, + 'PosB': pos_b, 'PosC': pos_c, 'PosD': pos_d, 'Max_homing_enc': max_homing_enc} self.td_valves_timestamp = timestamp + def cmd_valves_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends broadcast time interval @@ -134,85 +129,72 @@ @param reset: integer - 1 to reset a previous override, 0 to override @returns 1 if successful, zero otherwise """ - if not check_broadcast_interval_override_ms(ms): - return False + return cmd_generic_broadcast_interval_override( + ms = ms, + reset = reset, + channel_id = DenaliChannels.dialin_to_td_ch_id, + msg_id = MsgIds.MSG_ID_TD_VALVES_PUBLISH_INTERVAL_OVERRIDE_REQUEST, + module_name = 'TD Valves', + logger = self.logger, + can_interface = self.can_interface) - reset_value = integer_to_bytearray(reset) - interval_value = integer_to_bytearray(ms) - payload = reset_value + interval_value - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, - message_id=MsgIds.MSG_ID_TD_VALVES_PUBLISH_INTERVAL_OVERRIDE_REQUEST.value, - payload=payload) - - self.logger.debug("Sending {} ms publish interval to the HD valves module".format(ms)) - # Send message - received_message = self.can_interface.send(message) - - # If there is content in message - if received_message is not None: - # Response payload is OK or not - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("Timeout!!!!") - return False - - def cmd_set_valve_position(self, valve: int, position: int, reset: int = NO_RESET) -> int: + def cmd_set_valve_position(self, valve: int, position: int) -> int: """ Constructs and sends the TD valves set position for a valve - @param valve: integer - Valve number. Defined in ValvesEnum class - @param position: integer - Position number: Defined in ValvesPositions class - @param reset: integer - 1 to reset a previous override, 0 to override + @param valve: integer - Valve number. Defined in td_enum_repository.TDValveNames class + @param position: integer - Position number: Defined in td_enum_repository.TDValvePositions class @returns 1 if successful, zero otherwise """ - reset_value = integer_to_bytearray(reset) + vlv = integer_to_bytearray(valve) pos = integer_to_bytearray(position) - payload = reset_value + pos + vlv + payload = vlv + pos - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, - message_id=MsgIds.MSG_ID_TD_PINCH_VALVE_SET_POSITION_REQUEST.value, - payload=payload) - # Send message - received_message = self.can_interface.send(message) + valve_name = td_enum_repository.TDValveNames(valve).name.split('_')[0] + return cmd_generic_override( + payload = payload, + reset = NO_RESET, + channel_id = DenaliChannels.dialin_to_td_ch_id, + msg_id = MsgIds.MSG_ID_TD_PINCH_VALVE_SET_POSITION_REQUEST, + entity_name = f'TD {valve_name} Valve position', + override_text = str(position), + logger = self.logger, + can_interface = self.can_interface) - # If there is content... - if received_message is not None: - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("TD cmd_valve_override Timeout!!!") - return False - def cmd_home_valve(self, valve: int) -> int: + def cmd_home_valve(self, valve: int, force_home: int, cartridge: int) -> int: """ Constructs and sends the TD valves home command - @param valve: integer - Valve number. Defined in ValvesEnum class + @param valve: integer - Valve number. Defined in td_enum_repository.TDValveNames class + @param force_home: integer - 1 to force valve to home position, 0 for normal homing + @param cartridge: integer - 1 to use cartridge settings, 0 to use settings for system use without cartridge @returns 1 if successful, zero otherwise """ - payload = integer_to_bytearray(valve) + vlv = integer_to_bytearray(valve) + frc = integer_to_bytearray(force_home) + cart = integer_to_bytearray(cartridge) + payload = vlv + frc + cart - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, - message_id=MsgIds.MSG_ID_TD_PINCH_VALVE_HOME_REQUEST.value, - payload=payload) - # Send message - received_message = self.can_interface.send(message) + valve_name = td_enum_repository.TDValveNames(valve).name.split('_')[0] + return cmd_generic_override( + payload = payload, + reset = NO_RESET, + channel_id = DenaliChannels.dialin_to_td_ch_id, + msg_id = MsgIds.MSG_ID_TD_PINCH_VALVE_SET_POSITION_REQUEST, + entity_name = f'TD {valve_name} Valve homing', + override_text = 'Forcefully' if force_home == 1 else 'Normally', + logger = self.logger, + can_interface = self.can_interface) - # If there is content... - if received_message is not None: - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("TD Homing Valve Timeout!!!") - return False def cmd_valve_encoder_position_override(self, valve: int, position_count: int, reset: int = NO_RESET) -> int: """ Constructs and sends the TD valves set position for a valve - @@param valve: integer - Valve number. Defined in ValvesEnum class + @@param valve: integer - Valve number. Defined in td_enum_repository.TDValveNames class @param position_count: integer value @param reset: integer - 1 to reset a previous override, 0 to override @returns 1 if successful, zero otherwise @@ -222,30 +204,24 @@ pos = integer_to_bytearray(position_count) payload = reset_value + pos + vlv - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, - message_id=MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_POSITION_OVERRIDE_REQUEST.value, - payload=payload) - # Send message - received_message = self.can_interface.send(message) + valve_name = td_enum_repository.TDValveNames(valve).name.split('_')[0] + return cmd_generic_override( + payload = payload, + reset = NO_RESET, + channel_id = DenaliChannels.dialin_to_td_ch_id, + msg_id = MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_POSITION_OVERRIDE_REQUEST, + entity_name = f'TD {valve_name} Valve position', + override_text = f'move by {str(position_count)}', + logger = self.logger, + can_interface = self.can_interface) - # If there is content... - if received_message is not None: - self.logger.debug("Setting {} position to {} ".format(str(ValvesEnum(valve).name), position_count)) - - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("TD valve position override Timeout!!!") - return False - def cmd_valve_status_override(self, valve: int, status: int, reset: int = NO_RESET) -> int: """ Constructs and sends the TD valves set position for a valve - @@param valve: integer - Valve number. Defined in ValvesEnum class + @@param valve: integer - Valve number. Defined in td_enum_repository.TDValveNames class @param status: integer value - @param status: integer value @param reset: integer - 1 to reset a previous override, 0 to override @returns 1 if successful, zero otherwise """ @@ -254,51 +230,38 @@ sts = integer_to_bytearray(status) payload = reset_value + sts + vlv - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, - message_id=MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_STATUS_OVERRIDE_REQUEST.value, - payload=payload) - # Send message - received_message = self.can_interface.send(message) + valve_name = td_enum_repository.TDValveNames(valve).name.split('_')[0] + return cmd_generic_override( + payload = payload, + reset = NO_RESET, + channel_id = DenaliChannels.dialin_to_td_ch_id, + msg_id = MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_STATUS_OVERRIDE_REQUEST, + entity_name = f'TD {valve_name} Valve status', + override_text = str(status), + logger = self.logger, + can_interface = self.can_interface) - # If there is content... - if received_message is not None: - self.logger.debug("Setting {} status to {} ".format(str(ValvesEnum(valve).name), status)) - - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("TD valve status override Timeout!!!") - return False - def cmd_valve_modify_encoder_position_by_offset(self, valve: int, counts: int) -> int: """ Constructs and sends a given valve to change position by a given number of encoder counts. - @@param valve: integer - Valve number. Defined in ValvesEnum class + @@param valve: integer - Valve number. Defined in td_enum_repository.TDValveNames class @param count: integer value - @param reset: integer - 1 to reset a previous override, 0 to override @returns 1 if successful, zero otherwise """ vlv = integer_to_bytearray(valve) pos = integer_to_bytearray(counts) payload = pos + vlv - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, - message_id=MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_POSITION_OVERRIDE_REQUEST.value, - payload=payload) - # Send message - received_message = self.can_interface.send(message) - - # If there is content... - if received_message is not None: - - self.logger.debug("Setting {} position by {} ".format(str(ValvesEnum(valve).name), counts)) - - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("TD valve position override Timeout!!!") - return False - + valve_name = td_enum_repository.TDValveNames(valve).name.split('_')[0] + return cmd_generic_override( + payload = payload, + reset = NO_RESET, + channel_id = DenaliChannels.dialin_to_td_ch_id, + msg_id = MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_POSITION_OVERRIDE_REQUEST, + entity_name = f'TD {valve_name} Valve encoder position offset', + override_text = str(counts), + logger = self.logger, + can_interface = self.can_interface)