###########################################################################
#
#    Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   alarms.py
#
#    @author (last)      Zoltan Miskolci
#    @date   (last)      08-Jan-2026
#    @author (original)  Quang Nguyen
#    @date   (original)  02-Sep-2020
#
############################################################################
import struct
from logging import Logger

from leahi_dialin.common.alarm_defs import AlarmList
from leahi_dialin.common.constants import NO_RESET
from leahi_dialin.common.dd_defs import dd_enum_repository
from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions
from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override
from leahi_dialin.protocols.CAN import DenaliMessage, DenaliChannels
from leahi_dialin.utils.base import AbstractSubSystem, publish
from leahi_dialin.utils.conversions import integer_to_bytearray


class DDAlarms(AbstractSubSystem):
    """
    DD interface containing alarm related commands.

    Tracks per-alarm state received from DD alarm broadcast messages and
    provides override commands for the DD alarm subsystem.
    """

    # Number of alarm slots tracked by the per-alarm state lists
    _ALARM_ID_MAX_ALARMS = 500
    START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_ALARM_ID = START_POS_ALARM_ID + 2

    def __init__(self, can_interface, logger: Logger):
        """
        @param can_interface: Denali Can Messenger object (may be None, in which
                              case no message handler is registered)
        @param logger: logger used for debug and error output
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger

        # Always defined (even without a CAN interface) because the alarm
        # triggered handler names this attribute in its publish list.
        self.msg_id_dd_alarm_triggered = MsgIds.MSG_ID_ALARM_TRIGGERED.value

        if self.can_interface is not None:
            channel_id = DenaliChannels.dd_alarm_broadcast_ch_id
            self.can_interface.register_receiving_publication_function(channel_id,
                                                                       self.msg_id_dd_alarm_triggered,
                                                                       self._handler_alarm_triggered)

        self.alarm_safety_shutdown_status = 0.0
        self.dd_alarm_info_timestamp = 0.0
        self.dd_alarm_triggered_timestamp = 0.0
        self.dd_alarm_cleared_timestamp = 0.0
        self.dd_alarm_condition_cleared_timestamp = 0.0

        max_alarms = self._ALARM_ID_MAX_ALARMS
        # alarm states based on received DD alarm activation and alarm clear messages
        self.alarm_states = [False] * max_alarms
        # alarm condition states based on received DD alarm activation and clear condition messages
        self.alarm_conditions = [False] * max_alarms
        # alarm priorities based on received DD alarm activation messages
        self.alarm_priorities = [0] * max_alarms
        # alarm ranks based on received DD alarm activation messages
        self.alarm_ranks = [0] * max_alarms
        # alarm clear top only flags based on received DD alarm activation messages
        self.alarm_clear_top_only_flags = [False] * max_alarms
        # alarm debug data on alarm triggered message: one independent
        # [data_1, data_2] pair per alarm ID (a comprehension is required so
        # the pairs are distinct list objects)
        self.alarm_data = [[0, 0] for _ in range(max_alarms)]
        self.last_alarm_triggered = 0
        self.last_alarm_data_1 = 0.0
        self.last_alarm_data_2 = 0.0

        # alarm information: maps event data type enum name -> struct format character
        self.alarm_data_type = dict()

        # Loop through the list of the event data type enum and update the dictionary
        for data_type in dd_enum_repository.DDEventDataTypes:
            event_data_type = dd_enum_repository.DDEventDataTypes(data_type).name
            # If U32 is in the data type enum (i.e. EVENT_DATA_TYPE_U32), then the key is the enum and the value is
            # the corresponding format in the python struct. Names matching none of the known markers map to None.
            if 'U32' in event_data_type or 'BOOL' in event_data_type or 'NONE' in event_data_type:
                struct_unpack_type = 'I'
            elif 'S32' in event_data_type:
                struct_unpack_type = 'i'
            elif 'F32' in event_data_type:
                struct_unpack_type = 'f'
            else:
                struct_unpack_type = None
            self.alarm_data_type[event_data_type] = struct_unpack_type

    @staticmethod
    def _unpack_field(payload, fmt, start, end):
        """
        Unpacks a single value from the given slice of a message payload.

        @param payload: indexable message payload (message['message'])
        @param fmt: python struct format character of the field
        @param start: start index of the field in the payload
        @param end: end index (exclusive) of the field in the payload
        @return: the unpacked value
        """
        return struct.unpack(fmt, bytearray(payload[start:end]))[0]

    def _unpack_alarm_data(self, payload, type_start, type_end, data_start, data_end):
        """
        Unpacks one alarm data value whose struct format is selected by the
        event data type field that precedes it in the payload.

        @param payload: indexable message payload (message['message'])
        @param type_start: start index of the data type field
        @param type_end: end index (exclusive) of the data type field
        @param data_start: start index of the data field
        @param data_end: end index (exclusive) of the data field
        @return: the unpacked data value
        """
        data_type = self._unpack_field(payload, 'i', type_start, type_end)
        # Get the corresponding structure format
        struct_data_type = self.alarm_data_type[dd_enum_repository.DDEventDataTypes(data_type).name]
        # Get the data value by unpacking the data type
        return self._unpack_field(payload, struct_data_type, data_start, data_end)

    @publish(["msg_id_dd_alarm_triggered", "alarm_states", "alarm_conditions", "alarm_data", "alarm_priorities",
              "alarm_ranks", "alarm_clear_top_only_flags", "dd_alarm_triggered_timestamp"])
    def _handler_alarm_triggered(self, message, timestamp = 0.0):
        """
        Handles published DD alarm activation messages.

        @param message: published DD alarm activation message
        @param timestamp: timestamp recorded as the time of this activation
        @return: none
        """
        self.logger.debug("Alarm activated!")
        payload = message['message']
        pos = MsgFieldPositions

        alarm_id = self._unpack_field(payload, 'i', pos.START_POS_FIELD_1, pos.END_POS_FIELD_1)
        data_1 = self._unpack_alarm_data(payload,
                                         pos.START_POS_FIELD_2, pos.END_POS_FIELD_2,
                                         pos.START_POS_FIELD_3, pos.END_POS_FIELD_3)
        data_2 = self._unpack_alarm_data(payload,
                                         pos.START_POS_FIELD_4, pos.END_POS_FIELD_4,
                                         pos.START_POS_FIELD_5, pos.END_POS_FIELD_5)
        priority = self._unpack_field(payload, 'i', pos.START_POS_FIELD_6, pos.END_POS_FIELD_6)
        rank = self._unpack_field(payload, 'i', pos.START_POS_FIELD_7, pos.END_POS_FIELD_7)
        clr_top_only = self._unpack_field(payload, 'i', pos.START_POS_FIELD_8, pos.END_POS_FIELD_8)

        # Lazy logger arguments; %s for the data values because they may be
        # floats (including NaN/inf, which %d cannot format).
        self.logger.debug("Alarm ID: %d %s %s", alarm_id, data_1, data_2)

        # The alarm ID comes off the wire: a negative value would silently
        # index from the end of the lists and a too-large one would raise.
        if not 0 <= alarm_id < len(self.alarm_states):
            self.logger.error("Received alarm ID %d is out of range (0..%d); message ignored",
                              alarm_id, len(self.alarm_states) - 1)
            return

        self.alarm_states[alarm_id] = True
        self.alarm_conditions[alarm_id] = True
        self.alarm_priorities[alarm_id] = priority
        self.alarm_ranks[alarm_id] = rank
        self.alarm_clear_top_only_flags[alarm_id] = clr_top_only
        self.alarm_data[alarm_id] = [data_1, data_2]
        self.last_alarm_triggered = alarm_id
        self.last_alarm_data_1 = data_1
        self.last_alarm_data_2 = data_2
        self.dd_alarm_triggered_timestamp = timestamp

    # NOTE(review): "msg_id_dd_alarm_info" and "safetyShutdownStatus" are not
    # attributes set anywhere in this class (the status is stored in
    # alarm_safety_shutdown_status) - confirm the intended publish keys.
    @publish(["msg_id_dd_alarm_info", "safetyShutdownStatus", "dd_alarm_info_timestamp"])
    def _handler_alarm_info(self, message, timestamp = 0.0):
        """
        Handles published DD alarm info messages.

        @param message: published DD alarm info message
        @param timestamp: timestamp recorded as the time of this message
        @return: none
        """
        # struct.unpack always returns a tuple; store the value itself
        self.alarm_safety_shutdown_status = struct.unpack('I', bytearray(
            message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0]
        self.dd_alarm_info_timestamp = timestamp

    @publish(["msg_id_dd_alarm_cleared", "dd_alarm_cleared_timestamp"])
    def _handler_alarm_cleared(self, message, timestamp = 0.0):
        """
        Handles published DD alarm cleared messages.

        Currently only records the timestamp; the message content is not parsed.

        @param message: published DD alarm cleared message
        @param timestamp: timestamp recorded as the time of this message
        @return: none
        """
        # ToDo: parse the message and update the alarm states
        self.dd_alarm_cleared_timestamp = timestamp

    @publish(["msg_id_dd_alarm_condition_cleared", "dd_alarm_condition_cleared_timestamp"])
    def _handler_alarm_condition_cleared(self, message, timestamp = 0.0):
        """
        Handles published DD alarm condition cleared messages.

        Currently only records the timestamp; the message content is not parsed.

        @param message: published DD alarm condition cleared message
        @param timestamp: timestamp recorded as the time of this message
        @return: none
        """
        # ToDo: parse the message and update the alarm condition states
        self.dd_alarm_condition_cleared_timestamp = timestamp

    def cmd_alarm_state_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Placeholder for the alarm state data broadcast interval override command.
        Not implemented in the FW: no message is sent and the arguments are ignored.

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: always False (i.e. not successful)
        """
        # TODO: send via cmd_generic_broadcast_interval_override (module name
        # 'DD Alarm state') once the FW defines a message ID for this override.
        print('Not implemented in the FW')
        return False

    def cmd_alarm_info_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Placeholder for the alarm info data broadcast interval override command.
        Not implemented in the FW: no message is sent and the arguments are ignored.

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: always False (i.e. not successful)
        """
        # TODO: send via cmd_generic_broadcast_interval_override (module name
        # 'DD Alarm info') once the FW defines a message ID for this override.
        print('Not implemented in the FW')
        return False

    def cmd_alarm_cleared_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Placeholder for the alarm cleared data broadcast interval override command.
        Not implemented in the FW: no message is sent and the arguments are ignored.

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: always False (i.e. not successful)
        """
        # TODO: send via cmd_generic_broadcast_interval_override (module name
        # 'DD Alarm cleared') once the FW defines a message ID for this override.
        print('Not implemented in the FW')
        return False

    def cmd_alarm_condition_cleared_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Placeholder for the alarm condition cleared data broadcast interval override command.
        Not implemented in the FW: no message is sent and the arguments are ignored.

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: always False (i.e. not successful)
        """
        # TODO: send via cmd_generic_broadcast_interval_override (module name
        # 'DD Alarm condition cleared') once the FW defines a message ID for this override.
        print('Not implemented in the FW')
        return False

    def cmd_alarm_state_override(self, alarm: int, state: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm state override command
        Constraints:
        Must be logged into DD.
        Given alarm must be valid.
        If inactivating alarm, given alarm must be recoverable (clearable).

        @param alarm: integer - ID of alarm to override
        @param state: integer - 1 for alarm active, 0 for alarm inactive
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        # Payload layout: reset flag, then requested state, then alarm ID
        rst = integer_to_bytearray(reset)
        sta = integer_to_bytearray(state)
        alm = integer_to_bytearray(alarm)
        payload = rst + sta + alm

        # Human-readable names used only for the override description text
        alarm_name = AlarmList(alarm).name
        state_name = 'Active' if state != 0 else 'Inactive'

        return cmd_generic_override(
            payload = payload,
            reset = reset,
            channel_id = DenaliChannels.dialin_to_dd_ch_id,
            msg_id = MsgIds.MSG_ID_DD_ALARM_STATE_OVERRIDE_REQUEST,
            entity_name = f'DD {alarm_name} Alarm state',
            override_text = f'{state_name}',
            logger = self.logger,
            can_interface = self.can_interface)