Index: leahi_dialin/dd/modules/events.py =================================================================== diff -u -r20c821bd230fc7689a0275a2918981669ff5cc19 -r6d104d3185ac3ed7c18c97ecdc13fd59bf53a8d1 --- leahi_dialin/dd/modules/events.py (.../events.py) (revision 20c821bd230fc7689a0275a2918981669ff5cc19) +++ leahi_dialin/dd/modules/events.py (.../events.py) (revision 6d104d3185ac3ed7c18c97ecdc13fd59bf53a8d1) @@ -8,21 +8,24 @@ # @file events.py # # @author (last) Zoltan Miskolci -# @date (last) 07-Jan-2026 +# @date (last) 04-May-2026 # @author (original) Dara Navaei # @date (original) 12-Oct-2021 # ############################################################################ +# Module imports import struct from logging import Logger from datetime import datetime -from time import time +# Project imports from leahi_dialin.common import dd_enum_repository +from leahi_dialin.common.generic_defs import DataTypes from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions -from leahi_dialin.protocols.CAN import DenaliChannels -from leahi_dialin.utils.base import AbstractSubSystem, publish +from leahi_dialin.protocols.CAN import DenaliCanMessenger, DenaliChannels +from leahi_dialin.utils.abstract_classes import AbstractSubSystem +from leahi_dialin.utils.base import publish @@ -33,7 +36,7 @@ """ UNKNOWN_STATE = "UNKNOWN_PREVIOUS_STATE" - def __init__(self, can_interface, logger: Logger): + def __init__(self, can_interface: DenaliCanMessenger, logger: Logger): """ @param can_interface: Denali CAN Messenger object @@ -43,15 +46,14 @@ self.logger = logger if self.can_interface is not None: - channel_id = DenaliChannels.dd_sync_broadcast_ch_id - self.msg_id_dd_event = MsgIds.MSG_ID_DD_EVENT.value - self.can_interface.register_receiving_publication_function(channel_id, self.msg_id_dd_event, self._handler_events_sync) + self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.dd_sync_broadcast_ch_id, + message_id = MsgIds.MSG_ID_DD_EVENT.value, + function = self._handler_events_sync) + + self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.dd_sync_broadcast_ch_id, + message_id = MsgIds.MSG_ID_DD_OP_MODE_DATA.value, + function = self._handler_dd_op_mode_sync) - channel_id = DenaliChannels.dd_sync_broadcast_ch_id - self.msg_id_dd_op_mode_data = MsgIds.MSG_ID_DD_OP_MODE_DATA.value - self.can_interface.register_receiving_publication_function(channel_id, self.msg_id_dd_op_mode_data, - self._handler_dd_op_mode_sync) - self.dd_events_timestamp = 0.0 #: The timestamp of the last Event message self.dd_event_op_mode = 0 #: The new Operation Mode value self.dd_event_sub_mode = 0 #: The new Operation Sub-Mode value @@ -72,30 +74,13 @@ # Define the dictionaries self._dd_event_dictionary = dict() - self._dd_event_data_type = dict() # Loop through the list of the DD events enums and initial the event dictionary. Each event is a key in the # dictionary and the value is a list. for event in dd_enum_repository.DDEventList: self._dd_event_dictionary[dd_enum_repository.DDEventList(event).name] = [] - # Loop through the list of the event data type enum and update the dictionary - for data_type in dd_enum_repository.DDEventDataTypes: - event_data_type = dd_enum_repository.DDEventDataTypes(data_type).name - struct_unpack_type = None - # If U32 is in the data type enum (i.e. EVENT_DATA_TYPE_U32), then the key is the enum and the value is - # the corresponding format in the python struct - if 'U32' in event_data_type or 'BOOL' in event_data_type: - struct_unpack_type = 'I' - elif 'S32' in event_data_type: - struct_unpack_type = 'i' - elif 'F32' in event_data_type: - struct_unpack_type = 'f' - - self._dd_event_data_type[event_data_type] = struct_unpack_type - - def get_dd_nth_event(self, event_id, event_number=0): """ Returns the nth requested DD event @@ -170,55 +155,55 @@ sub_state = 0 current_sub_tuple = [] - event_id = struct.unpack('i', bytearray( + event_id = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] + # Convert the event ID to enum + event_enum = dd_enum_repository.DDEventList(event_id) - if event_id == dd_enum_repository.DDEventList.DD_EVENT_OPERATION_STATUS.value: + if event_enum is dd_enum_repository.DDEventList.DD_EVENT_OPERATION_STATUS: # Get the data type - event_data_type_1 = struct.unpack('i', bytearray( + event_data_type_1 = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] - struct_data_type = self._dd_event_data_type[dd_enum_repository.DDEventDataTypes(event_data_type_1).name] - op_mode = struct.unpack(' current_sub_mode_timestamp: + elif current_timestamp > current_sub_mode_timestamp: # If the previous and current of the last two tuples do not match, then an operation mode transition # has occurred and the previous state is converted from the previous class and the current op mode # is converted from current operation states enum class. @@ -285,13 +268,13 @@ previous_sub_mode_enum_class = self._dd_op_mode_2_sub_mode[previous_sub_mode] event_data_1 = previous_sub_mode_enum_class(event_data_1).name event_data_2 = current_sub_mode_enum_class(event_data_2).name - event_tuple = (datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S.%f'), event_state_name, event_data_1, event_data_2) + event_tuple = (current_timestamp, event_enum.name, event_data_1, event_data_2) - elif event_state_name == dd_enum_repository.DDEventList.DD_EVENT_OPERATION_STATUS.name: - event_tuple = (time(), op_mode, sub_mode, sub_state) + elif event_enum is dd_enum_repository.DDEventList.DD_EVENT_OPERATION_STATUS: + event_tuple = (current_timestamp, op_mode, sub_mode, sub_state) # Update event dictionary - self._dd_event_dictionary[event_state_name].append(event_tuple) + self._dd_event_dictionary[event_enum.name].append(event_tuple) self.dd_events_timestamp = timestamp @@ -304,11 +287,11 @@ @param message: published DD operation mode broadcast message @return: None """ - mode = struct.unpack('i', bytearray( - message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) - smode = struct.unpack('i', bytearray( - message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) + msg_list = [] + msg_list.append(('self.dd_event_op_mode', DataTypes.U32)) + msg_list.append(('self.dd_event_sub_mode', DataTypes.U32)) - self.dd_event_op_mode = mode[0] - self.dd_event_sub_mode = smode[0] + self.process_into_vars(decoder_list = msg_list, + message = message) + self.dd_event_op_mode_timestamp = timestamp