###########################################################################
#
#    Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   alarms.py
#
#    @author (last)      Michael Garthwaite
#    @date   (last)      04-Oct-2023
#    @author (original)  Peter Lucia
#    @date   (original)  02-Apr-2020
#
############################################################################
import struct
from logging import Logger
from enum import unique

from leahi_dialin.utils.base import DialinEnum
from .constants import RESET, NO_RESET
from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions
from leahi_dialin.common.td_defs import TDEventDataType
from leahi_dialin.protocols.CAN import DenaliMessage, DenaliChannels
from leahi_dialin.utils.base import AbstractSubSystem, publish
from leahi_dialin.utils.checks import check_broadcast_interval_override_ms
from leahi_dialin.utils.conversions import integer_to_bytearray, float_to_bytearray


class TDAlarms(AbstractSubSystem):
    """
    TD interface containing alarm related commands.

    Keeps a local mirror of the alarm state carried by the TD alarm messages
    (status, triggered, cleared, condition cleared, information) and offers
    cmd_* methods that build and send alarm override requests to the TD.
    """

    # Alarm lamp patterns
    TD_ALARM_LAMP_PATTERN_OFF = 0
    TD_ALARM_LAMP_PATTERN_OK = 1
    TD_ALARM_LAMP_PATTERN_FAULT = 2
    TD_ALARM_LAMP_PATTERN_HIGH = 3
    TD_ALARM_LAMP_PATTERN_MEDIUM = 4
    TD_ALARM_LAMP_PATTERN_LOW = 5
    TD_ALARM_LAMP_PATTERN_MANUAL = 6

    # Alarm priority states
    TD_ALARM_STATE_NONE = 0
    TD_ALARM_STATE_LOW = 1
    TD_ALARM_STATE_MEDIUM = 2
    TD_ALARM_STATE_HIGH = 3

    # Alarm response buttons
    @unique
    class AlarmResponseButtons(DialinEnum):
        TD_ALARM_RESPONSE_BUTTON_RESUME = 0
        TD_ALARM_RESPONSE_BUTTON_RINSEBACK = 1
        TD_ALARM_RESPONSE_BUTTON_END_TREATMENT = 2
        NUM_OF_TD_ALARM_RESPONSE_BUTTONS = 3

    # Alarm status message field positions
    START_POS_ALARM_STATE = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_ALARM_STATE = START_POS_ALARM_STATE + 4
    START_POS_ALARM_TOP = END_POS_ALARM_STATE
    END_POS_ALARM_TOP = START_POS_ALARM_TOP + 4
    START_POS_ALARM_ESCALATES_IN = END_POS_ALARM_TOP
    END_POS_ALARM_ESCALATES_IN = START_POS_ALARM_ESCALATES_IN + 4
    START_POS_ALARM_SILENCE_EXPIRES_IN = END_POS_ALARM_ESCALATES_IN
    END_POS_ALARM_SILENCE_EXPIRES_IN = START_POS_ALARM_SILENCE_EXPIRES_IN + 4
    START_POS_ALARMS_FLAGS = END_POS_ALARM_SILENCE_EXPIRES_IN
    END_POS_ALARMS_FLAGS = START_POS_ALARMS_FLAGS + 2

    # Number of entries in every per-alarm table (indexed by alarm ID)
    _MAX_ALARMS = 500

    def __init__(self, can_interface, logger: Logger):
        """
        Initializes the alarm mirror and, when a CAN interface is given,
        registers the handlers for the TD alarm messages.

        @param can_interface: Denali Can Messenger object
        @param logger: logger used for debug output
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        # Message IDs are assigned unconditionally so these attributes exist
        # even when no CAN interface is supplied.
        self.msg_id_td_alarm_status_data = MsgIds.MSG_ID_ALARM_STATUS_DATA.value
        self.msg_id_td_alarm_triggered = MsgIds.MSG_ID_ALARM_TRIGGERED.value
        self.msg_id_td_alarm_cleared = MsgIds.MSG_ID_ALARM_CLEARED.value
        self.msg_id_td_alarm_condition_cleared = MsgIds.MSG_ID_ALARM_CONDITION_CLEARED.value
        self.msg_id_td_alarm_info_data = MsgIds.MSG_ID_TD_ALARM_INFORMATION_DATA.value

        self.td_alarm_status_timestamp = 0.0
        self.td_alarm_triggered_timestamp = 0.0
        self.td_alarm_cleared_timestamp = 0.0
        self.td_alarm_clr_condition_timestamp = 0.0
        self.td_alarm_information_timestamp = 0.0

        # composite alarm status based on latest TD alarm status broadcast message
        self.alarms_priority_state = 0
        self.alarm_top = 0
        self.alarms_silence_expires_in = 0
        self.alarms_escalates_in = 0
        self.alarms_flags = 0

        # alarm states based on received TD alarm activation and alarm clear messages
        self.alarm_states = [False] * self._MAX_ALARMS
        # alarm condition states based on received TD alarm activation and clear condition messages
        self.alarm_conditions = [False] * self._MAX_ALARMS
        # alarm priorities based on received TD alarm activation messages
        self.alarm_priorities = [0] * self._MAX_ALARMS
        # alarm ranks based on received TD alarm activation messages
        self.alarm_ranks = [0] * self._MAX_ALARMS
        # alarm clear top only flags based on received TD alarm activation messages
        self.alarm_clear_top_only_flags = [False] * self._MAX_ALARMS
        # alarm debug data on alarm triggered message: one independent
        # [data_1, data_2] pair per alarm ID. ([0, 0] * 500 would build a flat
        # list of 1000 zeros instead of 500 pairs.)
        self.alarm_data = [[0, 0] for _ in range(self._MAX_ALARMS)]
        self.last_alarm_triggered = 0
        self.last_alarm_data_1 = 0.0
        self.last_alarm_data_2 = 0.0

        # alarm information
        self.alarm_data_type = dict()
        self.alarm_volume = 0
        self.alarm_audio_curr_hg = 0.0
        self.alarm_audio_curr_lg = 0.0
        self.alarm_backup_audio_curr = 0.0
        self.safety_shutdown_active = False
        self.ac_power_lost = False
        num_buttons = self.AlarmResponseButtons.NUM_OF_TD_ALARM_RESPONSE_BUTTONS.value
        self.alarm_table_button_blockers = [False] * num_buttons
        self.alarm_state_button_blockers = [False] * num_buttons

        # Map every event data type name (e.g. EVENT_DATA_TYPE_U32) to the python struct format
        # used to unpack a value of that type. Names matching none of the rules map to None.
        for data_type in TDEventDataType:
            event_data_type = data_type.name
            if 'U32' in event_data_type or 'BOOL' in event_data_type or 'NONE' in event_data_type:
                struct_unpack_type = 'I'
            elif 'S32' in event_data_type:
                struct_unpack_type = 'i'
            elif 'F32' in event_data_type:
                struct_unpack_type = 'f'
            else:
                struct_unpack_type = None
            self.alarm_data_type[event_data_type] = struct_unpack_type

        # Handlers are registered last, after every attribute they read or write has been created.
        if self.can_interface is not None:
            channel_id = DenaliChannels.td_alarm_broadcast_ch_id
            self.can_interface.register_receiving_publication_function(channel_id,
                                                                       self.msg_id_td_alarm_status_data,
                                                                       self._handler_alarms_status_sync)
            self.can_interface.register_receiving_publication_function(channel_id,
                                                                       self.msg_id_td_alarm_triggered,
                                                                       self._handler_alarm_trigger)
            self.can_interface.register_receiving_publication_function(channel_id,
                                                                       self.msg_id_td_alarm_cleared,
                                                                       self._handler_alarm_clear)
            self.can_interface.register_receiving_publication_function(channel_id,
                                                                       self.msg_id_td_alarm_condition_cleared,
                                                                       self._handler_alarm_condition_clear)

            channel_id = DenaliChannels.td_sync_broadcast_ch_id
            self.can_interface.register_receiving_publication_function(channel_id,
                                                                       self.msg_id_td_alarm_info_data,
                                                                       self._handler_alarm_information_sync)

    @staticmethod
    def _unpack_field(fmt: str, message, start: int, end: int):
        """
        Unpacks a single value from a slice of the raw message bytes.

        @param fmt: python struct format of the value
        @param message: published message (dictionary with a 'message' entry)
        @param start: index of the first byte of the field
        @param end: index one past the last byte of the field
        @return: the unpacked value
        """
        return struct.unpack(fmt, bytearray(message['message'][start:end]))[0]

    def _send_override_request(self, message, log_prefix: str):
        """
        Sends a request and returns the first payload byte of the response.

        @param message: message built with DenaliMessage.build_message
        @param log_prefix: text logged in front of the response payload byte
        @return: first payload byte of the response, False if no response was received
        """
        received_message = self.can_interface.send(message)

        if received_message is None:
            self.logger.debug("Timeout!!!!")
            return False

        # response payload is OK or not OK
        result = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        self.logger.debug(log_prefix + str(result))
        return result

    def clear_dialin_alarms(self):
        """
        Clears the alarms states in Dialin.

        Only alarm_states is reset; the other per-alarm tables are left untouched.

        @return: none
        """
        for alarm_id in range(len(self.alarm_states)):
            self.alarm_states[alarm_id] = False

    @publish(["msg_id_td_alarm_status_data",
              "alarms_priority_state", "alarm_top", "alarms_silence_expires_in",
              "alarms_escalates_in", "alarms_flags", "td_alarm_status_timestamp"])
    def _handler_alarms_status_sync(self, message, timestamp=0.0):
        """
        Handles published alarms status messages. alarms status data are captured
        for reference.

        @param message: published alarm status data message
        @param timestamp: timestamp stored as td_alarm_status_timestamp
        @return: none
        """
        self.alarms_priority_state = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_1, MsgFieldPositions.END_POS_FIELD_1)
        self.alarm_top = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_2, MsgFieldPositions.END_POS_FIELD_2)
        self.alarms_escalates_in = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_3, MsgFieldPositions.END_POS_FIELD_3)
        self.alarms_silence_expires_in = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_4, MsgFieldPositions.END_POS_FIELD_4)
        # the flags field is only 2 bytes wide
        self.alarms_flags = self._unpack_field(
            'H', message, MsgFieldPositions.START_POS_FIELD_5, MsgFieldPositions.START_POS_FIELD_5 + 2)
        self.td_alarm_status_timestamp = timestamp

    @publish(["msg_id_td_alarm_triggered", "alarm_states", "alarm_conditions", "alarm_data",
              "alarm_priorities", "alarm_ranks", "alarm_clear_top_only_flags",
              "td_alarm_triggered_timestamp"])
    def _handler_alarm_trigger(self, message, timestamp=0.0):
        """
        Handles published TD alarm activation messages.

        @param message: published TD alarm activation message
        @param timestamp: timestamp stored as td_alarm_triggered_timestamp
        @return: none
        """
        self.logger.debug("Alarm activated!")
        alarm_id = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_1, MsgFieldPositions.END_POS_FIELD_1)

        # Each data value is preceded by its data type; the type selects the struct format of the value
        data_typ_1 = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_2, MsgFieldPositions.END_POS_FIELD_2)
        struct_data_type_1 = self.alarm_data_type[TDEventDataType(data_typ_1).name]
        data_1 = self._unpack_field(
            struct_data_type_1, message, MsgFieldPositions.START_POS_FIELD_3, MsgFieldPositions.END_POS_FIELD_3)

        data_typ_2 = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_4, MsgFieldPositions.END_POS_FIELD_4)
        struct_data_type_2 = self.alarm_data_type[TDEventDataType(data_typ_2).name]
        data_2 = self._unpack_field(
            struct_data_type_2, message, MsgFieldPositions.START_POS_FIELD_5, MsgFieldPositions.END_POS_FIELD_5)

        priority = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_6, MsgFieldPositions.END_POS_FIELD_6)
        rank = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_7, MsgFieldPositions.END_POS_FIELD_7)
        clr_top_only = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_8, MsgFieldPositions.END_POS_FIELD_8)

        # %s rather than %d: the data values may be floats, and %d raises on NaN or infinity
        self.logger.debug("Alarm ID: %s %s %s", alarm_id, data_1, data_2)

        self.alarm_states[alarm_id] = True
        self.alarm_conditions[alarm_id] = True
        self.alarm_priorities[alarm_id] = priority
        self.alarm_ranks[alarm_id] = rank
        self.alarm_clear_top_only_flags[alarm_id] = clr_top_only
        self.alarm_data[alarm_id] = [data_1, data_2]
        self.last_alarm_triggered = alarm_id
        self.last_alarm_data_1 = data_1
        self.last_alarm_data_2 = data_2
        self.td_alarm_triggered_timestamp = timestamp

    @publish(["msg_id_td_alarm_cleared", "alarm_states", "alarm_conditions", "td_alarm_cleared_timestamp"])
    def _handler_alarm_clear(self, message, timestamp=0.0):
        """
        Handles published TD alarm clear messages.

        @param message: published TD alarm clear message
        @param timestamp: timestamp stored as td_alarm_cleared_timestamp
        @return: none
        """
        alarm_id = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_1, MsgFieldPositions.END_POS_FIELD_1)
        self.alarm_states[alarm_id] = False
        self.alarm_conditions[alarm_id] = False
        self.td_alarm_cleared_timestamp = timestamp

    @publish(["msg_id_td_alarm_condition_cleared", "alarm_conditions", "td_alarm_clr_condition_timestamp"])
    def _handler_alarm_condition_clear(self, message, timestamp=0.0):
        """
        Handles published TD alarm clear alarm condition messages.

        @param message: published TD alarm clear alarm condition message
        @param timestamp: timestamp stored as td_alarm_clr_condition_timestamp
        @return: none
        """
        alarm_id = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_1, MsgFieldPositions.END_POS_FIELD_1)
        self.alarm_conditions[alarm_id] = False
        self.td_alarm_clr_condition_timestamp = timestamp

    @publish(["msg_id_td_alarm_info_data", "alarm_volume", "alarm_audio_curr_hg", "alarm_audio_curr_lg",
              "alarm_backup_audio_curr", "safety_shutdown_active", "ac_power_lost",
              "alarm_table_button_blockers", "alarm_state_button_blockers",
              "td_alarm_information_timestamp"])
    def _handler_alarm_information_sync(self, message, timestamp=0.0):
        """
        Handles published TD alarm information broadcast messages.

        @param message: published TD alarm information message
        @param timestamp: timestamp stored as td_alarm_information_timestamp
        @return: none
        """
        volume = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_1, MsgFieldPositions.END_POS_FIELD_1)
        audio_curr_hg = self._unpack_field(
            'f', message, MsgFieldPositions.START_POS_FIELD_2, MsgFieldPositions.END_POS_FIELD_2)
        audio_curr_lg = self._unpack_field(
            'f', message, MsgFieldPositions.START_POS_FIELD_3, MsgFieldPositions.END_POS_FIELD_3)
        backup_audio_curr = self._unpack_field(
            'f', message, MsgFieldPositions.START_POS_FIELD_4, MsgFieldPositions.END_POS_FIELD_4)
        safety_shutdown = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_5, MsgFieldPositions.END_POS_FIELD_5)
        ac_power_lost = self._unpack_field(
            'i', message, MsgFieldPositions.START_POS_FIELD_6, MsgFieldPositions.END_POS_FIELD_6)

        # Six consecutive one-byte flags: three table button blockers followed by three state button blockers
        flags_start = MsgFieldPositions.START_POS_FIELD_7
        (table_resume, table_rinseback, table_end_treatment,
         state_resume, state_rinseback, state_end_treatment) = struct.unpack(
            '6B', bytearray(message['message'][flags_start:flags_start + 6]))

        self.alarm_volume = volume
        self.alarm_audio_curr_hg = audio_curr_hg
        self.alarm_audio_curr_lg = audio_curr_lg
        self.alarm_backup_audio_curr = backup_audio_curr
        self.safety_shutdown_active = safety_shutdown == 1
        self.ac_power_lost = ac_power_lost == 1

        resume = self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RESUME.value
        rinseback = self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_RINSEBACK.value
        end_treatment = self.AlarmResponseButtons.TD_ALARM_RESPONSE_BUTTON_END_TREATMENT.value

        # The lists are updated in place so existing references to them stay valid.
        # NOTE(review): only the table resume flag is compared against 1, the others use
        # truthiness; kept as found - confirm whether this difference is intended.
        self.alarm_table_button_blockers[resume] = table_resume == 1
        self.alarm_table_button_blockers[rinseback] = bool(table_rinseback)
        self.alarm_table_button_blockers[end_treatment] = bool(table_end_treatment)
        self.alarm_state_button_blockers[resume] = bool(state_resume)
        self.alarm_state_button_blockers[rinseback] = bool(state_rinseback)
        self.alarm_state_button_blockers[end_treatment] = bool(state_end_treatment)
        self.td_alarm_information_timestamp = timestamp

    def cmd_alarm_state_override(self, alarm: int, state: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm state override command
        Constraints:
        Must be logged into TD.
        Given alarm must be valid.
        If inactivating alarm, given alarm must be recoverable (clearable).

        @param alarm: integer - ID of alarm to override
        @param state: integer - 1 for alarm active, 0 for alarm inactive
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(reset) + integer_to_bytearray(state) + integer_to_bytearray(alarm)

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id,
                                              message_id=MsgIds.MSG_ID_TD_ALARM_STATE_OVERRIDE_REQUEST.value,
                                              payload=payload)

        if reset == RESET:
            str_res = "reset back to normal"
        else:
            str_res = "active" if state != 0 else "inactive"

        return self._send_override_request(message, "Alarm " + str(alarm) + " " + str_res + ": ")

    def cmd_clear_all_alarms(self) -> int:
        """
        Constructs and sends the clear all active alarms command.
        This will clear even non-recoverable alarms.
        Constraints:
        Must be logged into TD.

        @return: 1 if successful, zero otherwise
        """
        # the request payload is the fixed key 0xD2C3B4A5, written as a signed 32-bit integer
        payload = integer_to_bytearray(-758926171)

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id,
                                              message_id=MsgIds.MSG_ID_TD_ALARM_CLEAR_ALL_ALARMS_REQUEST.value,
                                              payload=payload)

        # Send message
        received_message = self.can_interface.send(message)

        if received_message is None:
            return False

        self.logger.debug("All alarms cleared.")
        # response payload is OK or not OK
        return 1 == received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def cmd_alarm_time_override(self, alarm: int, time_ms: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm time override command
        Constraints:
        Must be logged into TD.
        Given alarm must be valid.

        @param alarm: integer - ID of alarm to override
        @param time_ms: integer - time (in ms) since alarm was activated
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(reset) + integer_to_bytearray(time_ms) + integer_to_bytearray(alarm)

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id,
                                              message_id=MsgIds.MSG_ID_TD_ALARM_START_TIME_OVERRIDE_REQUEST.value,
                                              payload=payload)

        self.logger.debug("override alarm time since activated")

        str_res = "reset back to normal" if reset == RESET else str(time_ms)
        return self._send_override_request(message,
                                           "Alarm time since activated overridden to " + str_res + " ms: ")

    def cmd_alarm_lamp_pattern_override(self, pattern: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm lamp pattern override command.
        Constraints:
        Must be logged into TD.
        Given pattern must be one of the patterns listed below.

        @param pattern: integer - ID of alarm lamp pattern to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise

        Patterns: \n
        0 = off \n
        1 = ok \n
        2 = fault \n
        3 = high \n
        4 = medium \n
        5 = low \n
        6 = manual
        """
        payload = integer_to_bytearray(reset) + integer_to_bytearray(pattern)

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id,
                                              message_id=MsgIds.MSG_ID_TD_ALARM_LAMP_PATTERN_OVERRIDE_REQUEST.value,
                                              payload=payload)

        self.logger.debug("Override Alarm Lamp Pattern")

        # Send message
        received_message = self.can_interface.send(message)

        if received_message is None:
            self.logger.debug("Timeout!!!!")
            return False

        self.logger.debug(received_message)

        if reset == RESET:
            str_pat = "reset back to normal"
        else:
            pattern_names = {
                self.TD_ALARM_LAMP_PATTERN_OFF: "off",
                self.TD_ALARM_LAMP_PATTERN_OK: "ok",
                self.TD_ALARM_LAMP_PATTERN_FAULT: "fault",
                self.TD_ALARM_LAMP_PATTERN_HIGH: "high",
                self.TD_ALARM_LAMP_PATTERN_MEDIUM: "medium",
                self.TD_ALARM_LAMP_PATTERN_LOW: "low",
            }
            # any other pattern value is reported as "manual"
            str_pat = pattern_names.get(pattern, "manual")

        # response payload is OK or not OK
        result = received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]
        self.logger.debug("Alarm lamp pattern overridden to " + str_pat + ":" + str(result))
        return result

    def cmd_alarm_info_broadcast_interval_override(self, ms: int = 1000, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm information broadcast interval override command
        Constraints:
        Must be logged into TD.
        Given interval must be non-zero and a multiple of the TD general task interval (50 ms).

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        # nothing is sent when the interval is rejected by the check
        if not check_broadcast_interval_override_ms(ms):
            return False

        payload = integer_to_bytearray(reset) + integer_to_bytearray(ms)

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_td_ch_id,
            message_id=MsgIds.MSG_ID_TD_ALARM_INFO_PUBLISH_INTERVAL_OVERRIDE_REQUEST.value,
            payload=payload)

        self.logger.debug("override alarm information broadcast interval")

        str_res = "reset back to normal: " if reset == RESET else str(ms) + " ms: "
        return self._send_override_request(message,
                                           "Alarm information broadcast interval overridden to " + str_res)

    def cmd_alarm_status_broadcast_interval_override(self, ms: int = 250, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm status broadcast interval override command
        Constraints:
        Must be logged into TD.
        Given interval must be non-zero and a multiple of the TD general task interval (50 ms).

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        # nothing is sent when the interval is rejected by the check
        if not check_broadcast_interval_override_ms(ms):
            return False

        payload = integer_to_bytearray(reset) + integer_to_bytearray(ms)

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_td_ch_id,
            message_id=MsgIds.MSG_ID_TD_ALARM_STATUS_PUBLISH_INTERVAL_OVERRIDE_REQUEST.value,
            payload=payload)

        self.logger.debug("override alarm status broadcast interval")

        str_res = "reset back to normal: " if reset == RESET else str(ms) + " ms: "
        return self._send_override_request(message,
                                           "Alarm status broadcast interval overridden to " + str_res)

    def cmd_alarm_audio_level_override(self, volume: int = 5, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm audio volume override command
        Constraints:
        Must be logged into TD.
        Given volume must be an integer between 1 and 5.

        @param volume: integer - alarm audio volume level (1..5)
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(reset) + integer_to_bytearray(volume)

        message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_td_ch_id,
                                              message_id=MsgIds.MSG_ID_TD_ALARM_AUDIO_LEVEL_OVERRIDE_REQUEST.value,
                                              payload=payload)

        self.logger.debug("override alarm audio volume level")

        str_res = "reset back to normal: " if reset == RESET else str(volume) + ": "
        return self._send_override_request(message, "Alarm audio volume level overridden to " + str_res)

    def cmd_alarm_audio_current_hg_override(self, current: float = 0.0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm audio current (high gain) override command
        Constraints:
        Must be logged into TD.

        @param current: float - current (in mA) for high gain alarm audio
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(reset) + float_to_bytearray(current)

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_td_ch_id,
            message_id=MsgIds.MSG_ID_TD_ALARM_AUDIO_CURRENT_HG_OVERRIDE_REQUEST.value,
            payload=payload)

        self.logger.debug("override alarm audio high gain current")

        str_res = "reset back to normal: " if reset == RESET else str(current) + " mA: "
        return self._send_override_request(message, "Alarm audio high gain current overridden to " + str_res)

    def cmd_alarm_audio_current_lg_override(self, current: float = 0.0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the alarm audio current (low gain) override command
        Constraints:
        Must be logged into TD.

        @param current: float - current (in mA) for low gain alarm audio
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(reset) + float_to_bytearray(current)

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_td_ch_id,
            message_id=MsgIds.MSG_ID_TD_ALARM_AUDIO_CURRENT_LG_OVERRIDE_REQUEST.value,
            payload=payload)

        self.logger.debug("override alarm audio low gain current")

        str_res = "reset back to normal: " if reset == RESET else str(current) + " mA: "
        return self._send_override_request(message, "Alarm audio low gain current overridden to " + str_res)

    def cmd_alarm_backup_audio_current_override(self, current: float = 0.0, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the backup alarm audio current override command
        Constraints:
        Must be logged into TD.

        @param current: float - current (in mA) for backup alarm audio
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        payload = integer_to_bytearray(reset) + float_to_bytearray(current)

        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_td_ch_id,
            message_id=MsgIds.MSG_ID_TD_BACKUP_ALARM_AUDIO_CURRENT_OVERRIDE_REQUEST.value,
            payload=payload)

        self.logger.debug("override alarm backup audio current")

        str_res = "reset back to normal: " if reset == RESET else str(current) + " mA: "
        return self._send_override_request(message, "Alarm backup audio current overridden to " + str_res)