########################################################################### # # Copyright (c) 2021-2025 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file events.py # # @author (last) Zoltan Miskolci # @date (last) 05-May-2026 # @author (original) Dara Navaei # @date (original) 12-Oct-2021 # ############################################################################ import struct from logging import Logger from datetime import datetime from time import time from leahi_dialin.common import dd_enum_repository, fp_enum_repository from leahi_dialin.common.generic_defs import DataTypes from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions from leahi_dialin.protocols.CAN import DenaliCanMessenger, DenaliChannels, DenaliCanMessenger from leahi_dialin.utils.abstract_classes import AbstractSubSystem from leahi_dialin.utils.base import publish class FPEvents(AbstractSubSystem): """ FP Dialin API sub-class for events related commands. """ UNKNOWN_STATE = "UNKNOWN_PREVIOUS_STATE" def __init__(self, can_interface: DenaliCanMessenger, logger: Logger): """ @param can_interface: Denali CAN Messenger object """ super().__init__() self.can_interface: DenaliCanMessenger = can_interface self.logger = logger if self.can_interface is not None: self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.fp_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_FP_EVENT.value, function = self._handler_events_sync) self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.fp_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_FP_OP_MODE_DATA.value, function = self._handler_fp_op_mode_sync) self.events_timestamp = 0.0 #: The timestamp of the last Event message self.op_mode = 0 #: The new Operation Mode value self.sub_mode = 0 #: The new Operation Sub-Mode value self.event_op_mode_timestamp = 0.0 #: The timestamp of the last Operation Mode change message # Dictionary of the mode as key and the sub mode states enum class as the value self._op_mode_2_sub_mode = {fp_enum_repository.FPOpModes.MODE_FAUL.name: fp_enum_repository.FPFaultStates, fp_enum_repository.FPOpModes.MODE_SERV.name: fp_enum_repository.FPServiceStates, fp_enum_repository.FPOpModes.MODE_INIT.name: fp_enum_repository.FPPostStates, fp_enum_repository.FPOpModes.MODE_STAN.name: fp_enum_repository.FPStandbyStates, fp_enum_repository.FPOpModes.MODE_PRE_GENP.name: fp_enum_repository.FPPreGenPermeateStates, fp_enum_repository.FPOpModes.MODE_GENP.name: fp_enum_repository.FPGenPermeateStates, fp_enum_repository.FPOpModes.MODE_DPGP.name: fp_enum_repository.FPPreGenPDefStates, fp_enum_repository.FPOpModes.MODE_DEGP.name: fp_enum_repository.FPGenPermeateDefStates, fp_enum_repository.FPOpModes.MODE_NLEG.name: fp_enum_repository.FPNotLegalStates} # Loop through the list of the FP events enums and initial the event dictionary. Each event is a key in the # dictionary and the value is a list. self._event_dictionary = dict() for event in fp_enum_repository.FPEventList: self._event_dictionary[fp_enum_repository.FPEventList(event).name] = [] def get_fp_nth_event(self, event_id, event_number=0): """ Returns the nth requested FP event @param event_id the ID of the FP event types (i.e. FP_EVENT_STARTUP) @param event_number the event number that is requested. The default is 0 meaning the last occurred event @returns the requested FP event number """ list_length = len(self._event_dictionary[fp_enum_repository.FPEventList(event_id).name]) if list_length == 0: event = [] elif event_number > list_length: event = self._event_dictionary[fp_enum_repository.FPEventList(event_id).name][list_length - 1] else: event = self._event_dictionary[fp_enum_repository.FPEventList(event_id).name][list_length - event_number - 1] return event def clear_fp_event_list(self): """ Clears the FP event list @returns none """ for key in self._event_dictionary: self._event_dictionary[key].clear() def get_fp_events(self, event_id, number_of_events=1): """ Returns the requested number of a certain FP event ID @param event_id the ID of the FP event types (i.e. FP_EVENT_STARTUP) @param number_of_events the last number of messages of a certain event type @returns a list of the requested FP event type """ list_of_events = [] # If there are not enough event lists send all the events that are available if len(self._event_dictionary[fp_enum_repository.FPEventList(event_id).name]) <= number_of_events: list_of_events = self._event_dictionary[fp_enum_repository.FPEventList(event_id).name] else: # Get the all the events complete_list = self._event_dictionary[fp_enum_repository.FPEventList(event_id).name] # Since the last are located at the end of the list, iterate backwards for the defined # event messages for i in range(len(complete_list) - 1, len(complete_list) - number_of_events - 1, -1): list_of_events.append(complete_list[i]) if number_of_events == 0: list_of_events = self._event_dictionary[fp_enum_repository.FPEventList(event_id).name] return list_of_events @publish(["msg_id_fp_event", "events_timestamp", '_event_dictionary']) def _handler_events_sync(self, message, timestamp=0.0): """ Handles published events message @param message: published FP events data message @returns none """ event_data_1 = 0 event_data_2 = 0 op_mode = 0 sub_mode = 0 sub_state = 0 event_id = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] # Convert the event ID to enum compensated_event_id = event_id - dd_enum_repository.DDEventList.NUM_OF_DD_EVENT_IDS.value event_enum = fp_enum_repository.FPEventList(compensated_event_id) current_timestamp = datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S.%f') if event_enum is fp_enum_repository.FPEventList.FP_EVENT_OPERATION_STATUS: # Get the data type - irrelevant event_data_type_1 = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] # For debug reasons get the full second byte event_data_1 = struct.unpack(DataTypes(event_data_type_1).unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] # Opmode data is compressed into the 4 byte op_mode = struct.unpack(DataTypes.U08.unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.START_POS_FIELD_3+1]))[0] sub_mode = struct.unpack(DataTypes.U08.unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3+1:MsgFieldPositions.START_POS_FIELD_3+2]))[0] sub_state = struct.unpack(DataTypes.U08.unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3+2:MsgFieldPositions.START_POS_FIELD_3+3]))[0] _ = struct.unpack(DataTypes.U08.unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4+3:MsgFieldPositions.END_POS_FIELD_4]))[0] event_data_type_2 = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] event_data_2 = struct.unpack(DataTypes(event_data_type_2).unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5]))[0] else: # Get the data type event_data_type_1 = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] # Get the data value by unpacking the data type event_data_1 = struct.unpack(DataTypes(event_data_type_1).unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] event_data_type_2 = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] event_data_2 = struct.unpack(DataTypes(event_data_type_2).unpack_attrib(), bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5]))[0] # Get the current timestamp and create a tuple of the current events event_tuple = (current_timestamp, event_enum.name, event_data_1, event_data_2) # Check if the event state name is operation mode change. If it is, get the name of the operation modes # from the op modes enum class if event_enum is fp_enum_repository.FPEventList.FP_EVENT_OP_MODE_CHANGE: event_data_1 = fp_enum_repository.FPOpModes(event_data_1).name event_data_2 = fp_enum_repository.FPOpModes(event_data_2).name event_tuple = (current_timestamp, event_enum.name, event_data_1, event_data_2) # Check if the event state name is sub mode change. elif event_enum is fp_enum_repository.FPEventList.FP_EVENT_SUB_MODE_CHANGE: # Get the Op Mode Change messages op_modes_list = self._event_dictionary[fp_enum_repository.FPEventList.FP_EVENT_OP_MODE_CHANGE.name] # If there are Op mode change messages, use it to determine the operation mode for the current submode change # Index description: # 0: Timestamp # 1: Name # 2: Previous Op / Sub Mode # 3: Current Op / Sub Mode if len(op_modes_list) != 0: # Get the Sub Mode Change messages sub_mode_list = self._event_dictionary[fp_enum_repository.FPEventList.FP_EVENT_SUB_MODE_CHANGE.name] last_op_mode_message = op_modes_list[-1] # Check if this is not the very first Sub Mode Change message if len(sub_mode_list != 0): # Get the timestamps of the last messages of both list for comparison prev_op_mode_timestamp = datetime.strptime(last_op_mode_message[0], '%Y-%m-%d %H:%M:%S.%f') prev_sub_mode_timestamp = datetime.strptime(sub_mode_list[-1][0], '%Y-%m-%d %H:%M:%S.%f') # In case the Op Mode Change is older then the previous Sub Mode Change, # use the current Op Mode from the previous Op Mode Change message for both data if prev_op_mode_timestamp <= prev_sub_mode_timestamp: event_data_1 = self._op_mode_2_sub_mode[last_op_mode_message[3]] event_data_2 = self._op_mode_2_sub_mode[last_op_mode_message[3]] # In case the Op Mode Change is newer then the previous Sub Mode Change, # use the operation mode informations from the last Op Mode Change message else: event_data_1 = self._op_mode_2_sub_mode[last_op_mode_message[2]] event_data_2 = self._op_mode_2_sub_mode[last_op_mode_message[3]] # In case this is the very first Sub Mode Change message, # use the operation mode informations from the last Op Mode Change message else: event_data_1 = self._op_mode_2_sub_mode[last_op_mode_message[2]] event_data_2 = self._op_mode_2_sub_mode[last_op_mode_message[3]] # In case there are no Op Mode Change messages, use the Operations States messages # to identify the Op Modes for the Sub Modes else: # Get the Operation Status Change messages op_status_list = self._event_dictionary[fp_enum_repository.FPEventList.FP_EVENT_OPERATION_STATUS.name] data_2_found = False event_1_op_mode = self.UNKNOWN_STATE event_2_op_mode = self.UNKNOWN_STATE # Go through the Operation Status Change message list starting from the back # Index description: # 0: Timestamp # 1: Op Mode # 2: Sub Mode # 3: State for i in range(len(op_status_list) - 1, -1, -1): op_status_msg = op_status_list[i] # Look for a match for event_data_2 if not data_2_found: if op_status_msg[2] == event_data_2: event_2_op_mode = fp_enum_repository.FPOpModes(op_status_msg[1]).name data_2_found = True # Look for a match for event_data_1 after event_data_2 is found # Criteria is that the opmode, submode pair can't be the same as the one found for event_data_2 else: if op_status_msg[2] == event_data_1 and \ (op_status_msg[2] != event_data_2 or fp_enum_repository.FPOpModes(op_status_msg[1]).name != event_2_op_mode): event_1_op_mode = fp_enum_repository.FPOpModes(op_status_msg[1]).name # If op mode for event_data_2 found but not found for event_data_1 and run out of operation states # assume it's the start of the unit start up and the going to standby is not logged yet if event_2_op_mode != self.UNKNOWN_STATE and event_1_op_mode == self.UNKNOWN_STATE: event_1_op_mode = fp_enum_repository.FPOpModes.MODE_STAN.name # Update the event_data values event_data_1 = self._op_mode_2_sub_mode[event_1_op_mode] event_data_2 = self._op_mode_2_sub_mode[event_2_op_mode] # Update the tuple event_tuple = (current_timestamp, event_enum.name, event_data_1, event_data_2) elif event_enum is fp_enum_repository.FPEventList.FP_EVENT_OPERATION_STATUS: event_tuple = (current_timestamp, op_mode, sub_mode, sub_state) # Update event dictionary self._event_dictionary[event_enum.name].append(event_tuple) self.events_timestamp = timestamp @publish(["msg_id_fp_op_mode_data", "event_op_mode_timestamp", "op_mode", "sub_mode"]) def _handler_fp_op_mode_sync(self, message, timestamp=0.0): """ Handles published FP operation mode messages. Current FP operation mode is captured for reference. @param message: published FP operation mode broadcast message @return: None """ msg_list = [] msg_list.append(('self.op_mode', DataTypes.U32)) msg_list.append(('self.sub_mode', DataTypes.U32)) self.process_into_vars(decoder_list = msg_list, message = message) self.event_op_mode_timestamp = timestamp