###########################################################################
#
#    Copyright (c) 2022-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   air_pump.py
#
#    @author (last)      Zoltan Miskolci
#    @date   (last)      08-Jan-2026
#    @author (original)  Micahel Garthwaite
#    @date   (original)  01-Nov-2024
#
############################################################################
import struct
from logging import Logger

from leahi_dialin.common.constants import NO_RESET
from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions
from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override
from leahi_dialin.common.td_defs import TDAirPumpNames, TDAirPumpAttributes
from leahi_dialin.protocols.CAN import DenaliChannels
from leahi_dialin.utils.base import AbstractSubSystem, publish
from leahi_dialin.utils.conversions import integer_to_bytearray


class TDAirPump(AbstractSubSystem):
    """
    TDAirPump

    Treatment Delivery (TD) Dialin API sub-class for air pump related commands.
    """

    def __init__(self, can_interface, logger: Logger):
        """
        Stores the CAN interface and logger, initializes the cached air pump
        data and, when a CAN interface is supplied, registers the handler for
        air pump data messages.

        @param can_interface: Denali Can Messenger object (None skips the
                              handler registration)
        @param logger: logger handed to the override command helpers
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger

        # Assigned unconditionally: the @publish list on _handler_air_pump_sync
        # names this attribute, so it has to exist even without a CAN interface.
        self.msg_id_td_air_pump_data = MsgIds.MSG_ID_TD_AIR_PUMP_DATA.value

        # Cached air pump data is set up before the handler is registered so
        # that everything the handler writes to already exists at that point.
        self.td_air_pump = {
            TDAirPumpNames.H12_AIR_PUMP.name: {
                TDAirPumpAttributes.STATE.name: 0,
                TDAirPumpAttributes.POWER.name: 0
            }
        }
        self.td_air_pump_timestamp = 0.0

        if self.can_interface is not None:
            channel_id = DenaliChannels.td_sync_broadcast_ch_id
            self.can_interface.register_receiving_publication_function(channel_id,
                                                                       self.msg_id_td_air_pump_data,
                                                                       self._handler_air_pump_sync)

    @publish(["msg_id_td_air_pump_data", "td_air_pump", "td_air_pump_timestamp"])
    def _handler_air_pump_sync(self, message, timestamp=0.0):
        """
        Handles published air pump data messages.

        @param message: published air pump data message; message['message']
                        carries the air pump state in field 1 and the air pump
                        power in field 2
        @param timestamp: value stored as the timestamp of this update
        @return: None
        """
        payload = message['message']

        # NOTE(review): 'i' unpacks with the host's native byte order and size;
        # confirm this matches the byte layout TD uses for these fields.
        aps = struct.unpack('i', bytearray(
            payload[MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))
        app = struct.unpack('i', bytearray(
            payload[MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))

        h12_air_pump = self.td_air_pump[TDAirPumpNames.H12_AIR_PUMP.name]
        h12_air_pump[TDAirPumpAttributes.STATE.name] = aps[0]
        h12_air_pump[TDAirPumpAttributes.POWER.name] = app[0]
        self.td_air_pump_timestamp = timestamp

    def cmd_air_pump_data_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the air pump data broadcast interval override command
        Constraints:
        Must be logged into TD.
        Given interval must be non-zero and a multiple of the TD general task interval (50 ms).

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        return cmd_generic_broadcast_interval_override(
            ms=ms,
            reset=reset,
            channel_id=DenaliChannels.dialin_to_td_ch_id,
            msg_id=MsgIds.MSG_ID_TD_AIR_PUMP_PUBLISH_INTERVAL_OVERRIDE_REQUEST,
            module_name='TD Air Pump',
            logger=self.logger,
            can_interface=self.can_interface)

    def cmd_air_pump_set_state(self, state: int, power: int) -> int:
        """
        Constructs and sends the air pump set state command.
        Constraints:
        Must be logged into TD.

        @param state: integer - 1 to stop the pump, 2 to activate it
        @param power: integer - 0-255 value to set the air pump power
        @return: 1 if successful, zero otherwise
        """
        # Payload is the state followed by the power, each converted with
        # integer_to_bytearray.
        payload = integer_to_bytearray(state) + integer_to_bytearray(power)

        # Only state 2 is described as a start (and reports the power value);
        # every other state value is described as stopped with no power text.
        starting = state == 2
        state_name = 'start with power' if starting else 'stopped'
        power_value = str(power) if starting else ''

        return cmd_generic_override(
            payload=payload,
            reset=NO_RESET,
            channel_id=DenaliChannels.dialin_to_td_ch_id,
            msg_id=MsgIds.MSG_ID_TD_AIR_PUMP_SET_STATE_REQUEST,
            entity_name=f'TD Air Pump to {state_name}',
            override_text=power_value,
            logger=self.logger,
            can_interface=self.can_interface)