Index: leahi_dialin/td/modules/alarms.py =================================================================== diff -u -r47a803c6dd859a5bcc7a6c82e0cb160c350473ea -re45b20cdc5d4c5dcff8cef530b173ca94cb2e422 --- leahi_dialin/td/modules/alarms.py (.../alarms.py) (revision 47a803c6dd859a5bcc7a6c82e0cb160c350473ea) +++ leahi_dialin/td/modules/alarms.py (.../alarms.py) (revision e45b20cdc5d4c5dcff8cef530b173ca94cb2e422) @@ -8,24 +8,29 @@ # @file alarms.py # # @author (last) Zoltan Miskolci -# @date (last) 08-Jan-2026 +# @date (last) 05-May-2026 # @author (original) Peter Lucia # @date (original) 02-Apr-2020 # ############################################################################ -import struct +# Module imports from logging import Logger +import struct from enum import unique +# Project imports from leahi_dialin.common.alarm_defs import AlarmList from leahi_dialin.common.constants import NO_RESET -from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions +from leahi_dialin.common.generic_defs import DataTypes +from leahi_dialin.common.msg_defs import MsgFieldPositions +from leahi_dialin.common.msg_ids import MsgIds from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override -from leahi_dialin.common import td_enum_repository -from leahi_dialin.protocols.CAN import DenaliMessage, DenaliChannels -from leahi_dialin.utils.base import AbstractSubSystem, DialinEnum, publish +from leahi_dialin.protocols.CAN import CanMessenger, CanMessage, CanChannels +from leahi_dialin.utils.abstract_classes import AbstractSubSystem +from leahi_dialin.utils.base import publish from leahi_dialin.utils.conversions import integer_to_bytearray, float_to_bytearray +from leahi_dialin.utils.enums import DialinEnum class TDAlarms(AbstractSubSystem): @@ -58,7 +63,7 @@ AlarmResponseButtons._str_list = {} # Alarm status message field positions - START_POS_ALARM_STATE = DenaliMessage.PAYLOAD_START_INDEX + START_POS_ALARM_STATE = CanMessage.PAYLOAD_START_INDEX END_POS_ALARM_STATE = START_POS_ALARM_STATE + 4 START_POS_ALARM_TOP = END_POS_ALARM_STATE END_POS_ALARM_TOP = START_POS_ALARM_TOP + 4 @@ -69,36 +74,36 @@ START_POS_ALARMS_FLAGS = END_POS_ALARM_SILENCE_EXPIRES_IN END_POS_ALARMS_FLAGS = START_POS_ALARMS_FLAGS + 2 - def __init__(self, can_interface, logger: Logger): + def __init__(self, can_interface: CanMessenger, logger: Logger): """ - @param can_interface: Denali Can Messenger object + @param can_interface: Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: - self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.td_alarm_broadcast_ch_id, + self.can_interface.register_receiving_publication_function(channel_id = CanChannels.td_alarm_broadcast_ch_id, message_id = MsgIds.MSG_ID_ALARM_STATUS_DATA.value, function = self._handler_alarms_status_sync) - self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.td_alarm_broadcast_ch_id, + self.can_interface.register_receiving_publication_function(channel_id = CanChannels.td_alarm_broadcast_ch_id, message_id = MsgIds.MSG_ID_ALARM_TRIGGERED.value, function = self._handler_alarm_trigger) - self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.td_alarm_broadcast_ch_id, + self.can_interface.register_receiving_publication_function(channel_id = CanChannels.td_alarm_broadcast_ch_id, message_id = MsgIds.MSG_ID_ALARM_CLEARED.value, function = self._handler_alarm_clear) - self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.td_alarm_broadcast_ch_id, + self.can_interface.register_receiving_publication_function(channel_id = CanChannels.td_alarm_broadcast_ch_id, message_id = MsgIds.MSG_ID_ALARM_CONDITION_CLEARED.value, function = self._handler_alarm_condition_clear) - self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.td_sync_broadcast_ch_id, + self.can_interface.register_receiving_publication_function(channel_id = CanChannels.td_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_TD_ALARM_INFORMATION_DATA.value, function = self._handler_alarm_information_sync) - self.can_interface.register_receiving_publication_function(channel_id = DenaliChannels.td_sync_broadcast_ch_id, + self.can_interface.register_receiving_publication_function(channel_id = CanChannels.td_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_TD_ACTIVE_ALARMS_LIST_REQUEST_RESPONSE.value, function = self._handler_alarm_list_response) @@ -126,7 +131,6 @@ self.last_alarm_data_2 = 0.0 #: The last Alarm's data part 2 # alarm information - self.alarm_data_type = dict() #: The Alarm's Data type in dict format self.alarm_volume = 0 #: The Alarm's Volume level self.alarm_audio_curr_hg = 0.0 #: The Alarm's Audio high current self.alarm_audio_curr_lg = 0.0 #: The Alarm's Audio low current @@ -142,23 +146,7 @@ self.ui_alarm_list = [] #: The UI Alarm List content self.ui_alarm_list_timestamp = 0.0 #: The timestamp of the last Alarm List - # Loop through the list of the event data type enum and update the dictionary - for data_type in td_enum_repository.TDEventDataTypes: - event_data_type = td_enum_repository.TDEventDataTypes(data_type).name - struct_unpack_type = None - # If U32 is in the data type enum (i.e. EVENT_DATA_TYPE_U32), then the key is the enum and the value is - # the corresponding format in the python struct - if 'U32' in event_data_type or 'BOOL' in event_data_type or 'NONE' in event_data_type: - struct_unpack_type = 'I' - elif 'S32' in event_data_type: - struct_unpack_type = 'i' - elif 'F32' in event_data_type: - struct_unpack_type = 'f' - - self.alarm_data_type[event_data_type] = struct_unpack_type - - def clear_dialin_alarms(self): """ Clears the alarms states in Dialin. @@ -180,12 +168,15 @@ @param message: published alarm status data message @return: none """ - self.alarms_priority_state = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] - self.alarm_top = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] - self.alarms_escalates_in = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] - self.alarms_silence_expires_in = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] - self.alarms_flags = struct.unpack('H', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.START_POS_FIELD_5+2]))[0] + msg_list = [] + msg_list.append(('self.alarms_priority_state', DataTypes.U32)) + msg_list.append(('self.alarm_top', DataTypes.U32)) + msg_list.append(('self.alarms_escalates_in', DataTypes.U32)) + msg_list.append(('self.alarms_silence_expires_in', DataTypes.U32)) + msg_list.append(('self.alarms_flags', DataTypes.U16)) + self.process_into_vars(decoder_list = msg_list, + message = message) self.td_alarm_status_timestamp = timestamp @@ -200,34 +191,41 @@ @return: none """ self.logger.debug("Alarm activated!") - alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) - data_typ_1 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) - # Get the corresponding structure format - struct_data_type_1 = self.alarm_data_type[td_enum_repository.TDEventDataTypes(data_typ_1[0]).name] - # Get the data value by unpacking the data type - data_1 = struct.unpack(struct_data_type_1, bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3])) + result = {} + msg_list = [] + msg_list.append(('alarm_id', DataTypes.U32)) + msg_list.append(('data_typ_1', DataTypes.U32)) + msg_list.append(('data_1', 'data_typ_1')) + msg_list.append(('data_typ_2', DataTypes.U32)) + msg_list.append(('data_2', 'data_typ_2')) + msg_list.append(('priority', DataTypes.U32)) + msg_list.append(('rank', DataTypes.U32)) + msg_list.append(('clr_top_only', DataTypes.U32)) - data_typ_2 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4])) - # Get the corresponding structure format - struct_data_type_2 = self.alarm_data_type[td_enum_repository.TDEventDataTypes(data_typ_2[0]).name] - # Get the data value by unpacking the data type - data_2 = struct.unpack(struct_data_type_2, bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5])) + i = 1 + for msg_detail in msg_list: + start_pos = eval(f'MsgFieldPositions.START_POS_FIELD_{i}') + end_pos = eval(f'MsgFieldPositions.END_POS_FIELD_{i}') + if isinstance(msg_detail[1], DataTypes): + unpack_attrib = msg_detail[1].unpack_attrib() + else: + unpack_attrib = DataTypes(result[msg_detail[1]]).unpack_attrib() + + value = struct.unpack(unpack_attrib, bytearray(message['message'][start_pos:end_pos]))[0] + result[msg_detail[0]] = value + i += 1 - priority = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_6:MsgFieldPositions.END_POS_FIELD_6])) - rank = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_7:MsgFieldPositions.END_POS_FIELD_7])) - clr_top_only = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_8:MsgFieldPositions.END_POS_FIELD_8])) - - self.logger.debug("Alarm ID: %d %d %d" % (alarm_id[0], data_1[0], data_2[0])) - self.alarm_states[alarm_id[0]] = True - self.alarm_conditions[alarm_id[0]] = True - self.alarm_priorities[alarm_id[0]] = priority[0] - self.alarm_ranks[alarm_id[0]] = rank[0] - self.alarm_clear_top_only_flags[alarm_id[0]] = clr_top_only[0] - self.alarm_data[alarm_id[0]] = [data_1[0], data_2[0]] - self.last_alarm_triggered = alarm_id[0] - self.last_alarm_data_1 = data_1[0] - self.last_alarm_data_2 = data_2[0] + self.logger.debug("Alarm ID: %d %d %d" % (result['alarm_id'], result['data_1'], result['data_2'])) + self.alarm_states[result['alarm_id']] = True + self.alarm_conditions[result['alarm_id']] = True + self.alarm_priorities[result['alarm_id']] = result['priority'] + self.alarm_ranks[result['alarm_id']] = result['rank'] + self.alarm_clear_top_only_flags[result['alarm_id']] = result['clr_top_only'] + self.alarm_data[result['alarm_id']] = [result['data_1'], result['data_2']] + self.last_alarm_triggered = result['alarm_id'] + self.last_alarm_data_1 = result['data_1'] + self.last_alarm_data_2 = result['data_2'] self.td_alarm_triggered_timestamp = timestamp @@ -239,9 +237,13 @@ @param message: published TD alarm clear message @return: none """ - alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) - self.alarm_states[alarm_id[0]] = False - self.alarm_conditions[alarm_id[0]] = False + msg_list = [] + msg_list.append(('alarm_id', DataTypes.U32)) + + result = self.process_into_vars(decoder_list = msg_list, + message = message) + self.alarm_states[result['alarm_id']] = False + self.alarm_conditions[result['alarm_id']] = False self.td_alarm_cleared_timestamp = timestamp @@ -253,8 +255,13 @@ @param message: published TD alarm clear alarm condition message @return: none """ - alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) - self.alarm_conditions[alarm_id[0]] = False + msg_list = [] + msg_list.append(('alarm_id', DataTypes.U32)) + + result = self.process_into_vars(decoder_list = msg_list, + message = message) + + self.alarm_conditions[result['alarm_id']] = False self.td_alarm_clr_condition_timestamp = timestamp @@ -269,6 +276,28 @@ @param message: published TD alarm information message @return: none """ + msg_list = [] + msg_list.append(('self.alarm_volume', DataTypes.U32)) + msg_list.append(('self.alarm_audio_curr_hg', DataTypes.F32)) + msg_list.append(('self.alarm_audio_curr_lg', DataTypes.F32)) + msg_list.append(('self.alarm_backup_audio_curr', DataTypes.U32)) + msg_list.append(('self.safety_shutdown_active', DataTypes.BOOL)) + msg_list.append(('self.ac_power_lost', DataTypes.BOOL)) + sensor_list = [] + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RESUME.value, DataTypes.BOOL_U08)) + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RINSEBACK.value, DataTypes.BOOL_U08)) + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_END_TREATMENT.value, DataTypes.BOOL_U08)) + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RESUME.value, DataTypes.BOOL_U08)) + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RINSEBACK.value, DataTypes.BOOL_U08)) + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_END_TREATMENT.value, DataTypes.BOOL_U08)) + + self.process_into_vars(decoder_list = msg_list, + message = message) + self.process_into_dict(dict_to_update = self.alarm_table_button_blockers, + decoder_list = sensor_list, + message = message, + start_from_byte = len(msg_list) * DataTypes.U32.size()) + """ vol = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) ach = struct.unpack('f', bytearray( @@ -306,6 +335,7 @@ self.alarm_state_button_blockers[self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RESUME.value] = True if srs[0] else False self.alarm_state_button_blockers[self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RINSEBACK.value] = True if srb[0] else False self.alarm_state_button_blockers[self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_END_TREATMENT.value] = True if set[0] else False + """ self.td_alarm_information_timestamp = timestamp @@ -317,17 +347,16 @@ @param message: published TD alarm clear alarm condition message @return: none """ - self.ui_alarm_list_accepted = struct.unpack('i',bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] - self.ui_alarm_list_reject_reason = struct.unpack('i',bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] + self.ui_alarm_list_accepted = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] + self.ui_alarm_list_reject_reason = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] i = 3 for _ in range(0,10): start_pos = eval(f'MsgFieldPositions.START_POS_FIELD_{i}') end_pos = eval(f'MsgFieldPositions.END_POS_FIELD_{i}') - value = struct.unpack('i',bytearray(message['message'][start_pos:end_pos]))[0] + value = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray(message['message'][start_pos:end_pos]))[0] self.ui_alarm_list.append(value) i += 1 - self.ui_alarm_list_timestamp = timestamp @@ -345,7 +374,7 @@ return cmd_generic_broadcast_interval_override( ms = ms, reset = reset, - channel_id = DenaliChannels.dialin_to_td_ch_id, + channel_id = CanChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_ALARM_STATUS_PUBLISH_INTERVAL_OVERRIDE_REQUEST, module_name = 'TD Alarm Status', logger = self.logger, @@ -366,7 +395,7 @@ return cmd_generic_broadcast_interval_override( ms = ms, reset = reset, - channel_id = DenaliChannels.dialin_to_td_ch_id, + channel_id = CanChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_ALARM_INFO_PUBLISH_INTERVAL_OVERRIDE_REQUEST, module_name = 'TD Alarm Info', logger = self.logger, @@ -396,7 +425,7 @@ return cmd_generic_override( payload = payload, reset = NO_RESET, - channel_id = DenaliChannels.dialin_to_td_ch_id, + channel_id = CanChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_ALARM_STATE_OVERRIDE_REQUEST, entity_name = f'TD {alarm_name} Alarm state', override_text = f'{state_name}', @@ -419,7 +448,7 @@ return cmd_generic_override( payload = payload, reset = NO_RESET, - channel_id = DenaliChannels.dialin_to_td_ch_id, + channel_id = CanChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_ALARM_CLEAR_ALL_ALARMS_REQUEST, entity_name = f'TD Alarms', override_text = f'Cleared', @@ -448,7 +477,7 @@ return cmd_generic_override( payload = payload, reset = NO_RESET, - channel_id = DenaliChannels.dialin_to_td_ch_id, + channel_id = CanChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_ALARM_START_TIME_OVERRIDE_REQUEST, entity_name = f'TD {alarm_name} Alarms time', override_text = f'{time_ms} ms', @@ -498,7 +527,7 @@ return cmd_generic_override( payload = payload, reset = NO_RESET, - channel_id = DenaliChannels.dialin_to_td_ch_id, + channel_id = CanChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_ALARM_AUDIO_LEVEL_OVERRIDE_REQUEST, entity_name = f'TD Alarms lamp pattern', override_text = f'{str_pat}', @@ -524,7 +553,7 @@ return cmd_generic_override( payload = payload, reset = NO_RESET, - channel_id = DenaliChannels.dialin_to_td_ch_id, + channel_id = CanChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_ALARM_AUDIO_LEVEL_OVERRIDE_REQUEST, entity_name = f'TD Alarms audio volume', override_text = f'{str(volume)}', @@ -549,7 +578,7 @@ return cmd_generic_override( payload = payload, reset = NO_RESET, - channel_id = DenaliChannels.dialin_to_td_ch_id, + channel_id = CanChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_ALARM_AUDIO_CURRENT_HG_OVERRIDE_REQUEST, entity_name = f'TD Alarms audio high gain current', override_text = f'{str(current)} mA', @@ -574,7 +603,7 @@ return cmd_generic_override( payload = payload, reset = NO_RESET, - channel_id = DenaliChannels.dialin_to_td_ch_id, + channel_id = CanChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_ALARM_AUDIO_CURRENT_LG_OVERRIDE_REQUEST, entity_name = f'TD Alarms audio low gain current', override_text = f'{str(current)} mA', @@ -599,7 +628,7 @@ return cmd_generic_override( payload = payload, reset = NO_RESET, - channel_id = DenaliChannels.dialin_to_td_ch_id, + channel_id = CanChannels.dialin_to_td_ch_id, msg_id = MsgIds.MSG_ID_TD_BACKUP_ALARM_AUDIO_CURRENT_OVERRIDE_REQUEST, entity_name = f'TD Alarms backup audio current', override_text = f'{str(current)} mA',