###########################################################################
#
# Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file buttons.py
#
# @author (last) Zoltan Miskolci
# @date (last) 26-May-2026
# @author (original) Zoltan Miskolci
# @date (original) 26-May-2026
#
############################################################################
import struct
from logging import Logger

from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions
from leahi_dialin.common.override_templates import cmd_generic_override
from leahi_dialin.protocols.CAN import DenaliCanMessenger, DenaliChannels
from leahi_dialin.utils.base import AbstractSubSystem, publish
from leahi_dialin.utils.conversions import integer_to_bytearray


class TDDrySelfTest(AbstractSubSystem):
    """
    Treatment Delivery (TD) Dialin API sub-class for dry self test related commands.
    """

    def __init__(self, can_interface: DenaliCanMessenger, logger: Logger):
        """
        Dry Self Test constructor

        Registers the receive handlers on the CAN interface (when one is given)
        and initializes the locally cached values to zero.

        @param can_interface: the Denali CAN interface object
        @param logger: the logger handed to the command template when sending
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        if self.can_interface is not None:
            self.can_interface.register_receiving_publication_function(
                channel_id=DenaliChannels.td_sync_broadcast_ch_id,
                message_id=MsgIds.MSG_ID_TD_DRY_SELF_TEST_PROGRESS_DATA.value,
                function=self._handler_progress_data_sync)
            self.can_interface.register_receiving_publication_function(
                channel_id=DenaliChannels.td_to_ui_ch_id,
                message_id=MsgIds.MSG_ID_TD_TUBE_SET_AUTHENTICATION_REQUEST.value,
                function=self._handler_tubing_set_barcode_authentication_request)

        self.td_dry_self_test_timestamp = 0.0               #: The timestamp of the last message
        self.substate = 0                                   #: The Substate of the Dry Self Test
        self.timeout = 0                                    #: The Timeout value of the Dry Self Test
        self.countdown = 0                                  #: The Countdown value of the Dry Self Test
        self.td_barcode_auth_request_sent_timestamp = 0.0   #: The timestamp when the TD requested Barcode Authentication from UI

    @publish(['msg_id_td_dry_self_test_progress_data', 'substate', 'timeout', 'countdown'])
    def _handler_progress_data_sync(self, message, timestamp=0.0):
        """
        Dry Self Test progress data handler

        Unpacks the substate, timeout and countdown values (in that order, from
        payload fields 1 to 3) and stores them on this object together with the
        timestamp of the message.

        @param message: (Dict) The received message; its 'message' entry holds the raw bytes
        @param timestamp: (Float) The timestamp of the message
        @return: None
        """
        # (attribute name, struct format) in payload order; field numbering starts at 1
        fields = (
            ('substate', 'i'),
            ('timeout', 'i'),
            ('countdown', 'i'),
        )

        for index, (attr_name, fmt) in enumerate(fields, start=1):
            # Plain attribute lookup/assignment instead of eval()/exec() on built strings
            start_pos = getattr(MsgFieldPositions, f'START_POS_FIELD_{index}')
            end_pos = getattr(MsgFieldPositions, f'END_POS_FIELD_{index}')
            raw = bytearray(message['message'][start_pos:end_pos])
            # All fields are integers, so no NaN guarding is required here
            setattr(self, attr_name, struct.unpack(fmt, raw)[0])

        self.td_dry_self_test_timestamp = timestamp

    # NOTE(review): 'valid_tubing_set' and 'modality_match' are never set as attributes
    # of this class - confirm that publish() copes with keys that are not attributes.
    @publish(['msg_id_tubing_set_authentication_request', 'valid_tubing_set', 'modality_match'])
    def _handler_tubing_set_barcode_authentication_request(self, message, timestamp=0.0):
        """
        Tubing Set Authentication request to UI handler

        Answers the request right away with the default (valid / matching)
        response and records the timestamp of the request.

        @param message: (Bytearray) The occurred message string
        @param timestamp: (Float) The timestamp of the message
        @return: None
        """
        # Send the response automatically to prevent timeout and system going to fault
        self.cmd_send_ui_barcode_auth_response()
        self.td_barcode_auth_request_sent_timestamp = timestamp

    def cmd_send_ui_barcode_auth_response(self, valid_tubing_set: bool = True, modality_match: bool = True) -> int:
        """
        Constructs and sends the UI Barcode Authentication response

        @param valid_tubing_set: (Boolean) If the Tubing Set is valid or not
        @param modality_match: (Boolean) If the Tubing Set's Modality is accepted or not
        @return: 1 if successful, zero otherwise
        """
        # Payload layout: valid flag first, then modality flag, each converted from 1 / 0
        payload = integer_to_bytearray(1 if valid_tubing_set else 0)
        payload += integer_to_bytearray(1 if modality_match else 0)

        return cmd_generic_override(
            payload=payload,
            reset=None,
            channel_id=DenaliChannels.ui_to_td_ch_id,
            msg_id=MsgIds.MSG_ID_UI_TUBE_SET_AUTHENTICATION_ACK_RESPONSE,
            entity_name='UI Barcode Authentication Ack response',
            override_text=f'Valid: {valid_tubing_set}, Accepted: {modality_match}',
            logger=self.logger,
            can_interface=self.can_interface)