###########################################################################
#
#    Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   alarms.py
#
#    @author (last)      Zoltan Miskolci
#    @date   (last)      04-May-2026
#    @author (original)  Quang Nguyen
#    @date   (original)  02-Sep-2020
#
############################################################################

# Module imports
from logging import Logger
import struct

# Project imports
from leahi_dialin.common.alarm_defs import AlarmList
from leahi_dialin.common.constants import NO_RESET
from leahi_dialin.common.generic_defs import DataTypes
from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions
from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override
from leahi_dialin.protocols.CAN import DenaliCanMessenger, DenaliMessage, DenaliChannels
from leahi_dialin.utils.abstract_classes import AbstractSubSystem
from leahi_dialin.utils.base import publish
from leahi_dialin.utils.conversions import integer_to_bytearray


class DDAlarms(AbstractSubSystem):
    """
    DD interface containing alarm related commands.
    """

    _ALARM_ID_MAX_ALARMS = 500
    START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_ALARM_ID = START_POS_ALARM_ID + 2

    # Payload layout of the Alarm Triggered message, in field order (field 1 first).
    # The second item is either a fixed DataTypes member, or the name of an earlier
    # field whose decoded value selects the DataTypes member used to unpack this field.
    _ALARM_TRIGGERED_FIELDS = (
        ('alarm_id', DataTypes.U32),
        ('data_typ_1', DataTypes.U32),
        ('data_1', 'data_typ_1'),
        ('data_typ_2', DataTypes.U32),
        ('data_2', 'data_typ_2'),
        ('priority', DataTypes.U32),
        ('rank', DataTypes.U32),
        ('clr_top_only', DataTypes.U32),
    )

    def __init__(self, can_interface: DenaliCanMessenger, logger: Logger):
        """
        Initializes the alarm state tables and subscribes to the DD Alarm Triggered message.

        @param can_interface: Denali Can Messenger object (may be None, in which case nothing is registered)
        @param logger: Logger used for debug / error output
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger

        max_alarms = self._ALARM_ID_MAX_ALARMS

        self.alarm_safety_shutdown_status = 0.0                 #: The Alarm's Safety Shutdown status value
        self.dd_alarm_info_timestamp = 0.0                      #: The timestamp of the last Alarm Information message
        self.dd_alarm_triggered_timestamp = 0.0                 #: The timestamp of the last Alarm Triggered message
        self.dd_alarm_cleared_timestamp = 0.0                   #: The timestamp of the last Alarm Cleared message
        self.dd_alarm_condition_cleared_timestamp = 0.0         #: The timestamp of the last Alarm Condition Cleared message
        self.alarm_states = [False] * max_alarms                #: The States of all Alarms data in list format
        self.alarm_conditions = [False] * max_alarms            #: The Conditions of all Alarms data in list format
        self.alarm_priorities = [0] * max_alarms                #: The Priorities of all Alarms data in list format
        self.alarm_ranks = [0] * max_alarms                     #: The Ranks of all Alarms data in list format
        self.alarm_clear_top_only_flags = [False] * max_alarms  #: The Clear Top only flags of all Alarms data in list format
        # One independent [data_1, data_2] pair per alarm ID; "[0, 0] * n" would build a
        # flat list of 2*n zeros instead of n pairs.
        self.alarm_data = [[0, 0] for _ in range(max_alarms)]   #: The data pair of all Alarms in list format
        self.last_alarm_triggered = 0                           #: The ID of the last triggered alarm
        self.last_alarm_data_1 = 0.0                            #: The last Alarm's data part 1
        self.last_alarm_data_2 = 0.0                            #: The last Alarm's data part 2

        # Register last, so the handler can never run before the attributes it writes exist.
        if self.can_interface is not None:
            self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.dd_alarm_broadcast_ch_id,
                                                                       message_id = MsgIds.MSG_ID_ALARM_TRIGGERED.value,
                                                                       function = self._handler_alarm_triggered)

    @publish(["msg_id_dd_alarm_triggered",
              "alarm_states", "alarm_conditions", "alarm_data",
              "alarm_priorities", "alarm_ranks", "alarm_clear_top_only_flags",
              "dd_alarm_triggered_timestamp"])
    def _handler_alarm_triggered(self, message, timestamp = 0.0):
        """
        Handles published DD alarm activation messages.

        Decodes the payload fields listed in _ALARM_TRIGGERED_FIELDS, marks the alarm as
        active in the per-alarm tables and records it as the last triggered alarm.
        A message whose alarm ID does not fit the tables is logged and ignored.

        @param message: published DD alarm activation message
        @param timestamp: timestamp stored as dd_alarm_triggered_timestamp
        @return: none
        """
        payload = message['message']
        fields = {}

        # Decode into a dict: exec() cannot assign function locals in Python 3, and
        # getattr() resolves the numbered field positions without eval().
        for index, (name, type_ref) in enumerate(self._ALARM_TRIGGERED_FIELDS, start=1):
            start_pos = getattr(MsgFieldPositions, f'START_POS_FIELD_{index}')
            end_pos = getattr(MsgFieldPositions, f'END_POS_FIELD_{index}')
            if isinstance(type_ref, DataTypes):
                data_type = type_ref
            else:
                # The data type of this field is given by an earlier, already decoded field
                data_type = DataTypes(fields[type_ref])
            fields[name] = struct.unpack(data_type.unpack_attrib(),
                                         bytearray(payload[start_pos:end_pos]))[0]

        alarm_id = fields['alarm_id']
        data_1 = fields['data_1']
        data_2 = fields['data_2']

        # %s because the data fields may be unpacked as non-integer types
        self.logger.debug("Alarm ID: %s %s %s", alarm_id, data_1, data_2)

        if not 0 <= alarm_id < len(self.alarm_states):
            self.logger.error("Alarm ID %s is out of range (0-%s), message ignored",
                              alarm_id, len(self.alarm_states) - 1)
            return

        self.alarm_states[alarm_id] = True
        self.alarm_conditions[alarm_id] = True
        self.alarm_priorities[alarm_id] = fields['priority']
        self.alarm_ranks[alarm_id] = fields['rank']
        self.alarm_clear_top_only_flags[alarm_id] = fields['clr_top_only']
        self.alarm_data[alarm_id] = [data_1, data_2]
        self.last_alarm_triggered = alarm_id
        self.last_alarm_data_1 = data_1
        self.last_alarm_data_2 = data_2
        self.dd_alarm_triggered_timestamp = timestamp

    @publish(["msg_id_dd_alarm_info", "alarm_safety_shutdown_status", "dd_alarm_info_timestamp"])
    def _handler_alarm_info(self, message, timestamp = 0.0):
        """
        Handles published DD alarm info messages.

        @param message: published DD alarm info message
        @param timestamp: timestamp stored as dd_alarm_info_timestamp
        @return: none
        """
        # struct.unpack always returns a tuple; keep only the single decoded value
        self.alarm_safety_shutdown_status = struct.unpack(
            DataTypes.U32.unpack_attrib(),
            bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0]
        self.dd_alarm_info_timestamp = timestamp

    @publish(["msg_id_dd_alarm_cleared", "dd_alarm_cleared_timestamp"])
    def _handler_alarm_cleared(self, message, timestamp = 0.0):
        """
        Handles published DD alarm cleared messages.
        Currently only the timestamp is recorded; the payload is not decoded.

        @param message: published DD alarm cleared message
        @param timestamp: timestamp stored as dd_alarm_cleared_timestamp
        @return: none
        """
        #ToDo
        self.dd_alarm_cleared_timestamp = timestamp

    @publish(["msg_id_dd_alarm_condition_cleared", "dd_alarm_condition_cleared_timestamp"])
    def _handler_alarm_condition_cleared(self, message, timestamp = 0.0):
        """
        Handles published DD alarm condition cleared messages.
        Currently only the timestamp is recorded; the payload is not decoded.

        @param message: published DD alarm condition cleared message
        @param timestamp: timestamp stored as dd_alarm_condition_cleared_timestamp
        @return: none
        """
        #ToDo
        self.dd_alarm_condition_cleared_timestamp = timestamp

    def cmd_alarm_state_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Placeholder for the alarm state data broadcast interval override command.
        Nothing is sent: the command is not implemented in the FW.

        @param ms: integer - interval (in ms) to override with (currently unused)
        @param reset: integer - 1 to reset a previous override, 0 to override (currently unused)
        @return: always False (zero), as no command is sent
        """
        # TODO: send through cmd_generic_broadcast_interval_override (channel
        # DenaliChannels.dialin_to_dd_ch_id, module name 'DD Alarm state') once a
        # message ID exists for this override.
        print('Not implemented in the FW')
        return False

    def cmd_alarm_info_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Placeholder for the alarm info data broadcast interval override command.
        Nothing is sent: the command is not implemented in the FW.

        @param ms: integer - interval (in ms) to override with (currently unused)
        @param reset: integer - 1 to reset a previous override, 0 to override (currently unused)
        @return: always False (zero), as no command is sent
        """
        # TODO: send through cmd_generic_broadcast_interval_override (channel
        # DenaliChannels.dialin_to_dd_ch_id, module name 'DD Alarm info') once a
        # message ID exists for this override.
        print('Not implemented in the FW')
        return False

    def cmd_alarm_cleared_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Placeholder for the alarm cleared data broadcast interval override command.
        Nothing is sent: the command is not implemented in the FW.

        @param ms: integer - interval (in ms) to override with (currently unused)
        @param reset: integer - 1 to reset a previous override, 0 to override (currently unused)
        @return: always False (zero), as no command is sent
        """
        # TODO: send through cmd_generic_broadcast_interval_override (channel
        # DenaliChannels.dialin_to_dd_ch_id, module name 'DD Alarm cleared') once a
        # message ID exists for this override.
        print('Not implemented in the FW')
        return False

    def cmd_alarm_condition_cleared_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Placeholder for the alarm condition cleared data broadcast interval override command.
        Nothing is sent: the command is not implemented in the FW.

        @param ms: integer - interval (in ms) to override with (currently unused)
        @param reset: integer - 1 to reset a previous override, 0 to override (currently unused)
        @return: always False (zero), as no command is sent
        """
        # TODO: send through cmd_generic_broadcast_interval_override (channel
        # DenaliChannels.dialin_to_dd_ch_id, module name 'DD Alarm condition cleared')
        # once a message ID exists for this override.
        print('Not implemented in the FW')
        return False

    def cmd_alarm_state_override(self, alarm: int, state: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm state override command
        Constraints:
        Must be logged into DD.
        Given alarm must be valid.
        If inactivating alarm, given alarm must be recoverable (clearable).

        @param alarm: integer - ID of alarm to override
        @param state: integer - 1 for alarm active, 0 for alarm inactive
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        rst = integer_to_bytearray(reset)
        sta = integer_to_bytearray(state)
        alm = integer_to_bytearray(alarm)
        payload = rst + sta + alm

        # The names are only used to build the descriptive texts passed to cmd_generic_override
        alarm_name = AlarmList(alarm).name
        state_name = 'Active' if state != 0 else 'Inactive'

        return cmd_generic_override(
            payload = payload,
            reset = reset,
            channel_id = DenaliChannels.dialin_to_dd_ch_id,
            msg_id = MsgIds.MSG_ID_DD_ALARM_STATE_OVERRIDE_REQUEST,
            entity_name = f'DD {alarm_name} Alarm state',
            override_text = f'{state_name}',
            logger = self.logger,
            can_interface = self.can_interface)