########################################################################### # # Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file alarms.py # # @author (last) Micahel Garthwaite # @date (last) 17-Aug-2023 # @author (original) Quang Nguyen # @date (original) 02-Sep-2020 # ############################################################################ import struct from logging import Logger from .constants import RESET, NO_RESET from leahi_dialin.common.fp_defs import FPEventDataType from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions from leahi_dialin.protocols.CAN import DenaliMessage, DenaliChannels from leahi_dialin.utils.base import AbstractSubSystem, publish, DialinEnum from leahi_dialin.utils.checks import check_broadcast_interval_override_ms from leahi_dialin.utils.conversions import integer_to_bytearray, float_to_bytearray class FPAlarms(AbstractSubSystem): """ FP interface containing alarm related commands. """ _ALARM_ID_MAX_ALARMS = 500 START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_ID = START_POS_ALARM_ID + 2 def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.fp_alarm_broadcast_ch_id msg_id = MsgIds.MSG_ID_ALARM_TRIGGERED.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_triggered) self.fp_alarm_triggered_timestamp = 0.0 # alarm states based on received TD alarm activation and alarm clear messages self.alarm_states = [False] * 500 # alarm condition states based on received TD alarm activation and clear condition messages self.alarm_conditions = [False] * 500 # alarm priorities based on received TD alarm activation messages self.alarm_priorities = [0] * 500 # alarm ranks based on received TD alarm activation messages self.alarm_ranks = [0] * 500 # alarm clear top only flags based on received TD alarm activation messages self.alarm_clear_top_only_flags = [False] * 500 # alarm debug data on alarm triggered message self.alarm_data = [0, 0] * 500 self.last_alarm_triggered = 0 self.last_alarm_data_1 = 0.0 self.last_alarm_data_2 = 0.0 # alarm information self.alarm_data_type = dict() # Loop through the list of the event data type enum and update the dictionary for data_type in FPEventDataType: event_data_type = FPEventDataType(data_type).name struct_unpack_type = None # If U32 is in the data type enum (i.e. EVENT_DATA_TYPE_U32), then the key is the enum and the value is # the corresponding format in the python struct if 'U32' in event_data_type or 'BOOL' in event_data_type or 'NONE' in event_data_type: struct_unpack_type = 'I' elif 'S32' in event_data_type: struct_unpack_type = 'i' elif 'F32' in event_data_type: struct_unpack_type = 'f' self.alarm_data_type[event_data_type] = struct_unpack_type @publish(["fp_alarm_triggered_timestamp", "alarm_states", "alarm_conditions", "alarm_data", "alarm_priorities", "alarm_ranks", "alarm_clear_top_only_flags"]) def _handler_alarm_triggered(self, message, timestamp = 0.0): """ Handles published FP alarm activation messages. @param message: published FP alarm activation message @return: none """ self.logger.debug("Alarm activated!") alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) data_typ_1 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) # Get the corresponding structure format struct_data_type_1 = self.alarm_data_type[FPEventDataType(data_typ_1[0]).name] # Get the data value by unpacking the data type data_1 = struct.unpack(struct_data_type_1, bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3])) data_typ_2 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4])) # Get the corresponding structure format struct_data_type_2 = self.alarm_data_type[FPEventDataType(data_typ_2[0]).name] # Get the data value by unpacking the data type data_2 = struct.unpack(struct_data_type_2, bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5])) priority = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_6:MsgFieldPositions.END_POS_FIELD_6])) rank = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_7:MsgFieldPositions.END_POS_FIELD_7])) clr_top_only = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_8:MsgFieldPositions.END_POS_FIELD_8])) self.logger.debug("Alarm ID: %d %d %d" % (alarm_id[0], data_1[0], data_2[0])) self.alarm_states[alarm_id[0]] = True self.alarm_conditions[alarm_id[0]] = True self.alarm_priorities[alarm_id[0]] = priority[0] self.alarm_ranks[alarm_id[0]] = rank[0] self.alarm_clear_top_only_flags[alarm_id[0]] = clr_top_only[0] self.alarm_data[alarm_id[0]] = [data_1[0], data_2[0]] self.last_alarm_triggered = alarm_id[0] self.last_alarm_data_1 = data_1[0] self.last_alarm_data_2 = data_2[0] self.fp_alarm_triggered_timestamp = timestamp def cmd_alarm_state_override(self, alarm: int, state: int, reset: int = NO_RESET) -> int: """ Constructs and sends the alarm state override command Constraints: Must be logged into FP. Given alarm must be valid. If inactivating alarm, given alarm must be recoverable (clearable). @param alarm: integer - ID of alarm to override @param state: integer - 1 for alarm active, 0 for alarm inactive @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) sta = integer_to_bytearray(state) alm = integer_to_bytearray(alarm) payload = rst + sta + alm message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_fp_ch_id, message_id=MsgIds.MSG_ID_FP_ALARM_STATE_OVERRIDE_REQUEST.value, payload=payload) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # self.logger.debug(received_message) if reset == RESET: str_res = "reset back to normal" else: str_res = ("active" if state != 0 else "inactive") self.logger.debug("Alarm " + str(alarm) + " " + str_res + ": " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_clear_all_alarms(self) -> int: """ Constructs and sends the clear all active alarms command. This will clear even non-recoverable alarms. Constraints: Must be logged into FP. @return: 1 if successful, zero otherwise """ key = integer_to_bytearray(-758926171) # 0xD2C3B4A5 payload = key message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id, message_id=MsgIds.MSG_ID_FP_ALARM_CLEAR_ALL_ALARMS_REQUEST.value, payload=payload) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: self.logger.debug("All alarms cleared.") # response payload is OK or not OK return 1 == received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] return False #TODO: alarm state override, alarm info pubish override