Index: leahi_dialin/td/treatment_delivery.py =================================================================== diff -u -rf03c9a32180573f2430105ad69eb91d9f597a96b -r34b64ff2d8a64f4b7b60b80bb7cf4c36845e5943 --- leahi_dialin/td/treatment_delivery.py (.../treatment_delivery.py) (revision f03c9a32180573f2430105ad69eb91d9f597a96b) +++ leahi_dialin/td/treatment_delivery.py (.../treatment_delivery.py) (revision 34b64ff2d8a64f4b7b60b80bb7cf4c36845e5943) @@ -8,7 +8,7 @@ # @file treatment_delivery.py # # @author (last) Zoltan Miskolci -# @date (last) 08-Jan-2026 +# @date (last) 05-May-2026 # @author (original) Peter Lucia # @date (original) 02-Apr-2020 # @@ -39,8 +39,9 @@ from .proxies.ui_proxy import UIProxy from ..common.constants import NO_RESET -from ..common.msg_defs import MsgIds, MsgFieldPositions, MsgFieldPositionsFWVersions +from ..common.msg_defs import MsgIds, MsgFieldPositions from ..common import td_enum_repository +from ..common.generic_defs import DataTypes from ..common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override from ..protocols.CAN import DenaliMessage, DenaliCanMessenger, DenaliChannels from ..utils.abstract_classes import AbstractSubSystem @@ -85,25 +86,22 @@ self.callback_id = None # register handler for TD operation mode broadcast messages if self.can_interface is not None: - channel_id = DenaliChannels.td_sync_broadcast_ch_id - self.msg_id_td_op_mode_data = MsgIds.MSG_ID_TD_OP_MODE_DATA.value - self.can_interface.register_receiving_publication_function(channel_id, self.msg_id_td_op_mode_data, - self._handler_td_op_mode_sync) + self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.td_sync_broadcast_ch_id, + message_id = MsgIds.MSG_ID_TD_OP_MODE_DATA.value, + function = self._handler_td_op_mode_sync) - self.msg_id_td_debug_event = MsgIds.MSG_ID_TD_DEBUG_EVENT.value - self.can_interface.register_receiving_publication_function(channel_id, - self.msg_id_td_debug_event, - self._handler_td_debug_event_sync) + self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.td_sync_broadcast_ch_id, + message_id = MsgIds.MSG_ID_TD_DEBUG_EVENT.value, + function = self._handler_td_debug_event_sync) - self.msg_id_td_version_response = MsgIds.MSG_ID_TD_VERSION_RESPONSE.value - self.can_interface.register_receiving_publication_function(channel_id, - self.msg_id_td_version_response, - self._handler_td_version_response_sync) + self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.td_sync_broadcast_ch_id, + message_id = MsgIds.MSG_ID_TD_VERSION_RESPONSE.value, + function = self._handler_td_version_response_sync) - self.msg_id_ui_version_info_response = MsgIds.MSG_ID_UI_VERSION_INFO_RESPONSE.value - self.can_interface.register_receiving_publication_function(DenaliChannels.ui_to_td_ch_id, - self.msg_id_ui_version_info_response, - self._handler_ui_version_response_sync) + self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.td_sync_broadcast_ch_id, + message_id = MsgIds.MSG_ID_UI_VERSION_INFO_RESPONSE.value, + function = self._handler_ui_version_response_sync) + # Dialin will send a login message during construction. This is for the leahi subsystems to start # publishing CAN data when there is no UI connected as the UI typically does this job. self.cmd_log_in_to_td() @@ -188,13 +186,12 @@ @param message: published TD operation mode broadcast message @return: None """ - mode = struct.unpack('i', bytearray( - message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) - smode = struct.unpack('i', bytearray( - message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) + msg_list = [] + msg_list.append(('self.td_operation_mode', DataTypes.U32)) + msg_list.append(('self.td_operation_sub_mode', DataTypes.U32)) - self.td_operation_mode = mode[0] - self.td_operation_sub_mode = smode[0] + self.process_into_vars(decoder_list = msg_list, + message = message) self.td_op_mode_timestamp = timestamp @@ -207,33 +204,28 @@ @return: None if not successful, the version string if unpacked successfully """ - major = struct.unpack(' 0 for each in [major, minor, micro, build, compatibility]]): - self.td_version = f"v{major[0]}.{minor[0]}.{micro[0]}-{build[0]}.{compatibility[0]}" - self.logger.debug(f"TD VERSION: {self.td_version}") + result = self.process_into_vars(decoder_list = msg_list, + message = message) - if all([len(each) > 0 for each in [fpga_id, fpga_major, fpga_minor, fpga_lab]]): - self.td_fpga_version = f"v{fpga_id[0]}.{fpga_major[0]}.{fpga_minor[0]}-{fpga_lab[0]}" - self.logger.debug(f"TD FPGA VERSION: {self.td_fpga_version}") + if all([len(each) > 0 for each in [result['major'], result['minor'], result['micro'], result['build'], result['compatibility']]]): + self.td_version = f"v{result['major']}.{result['minor']}.{result['micro']}-{result['build']}.{result['compatibility']}" + self.logger.debug(f'TD VERSION: {self.td_version}') + if all([len(each) > 0 for each in [result['fpga_id'], result['fpga_major'], result['fpga_minor'], result['fpga_lab']]]): + self.td_fpga_version = f"v{result['fpga_id']}.{result['fpga_major']}.{result['fpga_minor']}-{result['fpga_lab']}" + self.logger.debug(f'TD FPGA VERSION: {self.td_fpga_version}') + self.td_version_response_timestamp = timestamp @@ -251,28 +243,24 @@ @return: None if not successful, the version string if unpacked successfully """ - major = struct.unpack(' 0 for each in [major, minor, micro, build, compatibility]]): - self.ui_version = f"v{major[0]}.{minor[0]}.{micro[0]}-{build[0]}.{compatibility[0]}" - self.logger.debug(f"UI VERSION: {self.ui_version}") - + if all([len(each) > 0 for each in [result['major'], result['minor'], result['micro'], result['build'], result['compatibility']]]): + self.ui_version = f"v{result['major']}.{result['minor']}.{result['micro']}-{result['build']}.{result['compatibility']}" + self.logger.debug(f'UI VERSION: {self.ui_version}') else: self.ui_version = None self.logger.debug("Failed to retrieve UI Version.") + self.ui_version_info_response_timestamp = timestamp - def cmd_op_mode_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the measured op mode broadcast interval override command