########################################################################### # # Copyright (c) 2019-2021 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file alarms.py # # @author (last) Sean Nash # @date (last) 12-Nov-2021 # @author (original) Peter Lucia # @date (original) 02-Apr-2020 # ############################################################################ import struct from logging import Logger from .constants import RESET, NO_RESET from ..common.msg_defs import MsgIds, MsgFieldPositions from ..protocols.CAN import DenaliMessage, DenaliChannels from ..utils.base import AbstractSubSystem, publish from ..utils.checks import check_broadcast_interval_override_ms from ..utils.conversions import integer_to_bytearray, float_to_bytearray class HDAlarms(AbstractSubSystem): """ HD interface containing alarm related commands. """ # Alarm lamp patterns HD_ALARM_LAMP_PATTERN_OFF = 0 HD_ALARM_LAMP_PATTERN_OK = 1 HD_ALARM_LAMP_PATTERN_FAULT = 2 HD_ALARM_LAMP_PATTERN_HIGH = 3 HD_ALARM_LAMP_PATTERN_MEDIUM = 4 HD_ALARM_LAMP_PATTERN_LOW = 5 HD_ALARM_LAMP_PATTERN_MANUAL = 6 # Alarm states HD_ALARM_STATE_NONE = 0 HD_ALARM_STATE_LOW = 1 HD_ALARM_STATE_MEDIUM = 2 HD_ALARM_STATE_HIGH = 3 # Alarm status message field positions START_POS_ALARM_STATE = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_STATE = START_POS_ALARM_STATE + 4 START_POS_ALARM_TOP = END_POS_ALARM_STATE END_POS_ALARM_TOP = START_POS_ALARM_TOP + 4 START_POS_ALARM_ESCALATES_IN = END_POS_ALARM_TOP END_POS_ALARM_ESCALATES_IN = START_POS_ALARM_ESCALATES_IN + 4 START_POS_ALARM_SILENCE_EXPIRES_IN = END_POS_ALARM_ESCALATES_IN END_POS_ALARM_SILENCE_EXPIRES_IN = START_POS_ALARM_SILENCE_EXPIRES_IN + 4 START_POS_ALARMS_FLAGS = END_POS_ALARM_SILENCE_EXPIRES_IN END_POS_ALARMS_FLAGS = START_POS_ALARMS_FLAGS + 2 START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_ID = START_POS_ALARM_ID + 2 def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = MsgIds.MSG_ID_ALARM_STATUS.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarms_status_sync) channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = MsgIds.MSG_ID_ALARM_TRIGGERED.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_activate) msg_id = MsgIds.MSG_ID_ALARM_CLEARED.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_clear) msg_id = MsgIds.MSG_ID_ALARM_CONDITION_CLEARED.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_condition_clear) channel_id = DenaliChannels.hd_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_HD_ALARM_INFORMATION.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_information_sync) # composite alarm status based on latest HD alarm status broadcast message self.alarms_state = 0 self.alarm_top = 0 self.alarms_silence_expires_in = 0 self.alarms_escalates_in = 0 self.alarms_flags = 0 # alarm states based on received HD alarm activation and alarm clear messages self.alarm_states = [False] * 500 # alarm condition states based on received HD alarm activation and clear condition messages self.alarm_conditions = [False] * 500 # alarm information self.alarm_volume = 0 self.alarm_audio_curr_hg = 0.0 self.alarm_audio_curr_lg = 0.0 self.alarm_backup_audio_curr = 0.0 self.safety_shutdown_active = False def get_current_alarms_state(self): """ Gets the current alarms state. None, Low, Medium, or High @return: (str) the current alarms state in text form. """ result = "" if self.alarms_state == self.HD_ALARM_STATE_NONE: result = "None" elif self.alarms_state == self.HD_ALARM_STATE_LOW: result = "Low" elif self.alarms_state == self.HD_ALARM_STATE_MEDIUM: result = "Medium" elif self.alarms_state == self.HD_ALARM_STATE_HIGH: result = "High" return result def get_alarm_states(self): """ Gets all states for all alarms @return: List of booleans of size 500 """ return self.alarm_states def get_alarm_conditions(self): """ Gets all alarm condition states for all alarms @return: List of booleans of size 500 """ return self.alarm_conditions def get_alarm_state(self, alarm_id): """ Gets alarm state for given alarm @return: Alarm state """ return self.alarm_states[alarm_id] def get_alarms_top(self): """ Gets the top alarm @return: (int) the top alarm """ return self.alarm_top def get_alarms_silence_expires_in(self): """ Gets the remaining time the alarms will be silenced (s) @return: (int) how long until the alarm silence expires """ return self.alarms_silence_expires_in def get_alarms_escalates_in(self): """ Gets the alarms escalates in time (s) @return: (int) how long until the alarm escalates """ return self.alarms_escalates_in def get_alarms_flags(self): # TODO - update flags to latest """ Gets the alarms flags Extract each flag from the flags int using bit-masking. E.g. System Fault = result & 1 Stop = result & 2 No Clear = result & 4 No Resume = result & 8 No Rinseback = result & 16 No End Treatment = result & 32 No New Treatment = result & 64 User Must ACK = result & 128 Alarms to Escalate = result & 256 Alarms Silenced = result & 512 Alarm Lamp On = result & 1024 TBD = result & 2048 TBD = result & 4096 TBD = result & 8192 No Minimize = result & 16384 Condition Detected = result & 32768 @return: (int) The alarms flags value """ return self.alarms_flags def get_alarm_volume(self): """ Gets the alarm audio volume level. @return: (int) current alarm audio volume (1..5) """ return self.alarm_volume def get_alarm_audio_current_hg(self): """ Gets the alarm audio current - high gain. @return: (float) alarm audio current - high gain (in mA) """ return self.alarm_audio_curr_hg def get_alarm_audio_current_lg(self): """ Gets the alarm audio current - low gain. @return: (float) alarm audio current - low gain (in mA) """ return self.alarm_audio_curr_lg def get_alarm_backup_audio_current(self): """ Gets the alarm backup audio current. @return: (float) alarm backup audio current (in mA) """ return self.alarm_backup_audio_curr def get_safety_shutdown_activated(self): """ Gets the state of the HD safety shutdown signal. @return: (bool) safety shutdown line is activated (T/F) """ return self.safety_shutdown_active def get_alarm_flag_system_fault(self) -> bool: """ Gets the alarm flag system fault. @return: (bool) Alarm flag system fault (T/F) """ return (self.alarms_flags & 1) > 0 def get_alarm_flag_stop(self) -> bool: """ Gets the alarm flag no clear. @return: (bool) Alarm flag no clear (T/F) """ return (self.alarms_flags & 2) > 0 def get_alarm_flag_no_clear(self) -> bool: """ Gets the alarm flag no clear. @return: (bool) Alarm flag no clear (T/F) """ return (self.alarms_flags & 4) > 0 def get_alarm_flag_no_resume(self) -> bool: """ Gets the alarm flag no resume. @return: (bool) Alarm flag no resume (T/F) """ return (self.alarms_flags & 8) > 0 def get_alarm_flag_no_rinseback(self) -> bool: """ Gets the alarm flag no rinseback. @return: (bool) Alarm flag no rinseback (T/F) """ return (self.alarms_flags & 16) > 0 def get_alarm_flag_no_end_treatment(self) -> bool: """ Gets the alarm flag no end treatment. @return: (bool) Alarm flag no end treatment (T/F) """ return (self.alarms_flags & 32) > 0 def get_alarm_flag_no_new_treatment(self) -> bool: """ Gets the alarm flag no new treatment. @return: (bool) Alarm flag no new treatment (T/F) """ return (self.alarms_flags & 64) > 0 def get_alarm_flag_lamp_on(self) -> bool: """ Gets the alarm flag lamp on. @return: (bool) Alarm lamp on (T/F) """ return (self.alarms_flags & 1024) > 0 def get_alarm_flag_no_minimize(self) -> bool: """ Gets the alarm flag no minimize. @return: (bool) Alarm cannot be minimized (T/F) """ return (self.alarms_flags & 16384) > 0 @publish(["alarms_state", "alarm_top", "alarms_silence_expires_in", "alarms_escalates_in", "alarms_flags"]) def _handler_alarms_status_sync(self, message): """ Handles published alarms status messages. alarms status data are captured for reference. @param message: published blood flow data message @return: none """ self.alarms_state = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_STATE:self.END_POS_ALARM_STATE]), byteorder=DenaliMessage.BYTE_ORDER) self.alarm_top = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_TOP:self.END_POS_ALARM_TOP]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_escalates_in = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_ESCALATES_IN:self.END_POS_ALARM_ESCALATES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_silence_expires_in = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_SILENCE_EXPIRES_IN:self.END_POS_ALARM_SILENCE_EXPIRES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_flags = int.from_bytes(bytearray( message['message'][self.START_POS_ALARMS_FLAGS:self.END_POS_ALARMS_FLAGS]), byteorder=DenaliMessage.BYTE_ORDER) # TODO this clears the alarm state even if the state is not cleared yet. # TODO investigate this code # if no active alarms from HD, set all alarms (on Dialin side) to False in case we got out of sync #if self.alarm_top == 0: # for x in range(500): # self.alarm_states[x] = False @publish(["alarm_states", "alarm_conditions"]) def _handler_alarm_activate(self, message): """ Handles published HD alarm activation messages. @param message: published HD alarm activation message @return: none """ self.logger.debug("Alarm activated!") alarm_id = struct.unpack(' int: """ Constructs and sends the clear all active alarms command. This will clear even non-recoverable alarms. Constraints: Must be logged into HD. @return: 1 if successful, zero otherwise """ key = integer_to_bytearray(-758926171) # 0xD2C3B4A5 payload = key message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_SUPER_CLEAR_ALARMS_CMD.value, payload=payload) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: self.logger.debug("All alarms cleared.") # response payload is OK or not OK return 1 == received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] return False def cmd_alarm_state_override(self, alarm: int, state: int, reset: int = NO_RESET) -> int: """ Constructs and sends the alarm state override command Constraints: Must be logged into HD. Given alarm must be valid. If inactivating alarm, given alarm must be recoverable (clearable). @param alarm: integer - ID of alarm to override @param state: integer - 1 for alarm active, 0 for alarm inactive @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) sta = integer_to_bytearray(state) alm = integer_to_bytearray(alarm) payload = rst + sta + alm message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_ALARM_STATE_OVERRIDE.value, payload=payload) # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # self.logger.debug(received_message) if reset == RESET: str_res = "reset back to normal" else: str_res = ("active" if state != 0 else "inactive") self.logger.debug("Alarm " + str(alarm) + " " + str_res + ": " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_time_override(self, alarm: int, time_ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the alarm time override command Constraints: Must be logged into HD. Given alarm must be valid. @param alarm: integer - ID of alarm to override @param time_ms: integer - time (in ms) since alarm was activated @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) ms = integer_to_bytearray(time_ms) alm = integer_to_bytearray(alarm) payload = rst + ms + alm message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_ALARM_TIME_OVERRIDE.value, payload=payload) self.logger.debug("override alarm time since activated") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # self.logger.debug(received_message) if reset == RESET: str_res = "reset back to normal" else: str_res = str(time_ms) self.logger.debug("Alarm time since activated overridden to " + str_res + " ms: " + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_lamp_pattern_override(self, pattern: int, reset: int = NO_RESET) -> int: """ Constructs and sends the alarm lamp pattern override command. Constraints: Must be logged into HD. Given pattern must be one of the patterns listed below. @param pattern: integer - ID of alarm lamp pattern to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise Patterns: \n 0 = off \n 1 = ok \n 2 = fault \n 3 = high \n 4 = medium \n 5 = low \n 6 = manual """ rst = integer_to_bytearray(reset) pat = integer_to_bytearray(pattern) payload = rst + pat message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_ALARM_LAMP_PATTERN_OVERRIDE.value, payload=payload) self.logger.debug("Override Alarm Lamp Pattern") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: self.logger.debug(received_message) if reset == RESET: str_pat = "reset back to normal" elif pattern == self.HD_ALARM_LAMP_PATTERN_OFF: str_pat = "off" elif pattern == self.HD_ALARM_LAMP_PATTERN_OK: str_pat = "ok" elif pattern == self.HD_ALARM_LAMP_PATTERN_FAULT: str_pat = "fault" elif pattern == self.HD_ALARM_LAMP_PATTERN_HIGH: str_pat = "high" elif pattern == self.HD_ALARM_LAMP_PATTERN_MEDIUM: str_pat = "medium" elif pattern == self.HD_ALARM_LAMP_PATTERN_LOW: str_pat = "low" else: str_pat = "manual" self.logger.debug("Alarm lamp pattern overridden to " + str_pat + ":" + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_info_broadcast_interval_override(self, ms: int = 1000, reset: int = NO_RESET): """ Constructs and sends the alarm information broadcast interval override command Constraints: Must be logged into HD. Given interval must be non-zero and a multiple of the HD general task interval (50 ms). @param ms: integer - interval (in ms) to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ if not check_broadcast_interval_override_ms(ms): return False rst = integer_to_bytearray(reset) mis = integer_to_bytearray(ms) payload = rst + mis message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_INFO_SEND_INTERVAL_OVERRIDE.value, payload=payload) self.logger.debug("override alarm information broadcast interval") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(ms) + " ms: " self.logger.debug("Alarm information broadcast interval overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_status_broadcast_interval_override(self, ms: int = 250, reset: int = NO_RESET): """ Constructs and sends the alarm status broadcast interval override command Constraints: Must be logged into HD. Given interval must be non-zero and a multiple of the HD general task interval (50 ms). @param ms: integer - interval (in ms) to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ if not check_broadcast_interval_override_ms(ms): return False rst = integer_to_bytearray(reset) mis = integer_to_bytearray(ms) payload = rst + mis message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_STATUS_PUBLISH_INTERVAL_OVERRIDE.value, payload=payload) self.logger.debug("override alarm status broadcast interval") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(ms) + " ms: " self.logger.debug("Alarm status broadcast interval overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_audio_volume_override(self, volume: int = 5, reset: int = NO_RESET): """ Constructs and sends the alarm audio volume override command Constraints: Must be logged into HD. Given volume must be an integer between 1 and 5. @param volume: integer - alarm audio volume level (1..5) @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) vol = integer_to_bytearray(volume) payload = rst + vol message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_AUDIO_VOLUME_LEVEL_OVERRIDE.value, payload=payload) self.logger.debug("override alarm audio volume level") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(volume) + ": " self.logger.debug("Alarm audio volume level overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_audio_current_hg_override(self, current: float = 0.0, reset: int = NO_RESET): """ Constructs and sends the alarm audio current (high gain) override command Constraints: Must be logged into HD. @param current: float - current (in mA) for high gain alarm audio @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) cur = float_to_bytearray(current) payload = rst + cur message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_AUDIO_CURRENT_HG_OVERRIDE.value, payload=payload) self.logger.debug("override alarm audio high gain current") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(current) + " mA: " self.logger.debug("Alarm audio high gain current overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_audio_current_lg_override(self, current: float = 0.0, reset: int = NO_RESET): """ Constructs and sends the alarm audio current (low gain) override command Constraints: Must be logged into HD. @param current: float - current (in mA) for low gain alarm audio @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) cur = float_to_bytearray(current) payload = rst + cur message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_AUDIO_CURRENT_LG_OVERRIDE.value, payload=payload) self.logger.debug("override alarm audio high gain current") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(current) + " mA: " self.logger.debug("Alarm audio low gain current overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_alarm_backup_audio_current_override(self, current: float = 0.0, reset: int = NO_RESET): """ Constructs and sends the backup alarm audio current override command Constraints: Must be logged into HD. @param current: float - current (in mA) for backup alarm audio @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) cur = float_to_bytearray(current) payload = rst + cur message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_ALARM_BACKUP_AUDIO_CURRENT_OVERRIDE.value, payload=payload) self.logger.debug("override alarm backup audio current") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(current) + " mA: " self.logger.debug("Alarm backup audio current overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False