########################################################################### # # Copyright (c) 2022-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file air_pump.py # # @author (last) Zoltan Miskolci # @date (last) 08-Jan-2026 # @author (original) Micahel Garthwaite # @date (original) 01-Nov-2024 # ############################################################################ import struct from logging import Logger from leahi_dialin.common.constants import NO_RESET from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override from leahi_dialin.common.td_defs import td_enum_repository from leahi_dialin.protocols.CAN import DenaliChannels from leahi_dialin.utils.base import AbstractSubSystem, publish from leahi_dialin.utils.conversions import integer_to_bytearray MAX_AIR_PUMP_POWER_LEVEL = 255 #Maximum air pump power allowed class TDAirPump(AbstractSubSystem): """ TDAirPump Treatment Delivery (TD) Dialin API sub-class for air pump related commands. """ def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.td_sync_broadcast_ch_id self.msg_id_td_air_pump_data = MsgIds.MSG_ID_TD_AIR_PUMP_DATA.value self.can_interface.register_receiving_publication_function(channel_id, self.msg_id_td_air_pump_data, self._handler_air_pump_sync) self.td_air_pump = { td_enum_repository.TDAirPumpNames.H12_AIR_PUMP.name: { td_enum_repository.TDAirPumpAttributes.STATE.name: 0, td_enum_repository.TDAirPumpAttributes.DUTY_CYCLE.name: 0, td_enum_repository.TDAirPumpAttributes.RPM.name: 0 } } self.td_air_pump_timestamp = 0.0 @publish(["msg_id_td_air_pump_data", "td_air_pump", "td_air_pump_timestamp"]) def _handler_air_pump_sync(self, message, timestamp=0.0): """ Handles published air pump data messages. @param message: published air pump data message as: air pump state @return: None """ sensor_list =[] sensor_list.append((td_enum_repository.TDAirPumpAttributes.STATE.name, 'i')) sensor_list.append((td_enum_repository.TDAirPumpAttributes.DUTY_CYCLE.name, 'i')) sensor_list.append((td_enum_repository.TDAirPumpAttributes.RPM.name, 'i')) i = 1 for sensor in sensor_list: start_pos = eval(f'MsgFieldPositions.START_POS_FIELD_{i}') end_pos = eval(f'MsgFieldPositions.END_POS_FIELD_{i}') self.td_air_pump[td_enum_repository.TDAirPumpNames.H12_AIR_PUMP.name][sensor[0]] = struct.unpack(sensor[1], bytearray(message['message'][start_pos:end_pos]))[0] i += 1 self.td_air_pump_timestamp = timestamp def cmd_air_pump_data_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the air pump data broadcast interval override command Constraints: Must be logged into TD. Given interval must be non-zero and a multiple of the TD general task interval (50 ms). @param ms: integer - interval (in ms) to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ return cmd_generic_broadcast_interval_override( ms = ms, reset = reset, channel_id = DenaliChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_AIR_PUMP_PUBLISH_INTERVAL_OVERRIDE_REQUEST, module_name = 'TD Air Pump', logger = self.logger, can_interface = self.can_interface) def cmd_air_pump_set_state(self, state: int, dutyCycle: float) -> int: """ Constructs and sends the air pump set state command. Constraints: Must be logged into TD. @param state: integer - 1 to stop the pump, 2 to activate it @param dutyCycle: float - 0-100 value to set the air pump duty cycle @return: 1 if successful, zero otherwise """ if dutyCycle < 0 or dutyCycle > 100: return 0 power = int((dutyCycle/100) * MAX_AIR_PUMP_POWER_LEVEL) sts = integer_to_bytearray(state) pwr = integer_to_bytearray(power) payload = sts + pwr state_name = 'start with power' if state == 2 else 'stopped' power_value = f'{str(power)}' if state == 2 else '' return cmd_generic_override( payload = payload, reset = NO_RESET, channel_id = DenaliChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_AIR_PUMP_SET_STATE_REQUEST, entity_name = f'TD Air Pump to {state_name}', override_text = f'{power_value}', logger = self.logger, can_interface = self.can_interface)