Index: leahi_dialin/td/modules/alarms.py =================================================================== diff -u -rf03c9a32180573f2430105ad69eb91d9f597a96b -r34b64ff2d8a64f4b7b60b80bb7cf4c36845e5943 --- leahi_dialin/td/modules/alarms.py (.../alarms.py) (revision f03c9a32180573f2430105ad69eb91d9f597a96b) +++ leahi_dialin/td/modules/alarms.py (.../alarms.py) (revision 34b64ff2d8a64f4b7b60b80bb7cf4c36845e5943) @@ -8,21 +8,23 @@ # @file alarms.py # # @author (last) Zoltan Miskolci -# @date (last) 08-Jan-2026 +# @date (last) 05-May-2026 # @author (original) Peter Lucia # @date (original) 02-Apr-2020 # ############################################################################ -import struct +# Module imports from logging import Logger +import struct from enum import unique +# Project imports from leahi_dialin.common.alarm_defs import AlarmList from leahi_dialin.common.constants import NO_RESET +from leahi_dialin.common.generic_defs import DataTypes from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override -from leahi_dialin.common import td_enum_repository from leahi_dialin.protocols.CAN import DenaliCanMessenger, DenaliMessage, DenaliChannels from leahi_dialin.utils.abstract_classes import AbstractSubSystem from leahi_dialin.utils.base import publish @@ -128,7 +130,6 @@ self.last_alarm_data_2 = 0.0 #: The last Alarm's data part 2 # alarm information - self.alarm_data_type = dict() #: The Alarm's Data type in dict format self.alarm_volume = 0 #: The Alarm's Volume level self.alarm_audio_curr_hg = 0.0 #: The Alarm's Audio high current self.alarm_audio_curr_lg = 0.0 #: The Alarm's Audio low current @@ -144,23 +145,7 @@ self.ui_alarm_list = [] #: The UI Alarm List content self.ui_alarm_list_timestamp = 0.0 #: The timestamp of the last Alarm List - # Loop through the list of the event data type enum and update the dictionary - for data_type in td_enum_repository.DataTypes: - event_data_type = td_enum_repository.DataTypes(data_type).name - struct_unpack_type = None - # If U32 is in the data type enum (i.e. EVENT_DATA_TYPE_U32), then the key is the enum and the value is - # the corresponding format in the python struct - if 'U32' in event_data_type or 'BOOL' in event_data_type or 'NONE' in event_data_type: - struct_unpack_type = 'I' - elif 'S32' in event_data_type: - struct_unpack_type = 'i' - elif 'F32' in event_data_type: - struct_unpack_type = 'f' - - self.alarm_data_type[event_data_type] = struct_unpack_type - - def clear_dialin_alarms(self): """ Clears the alarms states in Dialin. @@ -182,12 +167,16 @@ @param message: published alarm status data message @return: none """ - self.alarms_priority_state = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] - self.alarm_top = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] - self.alarms_escalates_in = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] - self.alarms_silence_expires_in = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] - self.alarms_flags = struct.unpack('H', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.START_POS_FIELD_5+2]))[0] + msg_list = [] + msg_list.append(('self.alarms_priority_state', DataTypes.U32)) + msg_list.append(('self.alarm_top', DataTypes.U32)) + msg_list.append(('self.alarms_escalates_in', DataTypes.U32)) + msg_list.append(('self.alarms_silence_expires_in', DataTypes.U32)) + msg_list.append(('self.alarms_flags', DataTypes.U16)) + self.process_into_vars(decoder_list = msg_list, + message = message) + self.td_alarm_status_timestamp = timestamp @@ -202,35 +191,50 @@ @return: none """ self.logger.debug("Alarm activated!") - alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) + alarm_id:int + data_typ_1:int + data_1:int + data_typ_2:int + data_2:int + priority:int + rank:int + clr_top_only:int - data_typ_1 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) - # Get the corresponding structure format - struct_data_type_1 = self.alarm_data_type[td_enum_repository.DataTypes(data_typ_1[0]).name] - # Get the data value by unpacking the data type - data_1 = struct.unpack(struct_data_type_1, bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3])) + msg_list = [] + msg_list.append(('alarm_id', DataTypes.U32)) + msg_list.append(('data_typ_1', DataTypes.U32)) + msg_list.append(('data_1', 'data_typ_1')) + msg_list.append(('data_typ_2', DataTypes.U32)) + msg_list.append(('data_2', 'data_typ_2')) + msg_list.append(('priority', DataTypes.U32)) + msg_list.append(('rank', DataTypes.U32)) + msg_list.append(('clr_top_only', DataTypes.U32)) - data_typ_2 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4])) - # Get the corresponding structure format - struct_data_type_2 = self.alarm_data_type[td_enum_repository.DataTypes(data_typ_2[0]).name] - # Get the data value by unpacking the data type - data_2 = struct.unpack(struct_data_type_2, bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5])) + i = 1 + for msg_detail in msg_list: + start_pos = eval(f'MsgFieldPositions.START_POS_FIELD_{i}') + end_pos = eval(f'MsgFieldPositions.END_POS_FIELD_{i}') + if isinstance(msg_detail[1], DataTypes): + unpack_attrib = msg_detail[1].unpack_attrib() + else: + data_type_val:int + exec(f'data_type_val = {msg_detail[1]}') + unpack_attrib = DataTypes(data_type_val).unpack_attrib() + value = struct.unpack(unpack_attrib,bytearray(message['message'][start_pos:end_pos]))[0] + exec(f'{msg_detail[0]} = {value}') + i += 1 - priority = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_6:MsgFieldPositions.END_POS_FIELD_6])) - rank = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_7:MsgFieldPositions.END_POS_FIELD_7])) - clr_top_only = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_8:MsgFieldPositions.END_POS_FIELD_8])) - self.logger.debug("Alarm ID: %d %d %d" % (alarm_id[0], data_1[0], data_2[0])) - self.alarm_states[alarm_id[0]] = True - self.alarm_conditions[alarm_id[0]] = True - self.alarm_priorities[alarm_id[0]] = priority[0] - self.alarm_ranks[alarm_id[0]] = rank[0] - self.alarm_clear_top_only_flags[alarm_id[0]] = clr_top_only[0] - self.alarm_data[alarm_id[0]] = [data_1[0], data_2[0]] - self.last_alarm_triggered = alarm_id[0] - self.last_alarm_data_1 = data_1[0] - self.last_alarm_data_2 = data_2[0] - self.td_alarm_triggered_timestamp = timestamp + self.alarm_states[alarm_id] = True + self.alarm_conditions[alarm_id] = True + self.alarm_priorities[alarm_id] = priority + self.alarm_ranks[alarm_id] = rank + self.alarm_clear_top_only_flags[alarm_id] = clr_top_only + self.alarm_data[alarm_id] = [data_1, data_2] + self.last_alarm_triggered = alarm_id + self.last_alarm_data_1 = data_1 + self.last_alarm_data_2 = data_2 + self.dd_alarm_triggered_timestamp = timestamp @publish(["msg_id_td_alarm_cleared", "alarm_states", "alarm_conditions", "TD_alarm_cleared_timestamp"]) @@ -241,9 +245,15 @@ @param message: published TD alarm clear message @return: none """ - alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) - self.alarm_states[alarm_id[0]] = False - self.alarm_conditions[alarm_id[0]] = False + alarm_id: int + msg_list = [] + msg_list.append(('alarm_id', DataTypes.U32)) + + self.process_into_vars(decoder_list = msg_list, + message = message) + + self.alarm_states[alarm_id] = False + self.alarm_conditions[alarm_id] = False self.td_alarm_cleared_timestamp = timestamp @@ -255,8 +265,14 @@ @param message: published TD alarm clear alarm condition message @return: none """ - alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) - self.alarm_conditions[alarm_id[0]] = False + alarm_id: int + msg_list = [] + msg_list.append(('alarm_id', DataTypes.U32)) + + self.process_into_vars(decoder_list = msg_list, + message = message) + + self.alarm_conditions[alarm_id] = False self.td_alarm_clr_condition_timestamp = timestamp @@ -271,6 +287,28 @@ @param message: published TD alarm information message @return: none """ + msg_list = [] + msg_list.append(('self.alarm_volume', DataTypes.U32)) + msg_list.append(('self.alarm_audio_curr_hg', DataTypes.F32)) + msg_list.append(('self.alarm_audio_curr_lg', DataTypes.F32)) + msg_list.append(('self.alarm_backup_audio_curr', DataTypes.U32)) + msg_list.append(('self.safety_shutdown_active', DataTypes.BOOL)) + msg_list.append(('self.ac_power_lost', DataTypes.BOOL)) + sensor_list = [] + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RESUME.value, DataTypes.BOOL)) + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RINSEBACK.value, DataTypes.BOOL)) + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_END_TREATMENT.value, DataTypes.BOOL)) + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RESUME.value, DataTypes.BOOL)) + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RINSEBACK.value, DataTypes.BOOL)) + sensor_list.append((self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_END_TREATMENT.value, DataTypes.BOOL)) + + self.process_into_vars(decoder_list = msg_list, + message = message) + self.process_into_dict(dict_to_update = self.alarm_table_button_blockers, + decoder_list = sensor_list, + message = message, + start_from_byte = len(msg_list) * DataTypes.U32.size()) + """ vol = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) ach = struct.unpack('f', bytearray( @@ -308,6 +346,7 @@ self.alarm_state_button_blockers[self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RESUME.value] = True if srs[0] else False self.alarm_state_button_blockers[self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RINSEBACK.value] = True if srb[0] else False self.alarm_state_button_blockers[self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_END_TREATMENT.value] = True if set[0] else False + """ self.td_alarm_information_timestamp = timestamp @@ -319,17 +358,16 @@ @param message: published TD alarm clear alarm condition message @return: none """ - self.ui_alarm_list_accepted = struct.unpack('i',bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] - self.ui_alarm_list_reject_reason = struct.unpack('i',bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] + self.ui_alarm_list_accepted = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] + self.ui_alarm_list_reject_reason = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] i = 3 for _ in range(0,10): start_pos = eval(f'MsgFieldPositions.START_POS_FIELD_{i}') end_pos = eval(f'MsgFieldPositions.END_POS_FIELD_{i}') - value = struct.unpack('i',bytearray(message['message'][start_pos:end_pos]))[0] + value = struct.unpack(DataTypes.U32.unpack_attrib(), bytearray(message['message'][start_pos:end_pos]))[0] self.ui_alarm_list.append(value) i += 1 - self.ui_alarm_list_timestamp = timestamp