Index: leahi_dialin/fp/modules/alarms.py =================================================================== diff -u -rf03c9a32180573f2430105ad69eb91d9f597a96b -r34b64ff2d8a64f4b7b60b80bb7cf4c36845e5943 --- leahi_dialin/fp/modules/alarms.py (.../alarms.py) (revision f03c9a32180573f2430105ad69eb91d9f597a96b) +++ leahi_dialin/fp/modules/alarms.py (.../alarms.py) (revision 34b64ff2d8a64f4b7b60b80bb7cf4c36845e5943) @@ -8,19 +8,21 @@ # @file alarms.py # # @author (last) Zoltan Miskolci -# @date (last) 08-Jan-2026 +# @date (last) 05-May-2026 # @author (original) Quang Nguyen # @date (original) 02-Sep-2020 # ############################################################################ -import struct +# Module imports from logging import Logger +import struct +# Project imports from leahi_dialin.common.alarm_defs import AlarmList from leahi_dialin.common.constants import NO_RESET -from leahi_dialin.common import fp_enum_repository -from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions +from leahi_dialin.common.generic_defs import DataTypes +from leahi_dialin.common.msg_defs import MsgIds from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override from leahi_dialin.protocols.CAN import DenaliCanMessenger, DenaliMessage, DenaliChannels from leahi_dialin.utils.abstract_classes import AbstractSubSystem @@ -45,9 +47,9 @@ self.logger = logger if self.can_interface is not None: - channel_id = DenaliChannels.fp_alarm_broadcast_ch_id - self.msg_id_fp_alarm_triggered = MsgIds.MSG_ID_ALARM_TRIGGERED.value - self.can_interface.register_receiving_publication_function(channel_id, self.msg_id_fp_alarm_triggered, self._handler_alarm_triggered) + self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.fp_alarm_broadcast_ch_id, + message_id = MsgIds.MSG_ID_ALARM_TRIGGERED.value, + function = self._handler_alarm_triggered) self.fp_alarm_triggered_timestamp = 0.0 #: The timestamp of the last Alarm Triggered message @@ -62,26 +64,7 @@ self.last_alarm_data_1 = 0.0 #: The last Alarm's data part 1 self.last_alarm_data_2 = 0.0 #: The last Alarm's data part 2 - # alarm information - self.alarm_data_type = dict() #: The Alarm's Data type in dict format - # Loop through the list of the event data type enum and update the dictionary - for data_type in fp_enum_repository.DataTypes: - event_data_type = fp_enum_repository.DataTypes(data_type).name - struct_unpack_type = None - - # If U32 is in the data type enum (i.e. EVENT_DATA_TYPE_U32), then the key is the enum and the value is - # the corresponding format in the python struct - if 'U32' in event_data_type or 'BOOL' in event_data_type or 'NONE' in event_data_type: - struct_unpack_type = 'I' - elif 'S32' in event_data_type: - struct_unpack_type = 'i' - elif 'F32' in event_data_type: - struct_unpack_type = 'f' - - self.alarm_data_type[event_data_type] = struct_unpack_type - - @publish(["msg_id_fp_alarm_triggered", "alarm_states", "alarm_conditions", "alarm_data", "alarm_priorities", "alarm_ranks", "alarm_clear_top_only_flags", "fp_alarm_triggered_timestamp"]) @@ -93,35 +76,51 @@ @return: none """ self.logger.debug("Alarm activated!") - alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) + # Local variables + alarm_id:int + data_typ_1:int + data_1:int + data_typ_2:int + data_2:int + priority:int + rank:int + clr_top_only:int + + msg_list = [] + msg_list.append(('alarm_id', DataTypes.U32)) + msg_list.append(('data_typ_1', DataTypes.U32)) + msg_list.append(('data_1', 'data_typ_1')) + msg_list.append(('data_typ_2', DataTypes.U32)) + msg_list.append(('data_2', 'data_typ_2')) + msg_list.append(('priority', DataTypes.U32)) + msg_list.append(('rank', DataTypes.U32)) + msg_list.append(('clr_top_only', DataTypes.U32)) - data_typ_1 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) - # Get the corresponding structure format - struct_data_type_1 = self.alarm_data_type[fp_enum_repository.DataTypes(data_typ_1[0]).name] - # Get the data value by unpacking the data type - data_1 = struct.unpack(struct_data_type_1, bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3])) + i = 1 + for msg_detail in msg_list: + start_pos = eval(f'MsgFieldPositions.START_POS_FIELD_{i}') + end_pos = eval(f'MsgFieldPositions.END_POS_FIELD_{i}') + if isinstance(msg_detail[1], DataTypes): + unpack_attrib = msg_detail[1].unpack_attrib() + else: + data_type_val:int + exec(f'data_type_val = {msg_detail[1]}') + unpack_attrib = DataTypes(data_type_val).unpack_attrib() + value = struct.unpack(unpack_attrib,bytearray(message['message'][start_pos:end_pos]))[0] + exec(f'{msg_detail[0]} = {value}') + i += 1 - data_typ_2 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4])) - # Get the corresponding structure format - struct_data_type_2 = self.alarm_data_type[fp_enum_repository.DataTypes(data_typ_2[0]).name] - # Get the data value by unpacking the data type - data_2 = struct.unpack(struct_data_type_2, bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5])) - - priority = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_6:MsgFieldPositions.END_POS_FIELD_6])) - rank = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_7:MsgFieldPositions.END_POS_FIELD_7])) - clr_top_only = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_8:MsgFieldPositions.END_POS_FIELD_8])) - self.logger.debug("Alarm ID: %d %d %d" % (alarm_id[0], data_1[0], data_2[0])) - self.alarm_states[alarm_id[0]] = True - self.alarm_conditions[alarm_id[0]] = True - self.alarm_priorities[alarm_id[0]] = priority[0] - self.alarm_ranks[alarm_id[0]] = rank[0] - self.alarm_clear_top_only_flags[alarm_id[0]] = clr_top_only[0] - self.alarm_data[alarm_id[0]] = [data_1[0], data_2[0]] - self.last_alarm_triggered = alarm_id[0] - self.last_alarm_data_1 = data_1[0] - self.last_alarm_data_2 = data_2[0] - self.fp_alarm_triggered_timestamp = timestamp + self.alarm_states[alarm_id] = True + self.alarm_conditions[alarm_id] = True + self.alarm_priorities[alarm_id] = priority + self.alarm_ranks[alarm_id] = rank + self.alarm_clear_top_only_flags[alarm_id] = clr_top_only + self.alarm_data[alarm_id] = [data_1, data_2] + self.last_alarm_triggered = alarm_id + self.last_alarm_data_1 = data_1 + self.last_alarm_data_2 = data_2 + self.dd_alarm_triggered_timestamp = timestamp def cmd_alarm_state_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: