###########################################################################
#
#    Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   valves.py
#
#    @author (last)      Zoltan Miskolci
#    @date   (last)      05-May-2026
#    @author (original)  Dara Navaei
#    @date   (original)  19-Aug-2020
#
############################################################################

# Module imports
from logging import Logger

# Project imports
from leahi_dialin.common.constants import NO_RESET
from leahi_dialin.common.generic_defs import DataTypes
from leahi_dialin.common.msg_defs import MsgIds
from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override
from leahi_dialin.common import td_enum_repository
from leahi_dialin.protocols.CAN import DenaliCanMessenger, DenaliChannels
from leahi_dialin.utils.abstract_classes import AbstractSubSystem
from leahi_dialin.utils.base import publish
from leahi_dialin.utils.conversions import integer_to_bytearray


class TDValves(AbstractSubSystem):
    """
    Treatment Delivery (TD) Dialin API sub-class for valves related commands.

    Keeps the most recently received valves data per valve in valves_status and
    offers commands to position, home and override the TD valves.
    """

    def __init__(self, can_interface: DenaliCanMessenger, logger: Logger):
        """
        TDValves constructor

        @param can_interface: (DenaliCanMessenger) - Denali CAN messenger object. May be None, in which case
                              no receive handler is registered.
        @param logger: (Logger) - Dialin logger
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        # Create the state written by the receive handler BEFORE registering it, so the
        # handler can never be invoked on an object that does not have these attributes yet.
        self.td_valves_timestamp = 0.0  #: The timestamp of the latest message

        # The Valves data in dictionary format, keyed by TDValveNames member name
        self.valves_status = {td_enum_repository.TDValveNames.H1_VALV.name: {},
                              td_enum_repository.TDValveNames.H19_VALV.name: {}}

        if self.can_interface is not None:
            self.can_interface.register_receiving_publication_function(
                channel_id = DenaliChannels.td_sync_broadcast_ch_id,
                message_id = MsgIds.MSG_ID_TD_VALVES_DATA.value,
                function = self._handler_valves_sync)

    @staticmethod
    def _valve_label(valve: int) -> str:
        """
        Builds the short valve label used in the command log texts.

        @param valve: integer - Valve number. Defined in td_enum_repository.TDValveNames class
        @returns the part of the TDValveNames member name before its first underscore (e.g. 'H1' for 'H1_VALV')

        Raises whatever td_enum_repository.TDValveNames raises for a value it does not define
        (ValueError if it behaves like a standard Enum - TODO confirm).
        """
        return td_enum_repository.TDValveNames(valve).name.split('_')[0]

    @publish(["msg_id_td_valves_data", "valves_status", "td_valves_timestamp"])
    def _handler_valves_sync(self, message: dict, timestamp: float = 0.0) -> None:
        """
        Handles published valves data messages. TD valves data are captured
        for reference.

        The decoded record is stored in valves_status under the valve's name. A message whose
        valve, position ID or state is not a known enum value is ignored.

        @param message: published TD valves data message
        @param timestamp: timestamp supplied with the message; stored in td_valves_timestamp
        @returns none
        """
        # Field names and data types handed to process_into_vars as the decoder list
        msg_list = [
            ('Valve', DataTypes.U32),
            ('State', DataTypes.U32),
            ('PosID', DataTypes.U32),
            ('PosCnt', DataTypes.U16),
            ('Cmd', DataTypes.U16),
            ('PosA', DataTypes.U16),
            ('PosB', DataTypes.U16),
            ('PosC', DataTypes.U16),
            ('PosD', DataTypes.U16),
            ('Max_homing_enc', DataTypes.U16),
        ]

        result = self.process_into_vars(decoder_list = msg_list, message = message)

        # To make sure values of the enums are not out of range
        if (td_enum_repository.TDValveNames.has_value(result['Valve'])
                and td_enum_repository.TDValvePositions.has_value(result['PosID'])
                and td_enum_repository.TDValveStates.has_value(result['State'])):
            # Replace the raw enum numbers with their names
            result['Valve'] = td_enum_repository.TDValveNames(result['Valve']).name
            result['State'] = td_enum_repository.TDValveStates(result['State']).name
            result['PosID'] = td_enum_repository.TDValvePositions(result['PosID']).name

            # Update the valves dictionary
            self.valves_status[result['Valve']] = result
            self.td_valves_timestamp = timestamp

    def cmd_valves_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends broadcast time interval
        Constraints:
            Must be logged into TD.
            Given interval must be non-zero and a multiple of the TD general task interval (50 ms).

        @param ms: Publish time interval in ms
        @param reset: integer - 1 to reset a previous override, 0 to override
        @returns 1 if successful, zero otherwise
        """
        return cmd_generic_broadcast_interval_override(
            ms = ms,
            reset = reset,
            channel_id = DenaliChannels.dialin_to_td_ch_id,
            msg_id = MsgIds.MSG_ID_TD_VALVES_PUBLISH_INTERVAL_OVERRIDE_REQUEST,
            module_name = 'TD Valves',
            logger = self.logger,
            can_interface = self.can_interface)

    def cmd_set_valve_position(self, valve: int, position: int) -> int:
        """
        Constructs and sends the TD valves set position for a valve

        @param valve: integer - Valve number. Defined in td_enum_repository.TDValveNames class
        @param position: integer - Position number: Defined in td_enum_repository.TDValvePositions class
        @returns 1 if successful, zero otherwise
        """
        # Payload layout: valve, position
        payload = integer_to_bytearray(valve) + integer_to_bytearray(position)
        valve_name = self._valve_label(valve)

        return cmd_generic_override(
            payload = payload,
            reset = NO_RESET,
            channel_id = DenaliChannels.dialin_to_td_ch_id,
            msg_id = MsgIds.MSG_ID_TD_PINCH_VALVE_SET_POSITION_REQUEST,
            entity_name = f'TD {valve_name} Valve position',
            override_text = str(position),
            logger = self.logger,
            can_interface = self.can_interface)

    def cmd_home_valve(self, valve: int, force_home: int, cartridge: int) -> int:
        """
        Constructs and sends the TD valves home command

        @param valve: integer - Valve number. Defined in td_enum_repository.TDValveNames class
        @param force_home: integer - 1 to force valve to home position, 0 for normal homing
        @param cartridge: integer - 1 to use cartridge settings, 0 to use settings for system use without cartridge
        @returns 1 if successful, zero otherwise
        """
        # Payload layout: valve, force_home, cartridge
        payload = (integer_to_bytearray(valve)
                   + integer_to_bytearray(force_home)
                   + integer_to_bytearray(cartridge))
        valve_name = self._valve_label(valve)

        return cmd_generic_override(
            payload = payload,
            reset = NO_RESET,
            channel_id = DenaliChannels.dialin_to_td_ch_id,
            msg_id = MsgIds.MSG_ID_TD_PINCH_VALVE_HOME_REQUEST,
            entity_name = f'TD {valve_name} Valve homing',
            override_text = 'Forcefully' if force_home == 1 else 'Normally',
            logger = self.logger,
            can_interface = self.can_interface)

    def cmd_valve_encoder_position_override(self, valve: int, position_count: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the TD valve encoder position count override for a valve

        @param valve: integer - Valve number. Defined in td_enum_repository.TDValveNames class
        @param position_count: integer value
        @param reset: integer - 1 to reset a previous override, 0 to override
        @returns 1 if successful, zero otherwise
        """
        # Payload layout: reset, position_count, valve
        payload = (integer_to_bytearray(reset)
                   + integer_to_bytearray(position_count)
                   + integer_to_bytearray(valve))
        valve_name = self._valve_label(valve)

        return cmd_generic_override(
            payload = payload,
            # Forward the caller's reset so the helper sees the same value that is encoded in the payload
            reset = reset,
            channel_id = DenaliChannels.dialin_to_td_ch_id,
            msg_id = MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_POSITION_OVERRIDE_REQUEST,
            entity_name = f'TD {valve_name} Valve position',
            override_text = f'move by {str(position_count)}',
            logger = self.logger,
            can_interface = self.can_interface)

    def cmd_valve_status_override(self, valve: int, status: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the TD valve status override for a valve

        @param valve: integer - Valve number. Defined in td_enum_repository.TDValveNames class
        @param status: integer value
        @param reset: integer - 1 to reset a previous override, 0 to override
        @returns 1 if successful, zero otherwise
        """
        # Payload layout: reset, status, valve
        payload = (integer_to_bytearray(reset)
                   + integer_to_bytearray(status)
                   + integer_to_bytearray(valve))
        valve_name = self._valve_label(valve)

        return cmd_generic_override(
            payload = payload,
            # Forward the caller's reset so the helper sees the same value that is encoded in the payload
            reset = reset,
            channel_id = DenaliChannels.dialin_to_td_ch_id,
            msg_id = MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_STATUS_OVERRIDE_REQUEST,
            entity_name = f'TD {valve_name} Valve status',
            override_text = str(status),
            logger = self.logger,
            can_interface = self.can_interface)

    def cmd_valve_modify_encoder_position_by_offset(self, valve: int, counts: int) -> int:
        """
        Constructs and sends a given valve to change position by a given number of encoder counts.

        @param valve: integer - Valve number. Defined in td_enum_repository.TDValveNames class
        @param counts: integer value
        @returns 1 if successful, zero otherwise
        """
        # Payload layout: counts, valve (no reset word)
        payload = integer_to_bytearray(counts) + integer_to_bytearray(valve)
        valve_name = self._valve_label(valve)

        # NOTE(review): this sends the same message ID as cmd_valve_encoder_position_override but with a
        # different payload layout (no leading reset word). Looks like a copy-paste of the message ID -
        # confirm the intended request ID against the TD message definitions.
        return cmd_generic_override(
            payload = payload,
            reset = NO_RESET,
            channel_id = DenaliChannels.dialin_to_td_ch_id,
            msg_id = MsgIds.MSG_ID_TD_ROTARY_PINCH_VALVE_POSITION_OVERRIDE_REQUEST,
            entity_name = f'TD {valve_name} Valve encoder position offset',
            override_text = str(counts),
            logger = self.logger,
            can_interface = self.can_interface)