########################################################################### # # Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file alarms.py # # @author (last) Peter Lucia # @date (last) 20-Jul-2020 # @author (original) Peter Lucia # @date (original) 02-Apr-2020 # ############################################################################ from ..protocols.CAN import (DenaliMessage, DenaliChannels) from ..utils.conversions import integer_to_bytearray from ..utils.base import _AbstractSubSystem, _publish from .constants import RESET, NO_RESET from collections import OrderedDict import struct from logging import Logger from ..common.alarm_defs import AlarmList class HDAlarms(_AbstractSubSystem): """ HD interface containing alarm related commands. """ # alarms message IDs MSG_ID_HD_ALARMS_PUBLISHED_STATUS = 0x0002 MSG_ID_HD_ALARM_ACTIVATE = 0x0003 MSG_ID_HD_ALARM_CLEAR = 0x0004 MSG_ID_HD_ALARM_LAMP_OVERRIDE = 0x8004 MSG_ID_HD_ALARM_STATE_OVERRIDE = 0x8006 MSG_ID_HD_ALARM_TIME_OVERRIDE = 0x8007 # Alarm lamp patterns HD_ALARM_LAMP_PATTERN_OFF = 0 HD_ALARM_LAMP_PATTERN_OK = 1 HD_ALARM_LAMP_PATTERN_FAULT = 2 HD_ALARM_LAMP_PATTERN_HIGH = 3 HD_ALARM_LAMP_PATTERN_MEDIUM = 4 HD_ALARM_LAMP_PATTERN_LOW = 5 HD_ALARM_LAMP_PATTERN_MANUAL = 6 # Alarm states HD_ALARM_STATE_NONE = 0 HD_ALARM_STATE_LOW = 1 HD_ALARM_STATE_MEDIUM = 2 HD_ALARM_STATE_HIGH = 3 # Alarm status message field positions START_POS_ALARM_STATE = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_STATE = START_POS_ALARM_STATE + 4 START_POS_ALARM_TOP = END_POS_ALARM_STATE END_POS_ALARM_TOP = START_POS_ALARM_TOP + 4 START_POS_ALARM_SILENCE_EXPIRES_IN = END_POS_ALARM_TOP END_POS_ALARM_SILENCE_EXPIRES_IN = START_POS_ALARM_SILENCE_EXPIRES_IN + 4 START_POS_ALARM_ESCALATES_IN = END_POS_ALARM_SILENCE_EXPIRES_IN END_POS_ALARM_ESCALATES_IN = START_POS_ALARM_ESCALATES_IN + 4 START_POS_ALARMS_FLAGS = END_POS_ALARM_ESCALATES_IN END_POS_ALARMS_FLAGS = START_POS_ALARMS_FLAGS + 2 START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX END_POS_ALARM_ID = START_POS_ALARM_ID + 2 # See AlarmDefs.h ALARM_ID_NO_ALARM = 0 ALARM_ID_SOFTWARE_FAULT = 1 ALARM_ID_STUCK_BUTTON_TEST_FAILED = 2 ALARM_ID_FPGA_POST_TEST_FAILED = 3 ALARM_ID_WATCHDOG_POST_TEST_FAILED = 4 ALARM_ID_UI_COMM_POST_FAILED = 5 ALARM_ID_BLOOD_PUMP_MC_CURRENT_CHECK = 6 ALARM_ID_BLOOD_PUMP_OFF_CHECK = 7 ALARM_ID_BLOOD_PUMP_MC_DIRECTION_CHECK = 8 ALARM_ID_BLOOD_PUMP_ROTOR_SPEED_CHECK = 9 ALARM_ID_DIAL_IN_PUMP_MC_CURRENT_CHECK = 10 ALARM_ID_DIAL_IN_PUMP_OFF_CHECK = 11 ALARM_ID_DIAL_IN_PUMP_MC_DIRECTION_CHECK = 12 ALARM_ID_DIAL_IN_PUMP_ROTOR_SPEED_CHECK = 13 ALARM_ID_DIAL_OUT_PUMP_MC_CURRENT_CHECK = 14 ALARM_ID_DIAL_OUT_PUMP_OFF_CHECK = 15 ALARM_ID_DIAL_OUT_PUMP_MC_DIRECTION_CHECK = 16 ALARM_ID_DIAL_OUT_PUMP_ROTOR_SPEED_CHECK = 17 ALARM_ID_WATCHDOG_EXPIRED = 18 ALARM_ID_RTC_COMM_ERROR = 19 ALARM_ID_RTC_CONFIG_ERROR = 20 ALARM_ID_DG_COMM_TIMEOUT = 21 ALARM_ID_UI_COMM_TIMEOUT = 22 ALARM_ID_COMM_TOO_MANY_BAD_CRCS = 23 ALARM_ID_TREATMENT_STOPPED_BY_USER = 24 ALARM_ID_BLOOD_SITTING_WARNING = 25 ALARM_ID_BLOOD_SITTING_TOO_LONG_NO_RESUME = 26 ALARM_ID_BLOOD_SITTING_TOO_LONG_NO_RINSEBACK = 27 ALARM_ID_CAN_MESSAGE_NOT_ACKED = 28 ALARM_ID_OCCLUSION_BLOOD_PUMP = 29 ALARM_ID_OCCLUSION_DIAL_IN_PUMP = 30 ALARM_ID_OCCLUSION_DIAL_OUT_PUMP = 31 ALARM_ID_ARTERIAL_PRESSURE_LOW = 32 ALARM_ID_ARTERIAL_PRESSURE_HIGH = 33 ALARM_ID_VENOUS_PRESSURE_LOW = 34 ALARM_ID_VENOUS_PRESSURE_HIGH = 35 ALARM_ID_UF_RATE_TOO_HIGH_ERROR = 36 ALARM_ID_UF_VOLUME_ACCURACY_ERROR = 37 ALARM_ID_RTC_BATTERY_LOW = 38 ALARM_ID_RTC_OR_TIMER_ACCURACY_FAILURE = 39 ALARM_ID_RTC_RAM_OPS_ERROR = 40 ALARM_ID_NVDATA_EEPROM_OPS_FAILURE = 41 ALARM_ID_NVDATA_MFG_RECORD_CRC_ERROR = 42 ALARM_ID_NVDATA_SRVC_RECORD_CRC_ERROR = 43 ALARM_ID_NVDATA_CAL_RECORD_CRC_ERROR = 44 ALARM_ID_NVDATA_HW_USAGE_DATA_CRC_ERROR = 45 AlARM_ID_NVDATA_DISINFECTION_DATE_CRC_ERROR = 46 ALARM_ID_RO_PUMP_OUT_PRESSURE_OUT_OF_RANGE = 47 ALARM_ID_TEMPERATURE_SENSORS_OUT_OF_RANGE = 48 ALARM_ID_TEMPERATURE_SENSORS_INCONSISTENT = 49 ALARM_ID_HD_COMM_TIMEOUT = 50 ALARM_ID_VALVE_CONTROL_FAILURE = 51 ALARM_ID_BLOOD_PUMP_FLOW_VS_MOTOR_SPEED_CHECK = 52 ALARM_ID_DIAL_IN_PUMP_FLOW_VS_MOTOR_SPEED_CHECK = 53 ALARM_ID_DIAL_OUT_PUMP_FLOW_VS_MOTOR_SPEED_CHECK = 54 ALARM_ID_BLOOD_PUMP_MOTOR_SPEED_CHECK = 55 ALARM_ID_DIAL_IN_PUMP_MOTOR_SPEED_CHECK = 56 ALARM_ID_DIAL_OUT_PUMP_MOTOR_SPEED_CHECK = 57 ALARM_ID_BLOOD_PUMP_ROTOR_SPEED_TOO_HIGH = 58 ALARM_ID_INLET_WATER_TEMPERATURE_OUT_OF_RANGE = 59 def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = self.MSG_ID_HD_ALARMS_PUBLISHED_STATUS self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarms_status_sync) channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = self.MSG_ID_HD_ALARM_ACTIVATE self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_activate) channel_id = DenaliChannels.hd_alarm_broadcast_ch_id msg_id = self.MSG_ID_HD_ALARM_CLEAR self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_alarm_clear) # composite alarm status based on latest HD alarm status broadcast message self.alarms_state = 0 self.alarm_top = 0 self.alarms_silence_expires_in = 0 self.alarms_escalates_in = 0 self.alarms_flags = 0 # alarm states based on received HD alarm activation and alarm clear messages self.alarm_states = [False] * 500 self.ids = OrderedDict() for attr in dir(AlarmList): if not callable(getattr(AlarmList, attr)) and attr.startswith("ALARM_ID"): self.ids[attr] = getattr(AlarmList, attr) self.ids = OrderedDict(sorted(self.ids.items(), key=lambda key: key[1].value)) def get_alarm_states(self): """ Gets all states for all alarms @return: List of booleans of size 500 """ return self.alarm_states def get_alarms_top(self): """ Gets the top alarm @return: (int) the top alarm """ return self.alarm_top def get_alarms_silence_expires_in(self): """ Gets the remaining time the alarms will be silenced (s) @return: (int) how long until the alarm silence expires """ return self.alarms_silence_expires_in def get_alarms_escalates_in(self): """ Gets the alarms escalates in time (s) @return: (int) how long until the alarm escalates """ return self.alarms_escalates_in def get_alarms_flags(self): """ Gets the alarms flags Extract each flag from the flags int using bit-masking. E.g. System Fault = result & 1 Stop = result & 2 No Clear = result & 4 No Resume = result & 8 No Rinseback = result & 16 No End Treatment = result & 32 No New Treatment = result & 64 Bypass Dialyzer = result & 128 Alarms to Escalate = result & 256 Alarms Silenced = result & 512 TBD = result & 1024 TBD = result & 2048 TBD = result & 4096 TBD = result & 8192 TBD = result & 16384 TBD = result & 32768 TBD = result & 65536 @return: (int) The alarms flags value """ return self.alarms_flags def get_alarm_ids(self): """ Returns a dictionary of the alarm short name and the corresponding id @return: OrderedDict of the alarm ids """ return self.ids @_publish(["alarms_state", "alarm_top", "alarms_silence_expires_in", "alarms_escalates_in", "alarms_flags"]) def _handler_alarms_status_sync(self, message): """ Handles published alarms status messages. alarms status data are captured for reference. @param message: published blood flow data message @return: none """ self.alarms_state = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_STATE:self.END_POS_ALARM_STATE]), byteorder=DenaliMessage.BYTE_ORDER) self.alarm_top = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_TOP:self.END_POS_ALARM_TOP]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_silence_expires_in = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_SILENCE_EXPIRES_IN:self.END_POS_ALARM_SILENCE_EXPIRES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_escalates_in = int.from_bytes(bytearray( message['message'][self.START_POS_ALARM_ESCALATES_IN:self.END_POS_ALARM_ESCALATES_IN]), byteorder=DenaliMessage.BYTE_ORDER) self.alarms_flags = int.from_bytes(bytearray( message['message'][self.START_POS_ALARMS_FLAGS:self.END_POS_ALARMS_FLAGS]), byteorder=DenaliMessage.BYTE_ORDER) @_publish(["alarm_states"]) def _handler_alarm_activate(self, message): """ Handles published HD alarm activation messages. @param message: published HD alarm activation message @return: none """ alarm_id = struct.unpack('