########################################################################### # # Copyright (c) 2020-2023 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file alarms.py # # @author (last) Dara Navaei # @date (last) 21-Dec-2022 # @author (original) Peter Lucia # @date (original) 02-Apr-2020 # ############################################################################ import struct from logging import Logger from enum import unique from ..utils.base import DialinEnum from .constants import RESET, NO_RESET from ..common.msg_defs import MsgIds, MsgFieldPositions from ..protocols.CAN import DenaliMessage, DenaliChannels from ..utils.base import AbstractSubSystem, publish from ..utils.checks import check_broadcast_interval_override_ms from ..utils.conversions import integer_to_bytearray, float_to_bytearray class HDAlarms(AbstractSubSystem): """ HD interface containing alarm related commands. """ # Alarm lamp patterns HD_ALARM_LAMP_PATTERN_OFF = 0 HD_ALARM_LAMP_PATTERN_OK = 1 HD_ALARM_LAMP_PATTERN_FAULT = 2 HD_ALARM_LAMP_PATTERN_HIGH = 3 HD_ALARM_LAMP_PATTERN_MEDIUM = 4 HD_ALARM_LAMP_PATTERN_LOW = 5 HD_ALARM_LAMP_PATTERN_MANUAL = 6 # Alarm states HD_ALARM_STATE_NONE = 0 HD_ALARM_STATE_LOW = 1 HD_ALARM_STATE_MEDIUM = 2 HD_ALARM_STATE_HIGH = 3 # Alarm response buttons @unique class AlarmResponseButtons(DialinEnum): HD_ALARM_RESPONSE_BUTTON_RESUME = 0 HD_ALARM_RESPONSE_BUTTON_RINSEBACK = 1 HD_ALARM_RESPONSE_BUTTON_END_TREATMENT = 2 NUM_OF_HD_ALARM_RESPONSE_BUTTONS = 3 # Alarm status message field positions START_POS_ALARM_STATE = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_STATE = START_POS_ALARM_STATE + 4 START_POS_ALARM_TOP = END_POS_ALARM_STATE END_POS_ALARM_TOP = START_POS_ALARM_TOP + 4 START_POS_ALARM_ESCALATES_IN = END_POS_ALARM_TOP END_POS_ALARM_ESCALATES_IN = START_POS_ALARM_ESCALATES_IN + 4 START_POS_ALARM_SILENCE_EXPIRES_IN = END_POS_ALARM_ESCALATES_IN END_POS_ALARM_SILENCE_EXPIRES_IN = START_POS_ALARM_SILENCE_EXPIRES_IN + 4 START_POS_ALARMS_FLAGS = END_POS_ALARM_SILENCE_EXPIRES_IN END_POS_ALARMS_FLAGS = START_POS_ALARMS_FLAGS + 2 def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = MsgIds.MSG_ID_ALARM_STATUS.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarms_status_sync) channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = MsgIds.MSG_ID_ALARM_TRIGGERED.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_activate) msg_id = MsgIds.MSG_ID_ALARM_CLEARED.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_clear) msg_id = MsgIds.MSG_ID_ALARM_CONDITION_CLEARED.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_condition_clear) channel_id = DenaliChannels.hd_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_HD_ALARM_INFORMATION.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_information_sync) # composite alarm status based on latest HD alarm status broadcast message self.alarms_priority_state = 0 self.alarm_top = 0 self.alarms_silence_expires_in = 0 self.alarms_escalates_in = 0 self.alarms_flags = 0 # alarm states based on received HD alarm activation and alarm clear messages self.alarm_states = [False] * 500 # alarm condition states based on received HD alarm activation and clear condition messages self.alarm_conditions = [False] * 500 # alarm priorities based on received HD alarm activation messages self.alarm_priorities = [0] * 500 # alarm ranks based on received HD alarm activation messages self.alarm_ranks = [0] * 500 # alarm clear top only flags based on received HD alarm activation messages self.alarm_clear_top_only_flags = [False] * 500 # alarm information self.alarm_volume = 0 self.alarm_audio_curr_hg = 0.0 self.alarm_audio_curr_lg = 0.0 self.alarm_backup_audio_curr = 0.0 self.safety_shutdown_active = False self.alarm_table_button_blockers = [False] * self.AlarmResponseButtons.NUM_OF_HD_ALARM_RESPONSE_BUTTONS.value self.alarm_state_button_blockers = [False] * self.AlarmResponseButtons.NUM_OF_HD_ALARM_RESPONSE_BUTTONS.value def get_current_alarms_state(self): """ Gets the current alarms state. @return: (int) the current alarms state. """ return self.alarms_priority_state def get_alarm_states(self): """ Gets all states for all alarms @return: List of booleans of size 500 """ return self.alarm_states def get_alarm_response_button_table_blocker_states(self): """ Gets the states (T/F) of the alarm response buttons blocked by alarm table properties. @return: List of booleans of size 3 (for resume, rinseback and end treatment buttons) """ return self.alarm_table_button_blockers def get_alarm_response_button_state_blocker_states(self): """ Gets the states (T/F) of the alarm response buttons blocked by state properties. @return: List of booleans of size 3 (for resume, rinseback and end treatment buttons) """ return self.alarm_state_button_blockers def get_alarm_conditions(self): """ Gets all alarm condition states for all alarms @return: List of booleans of size 500 """ return self.alarm_conditions def get_alarm_state(self, alarm_id): """ Gets alarm state for given alarm @return: Alarm state """ return self.alarm_states[alarm_id] def get_alarm_priority(self, alarm_id): """ Gets alarm priority for given alarm. 0=None 1=Low 2=Medium 3=High @return: Alarm priority """ return self.alarm_priorities[alarm_id] def get_alarm_rank(self, alarm_id): """ Gets alarm rank for given alarm. @return: Alarm rank """ return self.alarm_ranks[alarm_id] def get_alarm_clear_top_only(self, alarm_id): """ Gets alarm "clear top only" property for given alarm. @return: T/F """ return self.alarm_clear_top_only_flags[alarm_id] def get_alarms_top(self): """ Gets the top alarm @return: (int) the top alarm """ return self.alarm_top def get_alarms_silence_expires_in(self): """ Gets the remaining time the alarms will be silenced (s) @return: (int) how long until the alarm silence expires """ return self.alarms_silence_expires_in def get_alarms_escalates_in(self): """ Gets the alarms escalates in time (s) @return: (int) how long until the alarm escalates """ return self.alarms_escalates_in def get_alarms_flags(self): """ Gets the alarms flags Extract each flag from the flags int using bit-masking. E.g. System Fault = result & 1 Stop = result & 2 No Clear = result & 4 No Resume = result & 8 No Rinseback = result & 16 No End Treatment = result & 32 No New Treatment = result & 64 User Must ACK = result & 128 Alarms to Escalate = result & 256 Alarms Silenced = result & 512 Alarm Lamp On = result & 1024 TBD = result & 2048 No Blood Recirc = result & 4096 No Dialysate Recirc = result & 8192 No Minimize = result & 16384 Condition Detected = result & 32768 @return: (int) The alarms flags value """ return self.alarms_flags def get_alarm_volume(self): """ Gets the alarm audio volume level. @return: (int) current alarm audio volume (1..5) """ return self.alarm_volume def get_alarm_audio_current_hg(self): """ Gets the alarm audio current - high gain. @return: (float) alarm audio current - high gain (in mA) """ return self.alarm_audio_curr_hg def get_alarm_audio_current_lg(self): """ Gets the alarm audio current - low gain. @return: (float) alarm audio current - low gain (in mA) """ return self.alarm_audio_curr_lg def get_alarm_backup_audio_current(self): """ Gets the alarm backup audio current. @return: (float) alarm backup audio current (in mA) """ return self.alarm_backup_audio_curr def get_safety_shutdown_activated(self): """ Gets the state of the HD safety shutdown signal. @return: (bool) safety shutdown line is activated (T/F) """ return self.safety_shutdown_active def get_alarm_flag_system_fault(self) -> bool: """ Gets the alarm flag system fault. @return: (bool) Alarm flag system fault (T/F) """ return (self.alarms_flags & 1) > 0 def get_alarm_flag_stop(self) -> bool: """ Gets the alarm flag no clear. @return: (bool) Alarm flag no clear (T/F) """ return (self.alarms_flags & 2) > 0 def get_alarm_flag_no_clear(self) -> bool: """ Gets the alarm flag no clear. @return: (bool) Alarm flag no clear (T/F) """ return (self.alarms_flags & 4) > 0 def get_alarm_flag_no_resume(self) -> bool: """ Gets the alarm flag no resume. @return: (bool) Alarm flag no resume (T/F) """ return (self.alarms_flags & 8) > 0 def get_alarm_flag_no_rinseback(self) -> bool: """ Gets the alarm flag no rinseback. @return: (bool) Alarm flag no rinseback (T/F) """ return (self.alarms_flags & 16) > 0 def get_alarm_flag_no_end_treatment(self) -> bool: """ Gets the alarm flag no end treatment. @return: (bool) Alarm flag no end treatment (T/F) """ return (self.alarms_flags & 32) > 0 def get_alarm_flag_no_new_treatment(self) -> bool: """ Gets the alarm flag no new treatment. @return: (bool) Alarm flag no new treatment (T/F) """ return (self.alarms_flags & 64) > 0 def get_alarm_flag_lamp_on(self) -> bool: """ Gets the alarm flag lamp on. @return: (bool) Alarm lamp on (T/F) """ return (self.alarms_flags & 1024) > 0 def get_alarm_flag_no_blood_recirculation(self) -> bool: """ Gets the alarm flag no blood recirculation. @return: (bool) Alarm lamp on (T/F) """ return (self.alarms_flags & 4096) > 0 def get_alarm_flag_no_dialysate_recirculation(self) -> bool: """ Gets the alarm flag no dialysate recirculation. @return: (bool) Alarm lamp on (T/F) """ return (self.alarms_flags & 8192) > 0 def get_alarm_flag_no_minimize(self) -> bool: """ Gets the alarm flag no minimize. @return: (bool) Alarm cannot be minimized (T/F) """ return (self.alarms_flags & 16384) > 0 def clear_dialin_alarms(self): """ Clears the alarms states in Dialin. @return: none """ for x in range(500): self.alarm_states[x] = False @publish(["alarms_state", "alarm_top", "alarms_silence_expires_in", "alarms_escalates_in", "alarms_flags"]) def _handler_alarms_status_sync(self, message): """ Handles published alarms status messages. alarms status data are captured for reference. @param message: published blood flow data message @return: none """ self.alarms_state = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_STATE:self.END_POS_ALARM_STATE]), byteorder=DenaliMessage.BYTE_ORDER) self.alarm_top = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_TOP:self.END_POS_ALARM_TOP]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_escalates_in = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_ESCALATES_IN:self.END_POS_ALARM_ESCALATES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_silence_expires_in = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_SILENCE_EXPIRES_IN:self.END_POS_ALARM_SILENCE_EXPIRES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_flags = int.from_bytes(bytearray( message['message'][self.START_POS_ALARMS_FLAGS:self.END_POS_ALARMS_FLAGS]), byteorder=DenaliMessage.BYTE_ORDER) @publish(["alarm_states", "alarm_conditions"]) def _handler_alarm_activate(self, message): """ Handles published HD alarm activation messages. @param message: published HD alarm activation message @return: none """ self.logger.debug("Alarm activated!") alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) data_typ_1 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) data_1 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3])) data_typ_2 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4])) data_2 = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5])) priority = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_6:MsgFieldPositions.END_POS_FIELD_6])) rank = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_7:MsgFieldPositions.END_POS_FIELD_7])) clr_top_only = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_8:MsgFieldPositions.END_POS_FIELD_8])) self.logger.debug("Alarm ID: %d %d %d"%(alarm_id[0], data_1[0], data_2[0])) self.alarm_states[alarm_id[0]] = True self.alarm_conditions[alarm_id[0]] = True self.alarm_priorities[alarm_id[0]] = priority[0] self.alarm_ranks[alarm_id[0]] = rank[0] self.alarm_clear_top_only_flags[alarm_id[0]] = clr_top_only[0] @publish(["alarm_states", "alarm_conditions"]) def _handler_alarm_clear(self, message): """ Handles published HD alarm clear messages. @param message: published HD alarm clear message @return: none """ alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) self.alarm_states[alarm_id[0]] = False self.alarm_conditions[alarm_id[0]] = False @publish(["alarm_conditions", "alarm_conditions"]) def _handler_alarm_condition_clear(self, message): """ Handles published HD alarm clear alarm condition messages. @param message: published HD alarm clear alarm condition message @return: none """ alarm_id = struct.unpack('i', bytearray(message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) self.alarm_conditions[alarm_id[0]] = False @publish(["alarm_volume", "alarm_audio_curr_hg", "alarm_audio_curr_lg", "alarm_backup_audio_curr", "safety_shutdown_active", "alarm_table_button_blockers", "alarm_state_button_blockers"]) def _handler_alarm_information_sync(self, message): """ Handles published HD alarm information broadcast messages. @param message: published HD alarm information message @return: none """ vol = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1])) ach = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2])) acl = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3])) bac = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4])) saf = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5])) trs = struct.unpack('B', bytearray( message['message'][MsgFieldPositions.END_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5+1])) trb = struct.unpack('B', bytearray( message['message'][MsgFieldPositions.END_POS_FIELD_5+1:MsgFieldPositions.END_POS_FIELD_5+2])) tet = struct.unpack('B', bytearray( message['message'][MsgFieldPositions.END_POS_FIELD_5+2:MsgFieldPositions.END_POS_FIELD_5+3])) srs = struct.unpack('B', bytearray( message['message'][MsgFieldPositions.END_POS_FIELD_5+3:MsgFieldPositions.END_POS_FIELD_5+4])) srb = struct.unpack('B', bytearray( message['message'][MsgFieldPositions.END_POS_FIELD_5+4:MsgFieldPositions.END_POS_FIELD_5+5])) set = struct.unpack('B', bytearray( message['message'][MsgFieldPositions.END_POS_FIELD_5+5:MsgFieldPositions.END_POS_FIELD_5+6])) self.alarm_volume = vol[0] self.alarm_audio_curr_hg = ach[0] self.alarm_audio_curr_lg = acl[0] self.alarm_backup_audio_curr = bac[0] self.safety_shutdown_active = True if saf[0] == 1 else False self.alarm_table_button_blockers[self.AlarmResponseButtons.HD_ALARM_RESPONSE_BUTTON_RESUME.value] = True if trs[0] == 1 else False self.alarm_table_button_blockers[self.AlarmResponseButtons.HD_ALARM_RESPONSE_BUTTON_RINSEBACK.value] = True if trb[0] else False self.alarm_table_button_blockers[self.AlarmResponseButtons.HD_ALARM_RESPONSE_BUTTON_END_TREATMENT.value] = True if tet[0] else False self.alarm_state_button_blockers[self.AlarmResponseButtons.HD_ALARM_RESPONSE_BUTTON_RESUME.value] = True if srs[0] else False self.alarm_state_button_blockers[self.AlarmResponseButtons.HD_ALARM_RESPONSE_BUTTON_RINSEBACK.value] = True if srb[0] else False self.alarm_state_button_blockers[self.AlarmResponseButtons.HD_ALARM_RESPONSE_BUTTON_END_TREATMENT.value] = True if set[0] else False def cmd_clear_all_alarms(self) -> int: """ Constructs and sends the clear all active alarms command. This will clear even non-recoverable alarms. Constraints: Must be logged into HD. @return: 1 if successful, zero otherwise """ key = integer_to_bytearray(-758926171) # 0xD2C3B4A5 payload = key message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_SUPER_CLEAR_ALARMS_CMD.value, payload=payload) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: self.logger.debug("All alarms cleared.") # response payload is OK or not OK return 1 == received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] return False def cmd_resend_all_alarms(self) -> int: """ Constructs and sends the re-send all active alarms command. This will allow Dialin to get caught up with HD alarms that were triggered prior to connection. Constraints: Must be logged into HD. @return: 1 if successful, zero otherwise """ message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_SEND_ALARMS_COMMAND.value) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: self.logger.debug("Command to re-send all active HD alarms acknowledged.") # response payload is OK or not OK return 1 == received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] return False def cmd_alarm_state_override(self, alarm: int, state: int, reset: int = NO_RESET) -> int: """ Constructs and sends the alarm state override command Constraints: Must be logged into HD. Given alarm must be valid. If inactivating alarm, given alarm must be recoverable (clearable). @param alarm: integer - ID of alarm to override @param state: integer - 1 for alarm active, 0 for alarm inactive @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) sta = integer_to_bytearray(state) alm = integer_to_bytearray(alarm) payload = rst + sta + alm message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_ALARM_STATE_OVERRIDE.value, payload=payload) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # self.logger.debug(received_message) if reset == RESET: str_res = "reset back to normal" else: str_res = ("active" if state != 0 else "inactive") self.logger.debug("Alarm " + str(alarm) + " " + str_res + ": " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_time_override(self, alarm: int, time_ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the alarm time override command Constraints: Must be logged into HD. Given alarm must be valid. @param alarm: integer - ID of alarm to override @param time_ms: integer - time (in ms) since alarm was activated @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) ms = integer_to_bytearray(time_ms) alm = integer_to_bytearray(alarm) payload = rst + ms + alm message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_ALARM_TIME_OVERRIDE.value, payload=payload) self.logger.debug("override alarm time since activated") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # self.logger.debug(received_message) if reset == RESET: str_res = "reset back to normal" else: str_res = str(time_ms) self.logger.debug("Alarm time since activated overridden to " + str_res + " ms: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_lamp_pattern_override(self, pattern: int, reset: int = NO_RESET) -> int: """ Constructs and sends the alarm lamp pattern override command. Constraints: Must be logged into HD. Given pattern must be one of the patterns listed below. @param pattern: integer - ID of alarm lamp pattern to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise Patterns: \n 0 = off \n 1 = ok \n 2 = fault \n 3 = high \n 4 = medium \n 5 = low \n 6 = manual """ rst = integer_to_bytearray(reset) pat = integer_to_bytearray(pattern) payload = rst + pat message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_ALARM_LAMP_PATTERN_OVERRIDE.value, payload=payload) self.logger.debug("Override Alarm Lamp Pattern") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: self.logger.debug(received_message) if reset == RESET: str_pat = "reset back to normal" elif pattern == self.HD_ALARM_LAMP_PATTERN_OFF: str_pat = "off" elif pattern == self.HD_ALARM_LAMP_PATTERN_OK: str_pat = "ok" elif pattern == self.HD_ALARM_LAMP_PATTERN_FAULT: str_pat = "fault" elif pattern == self.HD_ALARM_LAMP_PATTERN_HIGH: str_pat = "high" elif pattern == self.HD_ALARM_LAMP_PATTERN_MEDIUM: str_pat = "medium" elif pattern == self.HD_ALARM_LAMP_PATTERN_LOW: str_pat = "low" else: str_pat = "manual" self.logger.debug("Alarm lamp pattern overridden to " + str_pat + ":" + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_info_broadcast_interval_override(self, ms: int = 1000, reset: int = NO_RESET): """ Constructs and sends the alarm information broadcast interval override command Constraints: Must be logged into HD. Given interval must be non-zero and a multiple of the HD general task interval (50 ms). @param ms: integer - interval (in ms) to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ if not check_broadcast_interval_override_ms(ms): return False rst = integer_to_bytearray(reset) mis = integer_to_bytearray(ms) payload = rst + mis message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_INFO_SEND_INTERVAL_OVERRIDE.value, payload=payload) self.logger.debug("override alarm information broadcast interval") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(ms) + " ms: " self.logger.debug("Alarm information broadcast interval overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_status_broadcast_interval_override(self, ms: int = 250, reset: int = NO_RESET): """ Constructs and sends the alarm status broadcast interval override command Constraints: Must be logged into HD. Given interval must be non-zero and a multiple of the HD general task interval (50 ms). @param ms: integer - interval (in ms) to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ if not check_broadcast_interval_override_ms(ms): return False rst = integer_to_bytearray(reset) mis = integer_to_bytearray(ms) payload = rst + mis message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_STATUS_PUBLISH_INTERVAL_OVERRIDE.value, payload=payload) self.logger.debug("override alarm status broadcast interval") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(ms) + " ms: " self.logger.debug("Alarm status broadcast interval overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_audio_volume_override(self, volume: int = 5, reset: int = NO_RESET): """ Constructs and sends the alarm audio volume override command Constraints: Must be logged into HD. Given volume must be an integer between 1 and 5. @param volume: integer - alarm audio volume level (1..5) @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) vol = integer_to_bytearray(volume) payload = rst + vol message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_AUDIO_VOLUME_LEVEL_OVERRIDE.value, payload=payload) self.logger.debug("override alarm audio volume level") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(volume) + ": " self.logger.debug("Alarm audio volume level overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_audio_current_hg_override(self, current: float = 0.0, reset: int = NO_RESET): """ Constructs and sends the alarm audio current (high gain) override command Constraints: Must be logged into HD. @param current: float - current (in mA) for high gain alarm audio @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) cur = float_to_bytearray(current) payload = rst + cur message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_AUDIO_CURRENT_HG_OVERRIDE.value, payload=payload) self.logger.debug("override alarm audio high gain current") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(current) + " mA: " self.logger.debug("Alarm audio high gain current overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_audio_current_lg_override(self, current: float = 0.0, reset: int = NO_RESET): """ Constructs and sends the alarm audio current (low gain) override command Constraints: Must be logged into HD. @param current: float - current (in mA) for low gain alarm audio @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) cur = float_to_bytearray(current) payload = rst + cur message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_AUDIO_CURRENT_LG_OVERRIDE.value, payload=payload) self.logger.debug("override alarm audio high gain current") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(current) + " mA: " self.logger.debug("Alarm audio low gain current overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_backup_audio_current_override(self, current: float = 0.0, reset: int = NO_RESET): """ Constructs and sends the backup alarm audio current override command Constraints: Must be logged into HD. @param current: float - current (in mA) for backup alarm audio @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) cur = float_to_bytearray(current) payload = rst + cur message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_BACKUP_AUDIO_CURRENT_OVERRIDE.value, payload=payload) self.logger.debug("override alarm backup audio current") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(current) + " mA: " self.logger.debug("Alarm backup audio current overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False