Index: leahi_dialin/dd/modules/alarms.py =================================================================== diff -u -r20c821bd230fc7689a0275a2918981669ff5cc19 -re45b20cdc5d4c5dcff8cef530b173ca94cb2e422 --- leahi_dialin/dd/modules/alarms.py (.../alarms.py) (revision 20c821bd230fc7689a0275a2918981669ff5cc19) +++ leahi_dialin/dd/modules/alarms.py (.../alarms.py) (revision e45b20cdc5d4c5dcff8cef530b173ca94cb2e422) @@ -8,22 +8,26 @@ # @file alarms.py # # @author (last) Zoltan Miskolci -# @date (last) 08-Jan-2026 +# @date (last) 04-May-2026 # @author (original) Quang Nguyen # @date (original) 02-Sep-2020 # ############################################################################ -import struct +# Module imports from logging import Logger +import struct +# Project imports from leahi_dialin.common.alarm_defs import AlarmList from leahi_dialin.common.constants import NO_RESET -from leahi_dialin.common import dd_enum_repository -from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions +from leahi_dialin.common.generic_defs import DataTypes +from leahi_dialin.common.msg_defs import MsgFieldPositions +from leahi_dialin.common.msg_ids import MsgIds from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override -from leahi_dialin.protocols.CAN import DenaliMessage, DenaliChannels -from leahi_dialin.utils.base import AbstractSubSystem, publish +from leahi_dialin.protocols.CAN import CanMessenger, CanMessage, CanChannels +from leahi_dialin.utils.abstract_classes import AbstractSubSystem +from leahi_dialin.utils.base import publish from leahi_dialin.utils.conversions import integer_to_bytearray @@ -32,21 +36,21 @@ DD interface containing alarm related commands. """ _ALARM_ID_MAX_ALARMS = 500 - START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX + START_POS_ALARM_ID = CanMessage.PAYLOAD_START_INDEX END_POS_ALARM_ID = START_POS_ALARM_ID + 2 - def __init__(self, can_interface, logger: Logger): + def __init__(self, can_interface: CanMessenger, logger: Logger): """ - @param can_interface: Denali Can Messenger object + @param can_interface: Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: - channel_id = DenaliChannels.dd_alarm_broadcast_ch_id - self.msg_id_dd_alarm_triggered = MsgIds.MSG_ID_ALARM_TRIGGERED.value - self.can_interface.register_receiving_publication_function(channel_id, self.msg_id_dd_alarm_triggered, self._handler_alarm_triggered) + self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_alarm_broadcast_ch_id, + message_id = MsgIds.MSG_ID_ALARM_TRIGGERED.value, + function = self._handler_alarm_triggered) self.alarm_safety_shutdown_status = 0.0 #: The Alarm's Safety Shutdown status value @@ -66,26 +70,7 @@ self.last_alarm_data_1 = 0.0 #: The last Alarm's data part 1 self.last_alarm_data_2 = 0.0 #: The last Alarm's data part 2 - # alarm information - self.alarm_data_type = dict() #: The Alarm's Data type in dict format - # Loop through the list of the event data type enum and update the dictionary - for data_type in dd_enum_repository.DDEventDataTypes: - event_data_type = dd_enum_repository.DDEventDataTypes(data_type).name - struct_unpack_type = None - - # If U32 is in the data type enum (i.e. EVENT_DATA_TYPE_U32), then the key is the enum and the value is - # the corresponding format in the python struct - if 'U32' in event_data_type or 'BOOL' in event_data_type or 'NONE' in event_data_type: - struct_unpack_type = 'I' - elif 'S32' in event_data_type: - struct_unpack_type = 'i' - elif 'F32' in event_data_type: - struct_unpack_type = 'f' - - self.alarm_data_type[event_data_type] = struct_unpack_type - - @publish(["msg_id_dd_alarm_triggered", "alarm_states", "alarm_conditions", "alarm_data", "alarm_priorities", "alarm_ranks", "alarm_clear_top_only_flags", "dd_alarm_triggered_timestamp"]) def _handler_alarm_triggered(self, message, timestamp = 0.0): @@ -95,35 +80,40 @@ @param message: published DD alarm activation message @return: none """ - self.logger.debug("Alarm activated!") - alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) + result = {} + msg_list = [] + msg_list.append(('alarm_id', DataTypes.U32)) + msg_list.append(('data_typ_1', DataTypes.U32)) + msg_list.append(('data_1', 'data_typ_1')) + msg_list.append(('data_typ_2', DataTypes.U32)) + msg_list.append(('data_2', 'data_typ_2')) + msg_list.append(('priority', DataTypes.U32)) + msg_list.append(('rank', DataTypes.U32)) + msg_list.append(('clr_top_only', DataTypes.U32)) - data_typ_1 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) - # Get the corresponding structure format - struct_data_type_1 = self.alarm_data_type[dd_enum_repository.DDEventDataTypes(data_typ_1[0]).name] - # Get the data value by unpacking the data type - data_1 = struct.unpack(struct_data_type_1, bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3])) + i = 1 + for msg_detail in msg_list: + start_pos = eval(f'MsgFieldPositions.START_POS_FIELD_{i}') + end_pos = eval(f'MsgFieldPositions.END_POS_FIELD_{i}') + if isinstance(msg_detail[1], DataTypes): + unpack_attrib = msg_detail[1].unpack_attrib() + else: + unpack_attrib = DataTypes(result[msg_detail[1]]).unpack_attrib() + + value = struct.unpack(unpack_attrib, bytearray(message['message'][start_pos:end_pos]))[0] + result[msg_detail[0]] = value + i += 1 - data_typ_2 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4])) - # Get the corresponding structure format - struct_data_type_2 = self.alarm_data_type[dd_enum_repository.DDEventDataTypes(data_typ_2[0]).name] - # Get the data value by unpacking the data type - data_2 = struct.unpack(struct_data_type_2, bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5])) - - priority = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_6:MsgFieldPositions.END_POS_FIELD_6])) - rank = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_7:MsgFieldPositions.END_POS_FIELD_7])) - clr_top_only = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_8:MsgFieldPositions.END_POS_FIELD_8])) - - self.logger.debug("Alarm ID: %d %d %d" % (alarm_id[0], data_1[0], data_2[0])) - self.alarm_states[alarm_id[0]] = True - self.alarm_conditions[alarm_id[0]] = True - self.alarm_priorities[alarm_id[0]] = priority[0] - self.alarm_ranks[alarm_id[0]] = rank[0] - self.alarm_clear_top_only_flags[alarm_id[0]] = clr_top_only[0] - self.alarm_data[alarm_id[0]] = [data_1[0], data_2[0]] - self.last_alarm_triggered = alarm_id[0] - self.last_alarm_data_1 = data_1[0] - self.last_alarm_data_2 = data_2[0] + self.logger.debug("Alarm ID: %d %d %d" % (result['alarm_id'], result['data_1'], result['data_2'])) + self.alarm_states[result['alarm_id']] = True + self.alarm_conditions[result['alarm_id']] = True + self.alarm_priorities[result['alarm_id']] = result['priority'] + self.alarm_ranks[result['alarm_id']] = result['rank'] + self.alarm_clear_top_only_flags[result['alarm_id']] = result['clr_top_only'] + self.alarm_data[result['alarm_id']] = [result['data_1'], result['data_2']] + self.last_alarm_triggered = result['alarm_id'] + self.last_alarm_data_1 = result['data_1'] + self.last_alarm_data_2 = result['data_2'] self.dd_alarm_triggered_timestamp = timestamp @@ -135,7 +125,7 @@ @param message: published DD alarm info message @return: none """ - self.alarm_safety_shutdown_status = struct.unpack('I', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) + self.alarm_safety_shutdown_status = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) self.dd_alarm_info_timestamp = timestamp @@ -181,7 +171,7 @@ return cmd_generic_broadcast_interval_override( ms = ms, reset = reset, - channel_id = DenaliChannels.dialin_to_dd_ch_id, + channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = '', module_name = 'DD Alarm state', logger = self.logger, @@ -205,7 +195,7 @@ return cmd_generic_broadcast_interval_override( ms = ms, reset = reset, - channel_id = DenaliChannels.dialin_to_dd_ch_id, + channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = '', module_name = 'DD Alarm info', logger = self.logger, @@ -229,7 +219,7 @@ return cmd_generic_broadcast_interval_override( ms = ms, reset = reset, - channel_id = DenaliChannels.dialin_to_dd_ch_id, + channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = '', module_name = 'DD Alarm cleared', logger = self.logger, @@ -253,7 +243,7 @@ return cmd_generic_broadcast_interval_override( ms = ms, reset = reset, - channel_id = DenaliChannels.dialin_to_dd_ch_id, + channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = '', module_name = 'DD Alarm condition cleared', logger = self.logger, @@ -283,7 +273,7 @@ return cmd_generic_override( payload = payload, reset = reset, - channel_id = DenaliChannels.dialin_to_dd_ch_id, + channel_id = CanChannels.dialin_to_dd_ch_id, msg_id = MsgIds.MSG_ID_DD_ALARM_STATE_OVERRIDE_REQUEST, entity_name = f'DD {alarm_name} Alarm state', override_text = f'{state_name}',