###########################################################################
#
# Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file alarms.py
#
# @author (last) Micahel Garthwaite
# @date (last) 17-Aug-2023
# @author (original) Quang Nguyen
# @date (original) 02-Sep-2020
#
############################################################################
import struct
from logging import Logger

from .constants import RESET, NO_RESET
from leahi_dialin.common.dd_defs import DDEventDataType
from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions
from leahi_dialin.protocols.CAN import DenaliMessage, DenaliChannels
from leahi_dialin.utils.base import AbstractSubSystem, publish, DialinEnum
from leahi_dialin.utils.checks import check_broadcast_interval_override_ms
from leahi_dialin.utils.conversions import integer_to_bytearray, float_to_bytearray


class DDAlarms(AbstractSubSystem):
    """
    DD interface containing alarm related commands.

    Tracks, per alarm ID, the state reported by received alarm-triggered
    messages (active flag, condition flag, priority, rank, clear-top-only flag
    and the two debug data values) and publishes changes to observers.
    """

    # Number of alarm slots tracked; valid alarm IDs are 0 .. _ALARM_ID_MAX_ALARMS - 1
    _ALARM_ID_MAX_ALARMS = 500

    START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_ALARM_ID = START_POS_ALARM_ID + 2

    def __init__(self, can_interface, logger: Logger):
        """
        Initializes the alarm tables and subscribes to alarm-triggered messages.

        @param can_interface: Denali Can Messenger object (may be None, in which
                              case no message handler is registered)
        @param logger: logger used for debug and error output
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        max_alarms = self._ALARM_ID_MAX_ALARMS

        self.dd_alarm_triggered_timestamp = 0.0
        # alarm states, set True when an alarm activation message is received
        self.alarm_states = [False] * max_alarms
        # alarm condition states, set True when an alarm activation message is received
        self.alarm_conditions = [False] * max_alarms
        # alarm priorities based on received alarm activation messages
        self.alarm_priorities = [0] * max_alarms
        # alarm ranks based on received alarm activation messages
        self.alarm_ranks = [0] * max_alarms
        # alarm clear top only flags based on received alarm activation messages
        self.alarm_clear_top_only_flags = [False] * max_alarms
        # alarm debug data on alarm triggered message: one independent
        # [data_1, data_2] pair per alarm ID (a comprehension is used so the
        # pairs are distinct list objects, not one shared list)
        self.alarm_data = [[0, 0] for _ in range(max_alarms)]
        self.last_alarm_triggered = 0
        self.last_alarm_data_1 = 0.0
        self.last_alarm_data_2 = 0.0

        # Maps an event data type enum name (e.g. EVENT_DATA_TYPE_U32) to the
        # python struct format character used to unpack a value of that type.
        # Names that match none of the known tags map to None.
        self.alarm_data_type = {}
        for data_type in DDEventDataType:
            event_data_type = data_type.name
            if 'U32' in event_data_type or 'BOOL' in event_data_type or 'NONE' in event_data_type:
                struct_unpack_type = 'I'
            elif 'S32' in event_data_type:
                struct_unpack_type = 'i'
            elif 'F32' in event_data_type:
                struct_unpack_type = 'f'
            else:
                struct_unpack_type = None
            self.alarm_data_type[event_data_type] = struct_unpack_type

        # Register the handler last so that it can never be invoked before the
        # tables it reads and writes have been created.
        if self.can_interface is not None:
            channel_id = DenaliChannels.fp_alarm_broadcast_ch_id
            msg_id = MsgIds.MSG_ID_ALARM_TRIGGERED.value
            self.can_interface.register_receiving_publication_function(channel_id, msg_id,
                                                                       self._handler_alarm_triggered)

    @staticmethod
    def _unpack_field(payload, start, end, fmt='i'):
        """
        Unpacks a single value from a slice of a message payload.

        @param payload: indexable sequence of byte values
        @param start: index of the first byte of the field
        @param end: index one past the last byte of the field
        @param fmt: python struct format character of the field
        @return: the unpacked value
        """
        return struct.unpack(fmt, bytearray(payload[start:end]))[0]

    @publish(["dd_alarm_triggered_timestamp", "alarm_states", "alarm_conditions", "alarm_data",
              "alarm_priorities", "alarm_ranks", "alarm_clear_top_only_flags"])
    def _handler_alarm_triggered(self, message, timestamp = 0.0):
        """
        Handles published DD alarm activation messages.

        Messages whose alarm ID is outside 0 .. _ALARM_ID_MAX_ALARMS - 1 are
        logged as an error and otherwise ignored.

        @param message: published DD alarm activation message
        @param timestamp: timestamp to record for this message (default 0.0)
        @return: none
        """
        self.logger.debug("Alarm activated!")

        payload = message['message']
        pos = MsgFieldPositions

        alarm_id = self._unpack_field(payload, pos.START_POS_FIELD_1, pos.END_POS_FIELD_1)

        # Reject IDs that do not fit the tables: a negative ID would silently
        # update the wrong slot through negative indexing and a too-large ID
        # would raise IndexError from inside the receive callback.
        if not 0 <= alarm_id < self._ALARM_ID_MAX_ALARMS:
            self.logger.error("Alarm ID %d is outside the supported range 0..%d; message ignored",
                              alarm_id, self._ALARM_ID_MAX_ALARMS - 1)
            return

        # Each data value is preceded by its data type; the type selects the
        # struct format used to unpack the value.
        data_typ_1 = self._unpack_field(payload, pos.START_POS_FIELD_2, pos.END_POS_FIELD_2)
        struct_data_type_1 = self.alarm_data_type[DDEventDataType(data_typ_1).name]
        data_1 = self._unpack_field(payload, pos.START_POS_FIELD_3, pos.END_POS_FIELD_3, struct_data_type_1)

        data_typ_2 = self._unpack_field(payload, pos.START_POS_FIELD_4, pos.END_POS_FIELD_4)
        struct_data_type_2 = self.alarm_data_type[DDEventDataType(data_typ_2).name]
        data_2 = self._unpack_field(payload, pos.START_POS_FIELD_5, pos.END_POS_FIELD_5, struct_data_type_2)

        priority = self._unpack_field(payload, pos.START_POS_FIELD_6, pos.END_POS_FIELD_6)
        rank = self._unpack_field(payload, pos.START_POS_FIELD_7, pos.END_POS_FIELD_7)
        clr_top_only = self._unpack_field(payload, pos.START_POS_FIELD_8, pos.END_POS_FIELD_8)

        self.logger.debug("Alarm ID: %d %d %d", alarm_id, data_1, data_2)

        self.alarm_states[alarm_id] = True
        self.alarm_conditions[alarm_id] = True
        self.alarm_priorities[alarm_id] = priority
        self.alarm_ranks[alarm_id] = rank
        self.alarm_clear_top_only_flags[alarm_id] = clr_top_only
        self.alarm_data[alarm_id] = [data_1, data_2]
        # Store the plain integer ID (not the tuple returned by struct.unpack)
        self.last_alarm_triggered = alarm_id
        self.last_alarm_data_1 = data_1
        self.last_alarm_data_2 = data_2
        self.dd_alarm_triggered_timestamp = timestamp

    def _handler_alarm_info(self, message, timestamp = 0.0):
        """
        Placeholder for handling alarm info messages; currently does nothing.

        @param message: published alarm info message (unused)
        @param timestamp: timestamp of the message (unused)
        @return: none
        """
        pass

    # TODO: alarm state override, alarm info publish override