########################################################################### # # Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file alarms.py # # @author (last) Peter Lucia # @date (last) 10-Nov-2020 # @author (original) Peter Lucia # @date (original) 02-Apr-2020 # ############################################################################ import struct from logging import Logger from .constants import RESET, NO_RESET from ..common.msg_defs import MsgIds from ..protocols.CAN import (DenaliMessage, DenaliChannels) from ..utils.base import _AbstractSubSystem, _publish from ..utils.conversions import integer_to_bytearray class HDAlarms(_AbstractSubSystem): """ HD interface containing alarm related commands. """ # Alarm lamp patterns HD_ALARM_LAMP_PATTERN_OFF = 0 HD_ALARM_LAMP_PATTERN_OK = 1 HD_ALARM_LAMP_PATTERN_FAULT = 2 HD_ALARM_LAMP_PATTERN_HIGH = 3 HD_ALARM_LAMP_PATTERN_MEDIUM = 4 HD_ALARM_LAMP_PATTERN_LOW = 5 HD_ALARM_LAMP_PATTERN_MANUAL = 6 # Alarm states HD_ALARM_STATE_NONE = 0 HD_ALARM_STATE_LOW = 1 HD_ALARM_STATE_MEDIUM = 2 HD_ALARM_STATE_HIGH = 3 # Alarm status message field positions START_POS_ALARM_STATE = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_STATE = START_POS_ALARM_STATE + 4 START_POS_ALARM_TOP = END_POS_ALARM_STATE END_POS_ALARM_TOP = START_POS_ALARM_TOP + 4 START_POS_ALARM_SILENCE_EXPIRES_IN = END_POS_ALARM_TOP END_POS_ALARM_SILENCE_EXPIRES_IN = START_POS_ALARM_SILENCE_EXPIRES_IN + 4 START_POS_ALARM_ESCALATES_IN = END_POS_ALARM_SILENCE_EXPIRES_IN END_POS_ALARM_ESCALATES_IN = START_POS_ALARM_ESCALATES_IN + 4 START_POS_ALARMS_FLAGS = END_POS_ALARM_ESCALATES_IN END_POS_ALARMS_FLAGS = START_POS_ALARMS_FLAGS + 2 START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_ID = START_POS_ALARM_ID + 2 def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = MsgIds.MSG_ID_ALARM_STATUS.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarms_status_sync) channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = MsgIds.MSG_ID_ALARM_TRIGGERED.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_activate) channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = MsgIds.MSG_ID_ALARM_CLEARED.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_clear) # composite alarm status based on latest HD alarm status broadcast message self.alarms_state = 0 self.alarm_top = 0 self.alarms_silence_expires_in = 0 self.alarms_escalates_in = 0 self.alarms_flags = 0 # alarm states based on received HD alarm activation and alarm clear messages self.alarm_states = [False] * 500 def get_alarm_states(self): """ Gets all states for all alarms @return: List of booleans of size 500 """ return self.alarm_states def get_alarms_top(self): """ Gets the top alarm @return: (int) the top alarm """ return self.alarm_top def get_alarms_silence_expires_in(self): """ Gets the remaining time the alarms will be silenced (s) @return: (int) how long until the alarm silence expires """ return self.alarms_silence_expires_in def get_alarms_escalates_in(self): """ Gets the alarms escalates in time (s) @return: (int) how long until the alarm escalates """ return self.alarms_escalates_in def get_alarms_flags(self): """ Gets the alarms flags Extract each flag from the flags int using bit-masking. E.g. System Fault = result & 1 Stop = result & 2 No Clear = result & 4 No Resume = result & 8 No Rinseback = result & 16 No End Treatment = result & 32 No New Treatment = result & 64 Bypass Dialyzer = result & 128 Alarms to Escalate = result & 256 Alarms Silenced = result & 512 TBD = result & 1024 TBD = result & 2048 TBD = result & 4096 TBD = result & 8192 TBD = result & 16384 TBD = result & 32768 TBD = result & 65536 @return: (int) The alarms flags value """ return self.alarms_flags def get_alarm_ids(self): """ Returns a dictionary of the alarm short name and the corresponding id @return: OrderedDict of the alarm ids """ return self.ids @_publish(["alarms_state", "alarm_top", "alarms_silence_expires_in", "alarms_escalates_in", "alarms_flags"]) def _handler_alarms_status_sync(self, message): """ Handles published alarms status messages. alarms status data are captured for reference. @param message: published blood flow data message @return: none """ self.alarms_state = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_STATE:self.END_POS_ALARM_STATE]), byteorder=DenaliMessage.BYTE_ORDER) self.alarm_top = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_TOP:self.END_POS_ALARM_TOP]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_silence_expires_in = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_SILENCE_EXPIRES_IN:self.END_POS_ALARM_SILENCE_EXPIRES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_escalates_in = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_ESCALATES_IN:self.END_POS_ALARM_ESCALATES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_flags = int.from_bytes(bytearray( message['message'][self.START_POS_ALARMS_FLAGS:self.END_POS_ALARMS_FLAGS]), byteorder=DenaliMessage.BYTE_ORDER) # if no active alarms from HD, set all alarms (on Dialin side) to False in case we got out of sync if self.alarm_top == 0: for x in range(500): self.alarm_states[x] = False @_publish(["alarm_states"]) def _handler_alarm_activate(self, message): """ Handles published HD alarm activation messages. @param message: published HD alarm activation message @return: none """ self.logger.debug("Alarm activated!") alarm_id = struct.unpack('