###########################################################################
#
#    Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   hd_simulator_alarms.py
#
#    @author (last)      Micahel Garthwaite
#    @date   (last)      17-Aug-2023
#    @author (original)  Peter Lucia
#    @date   (original)  28-Sep-2020
#
############################################################################
import struct
from time import sleep, time

from ..utils import YES, NO
from ..protocols.CAN import DenaliMessage, DenaliCanMessenger, DenaliChannels
from logging import Logger
from ..utils.base import AbstractSubSystem
from ..utils.conversions import *
from ..common.msg_defs import MsgIds, MsgFieldPositions
from ..common.alarm_defs import AlarmList
from dialin.common.prs_defs import AlarmDataTypes

# Alarm priorities
HIGH = 3
MED = 2
LOW = 1
NONE = 0

HIGH_PRIORITY_COLOR = "#c53b33"
MED_LOW_PRIORITY_COLOR = "#f5a623"

# Bit 9 of the alarm flags word: "alarms silenced" (see cmd_make_alarm_flags).
_ALARMS_SILENCED_MASK = 1 << 9


class Alarms:
    """
    Table of alarm definitions used by the simulator.
    """
    # TODO: this should be generated from FW
    # ALARM_ID = (priority, alarmID, escalates in, silence_expires_in, flags)
    ALARM_ID_NO_ALARM = (NONE, 0, 0, 0, 0)
    ALARM_ID_SOFTWARE_FAULT = (HIGH, 1, 0, 0, 0)
    ALARM_ID_STUCK_BUTTON_TEST_FAILED = (HIGH, 2, 0, 0, 0)
    ALARM_ID_FPGA_POST_TEST_FAILED = (HIGH, 3, 0, 0, 0)
    ALARM_ID_WATCHDOG_POST_TEST_FAILED = (HIGH, 4, 0, 0, 0)
    ALARM_ID_UI_COMM_POST_FAILED = (HIGH, 5, 0, 0, 0)
    ALARM_ID_BLOOD_PUMP_MC_CURRENT_CHECK = (MED, 6, 0, 0, 0)
    ALARM_ID_BLOOD_PUMP_OFF_CHECK = (MED, 7, 0, 0, 0)
    ALARM_ID_BLOOD_PUMP_MC_DIRECTION_CHECK = (MED, 8, 0, 0, 0)
    ALARM_ID_BLOOD_PUMP_ROTOR_SPEED_CHECK = (HIGH, 9, 0, 0, 0)
    ALARM_ID_DIAL_IN_PUMP_MC_CURRENT_CHECK = (MED, 10, 0, 0, 0)
    ALARM_ID_DIAL_IN_PUMP_OFF_CHECK = (MED, 11, 0, 0, 0)
    ALARM_ID_DIAL_IN_PUMP_MC_DIRECTION_CHECK = (MED, 12, 0, 0, 0)
    ALARM_ID_DIAL_IN_PUMP_ROTOR_SPEED_CHECK = (HIGH, 13, 0, 0, 0)
    ALARM_ID_DIAL_OUT_PUMP_MC_CURRENT_CHECK = (MED, 14, 0, 0, 0)
    ALARM_ID_DIAL_OUT_PUMP_OFF_CHECK = (MED, 15, 0, 0, 0)
    ALARM_ID_DIAL_OUT_PUMP_MC_DIRECTION_CHECK = (MED, 16, 0, 0, 0)
    ALARM_ID_DIAL_OUT_PUMP_ROTOR_SPEED_CHECK = (HIGH, 17, 0, 0, 0)
    ALARM_ID_WATCHDOG_EXPIRED = (HIGH, 18, 0, 0, 0)
    ALARM_ID_RTC_COMM_ERROR = (HIGH, 19, 0, 0, 0)
    ALARM_ID_RTC_CONFIG_ERROR = (HIGH, 20, 0, 0, 0)
    ALARM_ID_DG_COMM_TIMEOUT = (HIGH, 21, 0, 0, 0)
    ALARM_ID_UI_COMM_TIMEOUT = (HIGH, 22, 0, 0, 0)
    ALARM_ID_COMM_TOO_MANY_BAD_CRCS = (HIGH, 23, 0, 0, 0)
    ALARM_ID_TREATMENT_STOPPED_BY_USER = (LOW, 24, 0, 0, 0)
    ALARM_ID_BLOOD_SITTING_WARNING = (MED, 25, 0, 0, 0)
    ALARM_ID_BLOOD_SITTING_TOO_LONG_NO_RESUME = (MED, 26, 0, 0, 0)
    ALARM_ID_BLOOD_SITTING_TOO_LONG_NO_RINSEBACK = (HIGH, 27, 0, 0, 0)
    ALARM_ID_CAN_MESSAGE_NOT_ACKED = (HIGH, 28, 0, 0, 0)
    ALARM_ID_OCCLUSION_BLOOD_PUMP = (HIGH, 29, 0, 0, 0)
    ALARM_ID_OCCLUSION_DIAL_IN_PUMP = (HIGH, 30, 0, 0, 0)
    ALARM_ID_OCCLUSION_DIAL_OUT_PUMP = (HIGH, 31, 0, 0, 0)
    ALARM_ID_ARTERIAL_PRESSURE_LOW = (HIGH, 32, 0, 0, 0)
    ALARM_ID_ARTERIAL_PRESSURE_HIGH = (HIGH, 33, 0, 0, 0)
    ALARM_ID_VENOUS_PRESSURE_LOW = (HIGH, 34, 0, 0, 0)
    ALARM_ID_VENOUS_PRESSURE_HIGH = (HIGH, 35, 0, 0, 0)
    ALARM_ID_UF_RATE_TOO_HIGH_ERROR = (HIGH, 36, 0, 0, 0)
    ALARM_ID_UF_VOLUME_ACCURACY_ERROR = (HIGH, 37, 0, 0, 0)
    ALARM_ID_RTC_BATTERY_LOW = (HIGH, 38, 0, 0, 0)
    ALARM_ID_RTC_OR_TIMER_ACCURACY_FAILURE = (HIGH, 39, 0, 0, 0)
    ALARM_ID_RTC_RAM_OPS_ERROR = (HIGH, 40, 0, 0, 0)
    ALARM_ID_NVDATA_EEPROM_OPS_FAILURE = (HIGH, 41, 0, 0, 0)
    ALARM_ID_NVDATA_MFG_RECORD_CRC_ERROR = (HIGH, 42, 0, 0, 0)
    ALARM_ID_NVDATA_SRVC_RECORD_CRC_ERROR = (HIGH, 43, 0, 0, 0)
    ALARM_ID_NVDATA_CAL_RECORD_CRC_ERROR = (HIGH, 44, 0, 0, 0)
    ALARM_ID_NVDATA_HW_USAGE_DATA_CRC_ERROR = (HIGH, 45, 0, 0, 0)
    ALARM_ID_NVDATA_DISINFECTION_DATE_CRC_ERROR = (HIGH, 46, 0, 0, 0)
    ALARM_ID_RO_PUMP_OUT_PRESSURE_OUT_OF_RANGE = (HIGH, 47, 0, 0, 0)
    ALARM_ID_TEMPERATURE_SENSORS_OUT_OF_RANGE = (HIGH, 48, 0, 0, 0)
    ALARM_ID_TEMPERATURE_SENSORS_INCONSISTENT = (HIGH, 49, 0, 0, 0)
    ALARM_ID_HD_COMM_TIMEOUT = (HIGH, 50, 0, 0, 0)
    ALARM_ID_VALVE_CONTROL_FAILURE = (HIGH, 51, 0, 0, 0)
    ALARM_ID_BLOOD_PUMP_FLOW_VS_MOTOR_SPEED_CHECK = (HIGH, 52, 0, 0, 0)
    ALARM_ID_DIAL_IN_PUMP_FLOW_VS_MOTOR_SPEED_CHECK = (HIGH, 53, 0, 0, 0)
    ALARM_ID_DIAL_OUT_PUMP_FLOW_VS_MOTOR_SPEED_CHECK = (HIGH, 54, 0, 0, 0)
    ALARM_ID_BLOOD_PUMP_MOTOR_SPEED_CHECK = (HIGH, 55, 0, 0, 0)
    ALARM_ID_DIAL_IN_PUMP_MOTOR_SPEED_CHECK = (HIGH, 56, 0, 0, 0)
    ALARM_ID_DIAL_OUT_PUMP_MOTOR_SPEED_CHECK = (HIGH, 57, 0, 0, 0)
    ALARM_ID_BLOOD_PUMP_ROTOR_SPEED_TOO_HIGH = (HIGH, 58, 0, 0, 0)
    ALARM_ID_INLET_WATER_TEMPERATURE_OUT_OF_RANGE = (HIGH, 59, 0, 0, 0)
    ALARM_ID_DOES_NOT_EXIST = (HIGH, 99, 0, 0, 0)


class HDAlarmsSimulator(AbstractSubSystem):
    """
    Simulates the HD alarm messages: builds alarm related CAN messages and
    sends them through the given messenger, and handles the alarm related
    requests (silence, user action, volume override) received from the UI.
    """
    instance_count = 0

    def __init__(self, can_interface: DenaliCanMessenger, logger: Logger):
        """
        Initializes the simulator state and, when a CAN interface is given,
        registers the handlers for the UI alarm requests.

        @param can_interface: Denali Can Messenger object
        @param logger: the logger used for debug output
        """
        super().__init__()
        HDAlarmsSimulator.instance_count += 1

        self.can_interface = can_interface
        self.logger = logger
        self.flags = 0
        self.clear_after_user_action = False
        self.current_alarm_volume = 5

        if self.can_interface is not None:
            channel_id = DenaliChannels.ui_to_hd_ch_id
            self.can_interface.register_receiving_publication_function(
                channel_id,
                MsgIds.MSG_ID_USER_ALARM_SILENCE_REQUEST.value,
                self._handler_alarm_silence)
            self.can_interface.register_receiving_publication_function(
                channel_id,
                MsgIds.MSG_ID_UI_ALARM_USER_ACTION_REQUEST.value,
                self._handler_user_action)
            self.can_interface.register_receiving_publication_function(
                channel_id,
                MsgIds.MSG_ID_UI_SET_ALARM_AUDIO_VOLUME_LEVEL_CMD_REQUEST.value,
                self._handler_request_override_alarm_volume)

    def _send(self, channel_id, message_id, payload) -> None:
        """
        Builds a Denali message from the given parts and sends it.

        @param channel_id: the CAN channel to send on
        @param message_id: (int) the message id
        @param payload: the message payload bytes
        @return: None
        """
        message = DenaliMessage.build_message(channel_id=channel_id,
                                              message_id=message_id,
                                              payload=payload)
        self.can_interface.send(message, 0)

    def _broadcast_alarm_status(self, state, alarm_id, escalates_in, silence_expires, flags) -> None:
        """
        Broadcasts one alarm status message on the HD alarm channel.

        @param state: (int) alarm priority
        @param alarm_id: (int) the alarm id
        @param escalates_in: (int) how long until the alarm escalates
        @param silence_expires: (int) seconds until silence expires
        @param flags: (int) the alarm flags, see 'cmd_make_alarm_flags'
        @return: None
        """
        payload = integer_to_bytearray(state)
        payload += integer_to_bytearray(alarm_id)
        payload += integer_to_bytearray(escalates_in)
        payload += integer_to_bytearray(silence_expires)
        payload += integer_to_bytearray(flags)
        self._send(DenaliChannels.hd_alarm_broadcast_ch_id,
                   MsgIds.MSG_ID_ALARM_STATUS_DATA.value,
                   payload)

    def _send_alarm_id_event(self, message_id, alarm_id) -> None:
        """
        Broadcasts, on the HD alarm channel, a message whose payload is only
        the alarm id.

        @param message_id: (int) the message id
        @param alarm_id: (int) the alarm id
        @return: None
        """
        self._send(DenaliChannels.hd_alarm_broadcast_ch_id,
                   message_id,
                   integer_to_bytearray(alarm_id))

    def _send_alarm_volume_broadcast(self):
        """
        Sends the alarm volume broadcast message

        @return: None
        """
        # Report the volume last accepted by the override handler (5 until
        # the UI changes it) instead of a fixed value.
        payload = integer_to_bytearray(self.current_alarm_volume)  # alarm volume
        payload += float_to_bytearray(1.0)  # alarm audio current high gain (mA)
        payload += float_to_bytearray(1.0)  # alarm audio current low gain (mA)
        payload += float_to_bytearray(1.0)  # alarm backup audio current (mA)
        self._send(DenaliChannels.hd_sync_broadcast_ch_id,
                   MsgIds.MSG_ID_HD_ALARM_INFORMATION_DATA.value,
                   payload)

    def _handler_request_override_alarm_volume(self, message: dict) -> None:
        """
        Handler for a UI request to override the alarm volume level.
        A volume in 1..5 is stored and accepted, any other value is rejected;
        the response is sent in both cases.

        @param message: the received message, the requested volume is read
                        from the first 4 bytes of field 1
        @return: None
        """
        start = MsgFieldPositions.START_POS_FIELD_1
        vol = struct.unpack('i', bytearray(message['message'][start:start + 4]))[0]
        self.logger.debug("Received request to override alarm volume to {0}".format(vol))

        if 1 <= vol <= 5:
            self.current_alarm_volume = vol
            payload = integer_to_bytearray(YES)
            payload += integer_to_bytearray(0)
        else:
            payload = integer_to_bytearray(NO)
            payload += integer_to_bytearray(1)

        self._send(DenaliChannels.hd_sync_broadcast_ch_id,
                   MsgIds.MSG_ID_HD_ALARM_AUDIO_VOLUME_SET_RESPONSE.value,
                   payload)

    def cmd_activate_alarm_id(self, state: int = HIGH,
                              alarm: int = 0,
                              escalates_in: int = 0,
                              silence_expires: int = 0,
                              flags: int = 0):
        """
        Activates the specified alarm

        @param state: (int) Alarm priority
        @param alarm: (int) the alarm id
        @param escalates_in: (int) how long until the alarm escalates
        @param silence_expires: (int) seconds until silence expires
        @param flags: (int) See 'cmd_make_alarm_flags'
        @return: None
        """
        self._broadcast_alarm_status(state, alarm, escalates_in, silence_expires, flags)

    def cmd_activate_alarm(self, alarm: AlarmList,
                           state: int = HIGH,
                           escalates_in: int = 0,
                           silence_expires: int = 0,
                           flags: int = 0):
        """
        Activates the specified alarm

        @param alarm: the alarm enum
        @param state: Alarm priority
        @param escalates_in: how long until the alarm escalates
        @param silence_expires: seconds until silence expires
        @param flags: additional alarm flags

        Alarm flags:
            eFlag_systemFault = 0
            eFlag_stop = 1
            eFlag_noClear = 2
            eFlag_noResume = 3
            eFlag_noRinseback = 4
            eFlag_noEndTreatment = 5
            eFlag_noNewTreatment = 6
            eFlag_bypassDialyzer = 7
            eFlag_alarmsToEscalate = 8
            eFlag_alarmsSilenced = 9
            eFlag_userAcknowledged = 10
            ...
            unused = 11 - 16

        NOTE(review): 'cmd_make_alarm_flags' assigns bit 7 to user_must_ack
        and bit 10 to lamp_on, which differs from the list above - TODO
        confirm which layout is current.

        @return: None
        """
        self._broadcast_alarm_status(state, alarm.value, escalates_in, silence_expires, flags)

    def cmd_make_alarm_flags(self,
                             system_fault=0,
                             stop=0,
                             no_clear=0,
                             no_resume=0,
                             no_rinseback=0,
                             no_end_treatment=0,
                             no_new_treatment=0,
                             user_must_ack=0,
                             alarms_to_escalate=0,
                             alarms_silenced=0,
                             lamp_on=0,
                             unused_1=0,
                             unused_2=0,
                             unused_3=0,
                             no_minimize=0,
                             top_condition=0
                             ):
        """
        Helper function to construct the flags

        @param system_fault: One or more system faults has been triggered
        @param stop: Alarm(s) have stopped treatment/activity and placed system in a safe state
        @param no_clear: One or more active alarms is not recoverable
        @param no_resume: The "resume" user recovery option is disabled
        @param no_rinseback: The "rinseback" user recovery option is disabled
        @param no_end_treatment: The "end treatment" user recovery option is disabled
        @param no_new_treatment: A new treatment may not be started without cycling power to system
        @param user_must_ack: The "ok" user recovery option is enabled
        @param alarms_to_escalate: One or more active alarms will escalate in time
        @param alarms_silenced: Alarms have been temporarily silenced by user
        @param lamp_on: Alarm lamp is currently on (for syncing to UI)
        @param unused_1: unused
        @param unused_2: unused
        @param unused_3: unused
        @param no_minimize: Prevent user from minimizing alarm window
        @param top_condition: The top alarm's condition is still being detected
        @return: (int) containing all the flags
        """
        # The position of each value in this tuple is its bit position.
        bit_values = (system_fault, stop, no_clear, no_resume,
                      no_rinseback, no_end_treatment, no_new_treatment, user_must_ack,
                      alarms_to_escalate, alarms_silenced, lamp_on, unused_1,
                      unused_2, unused_3, no_minimize, top_condition)
        flags = 0
        for bit_position, value in enumerate(bit_values):
            flags |= value << bit_position
        return flags

    def cmd_send_clear_alarms(self):
        """
        Broadcasts a clear alarms message, which is an alarm status message
        with all of its fields set to 0

        @return: None
        """
        self._broadcast_alarm_status(0, 0, 0, 0, 0)

    def cmd_set_alarm_cleared(self, alarm_id: int = 0):
        r"""
        Broadcasts to clear a specific alarm ID
        the Alarm Cleared message builder method

        | MSG  | CAN ID | Box | Type   | Ack | Src | Dst | Description   | #1:(U32) |
        |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------:  |:--:      |
        |0x0400| 0x001  | 1   | Event  | Y   | HD  | All | Alarm Cleared | \ref Data::mAlarmID    |

        @param alarm_id: (int) the alarm id that has been cleared
        @return: None
        """
        self._send_alarm_id_event(MsgIds.MSG_ID_ALARM_CLEARED.value, alarm_id)

    def cmd_set_alarm_cleared_condition(self, alarm_id: int = 0):
        r"""
        Broadcasts that the alarm condition has been cleared
        the Alarm Cleared message builder method

        | MSG  | CAN ID    | Box | Type   | Ack | Src | Dst | Description             | #1:(U32) |
        |:----:|:---------:|:---:|:------:|:---:|:---:|:---:|:---------------------:  |:--:      |
        |0x3F00| 0x001,2,4 | 1   | Event  | Y   | HD  | All | Alarm Condition Cleared | \ref Data::mAlarmID    |

        @param alarm_id: (int) the alarm id whose condition has been cleared
        @return: None
        """
        self._send_alarm_id_event(MsgIds.MSG_ID_ALARM_CONDITION_CLEARED.value, alarm_id)

    @staticmethod
    def _pack_alarm_data_field(field_descriptor, data_field):
        """
        Encodes one alarm data field as its type descriptor followed by its
        value. A field of type NONE is encoded as two zero integers, an F32
        field carries a float value, and any other type carries an integer.

        @param field_descriptor: (int) alarm data type
        @param data_field: (str) alarm data
        @return: the encoded bytes of the descriptor and the data
        """
        if field_descriptor == AlarmDataTypes.ALARM_DATA_TYPE_NONE:
            zero = integer_to_bytearray(0)
            return zero + zero
        if field_descriptor == AlarmDataTypes.ALARM_DATA_TYPE_F32:
            return integer_to_bytearray(field_descriptor) + float_to_bytearray(float(data_field))
        # BOOL, S32, U32
        return integer_to_bytearray(field_descriptor) + integer_to_bytearray(int(data_field))

    def cmd_set_alarm_triggered(self, alarm_id, field_descriptor_1: int, data_field_1: str,
                                field_descriptor_2: int, data_field_2: str,
                                priority: int, rank: int, clear_top: int) -> None:
        r"""
        Triggers an alarm.

        | MSG  | CAN ID | Box | Type   | Ack | Src | Dst | Description   | #1:(U32) |
        |:----:|:------:|:---:|:------:|:---:|:---:|:---:|:-----------:  |:--:      |
        |0x0300| 0x001  | 1   | Event  | Y   | HD  | All | Alarm Triggered | \ref Data::mAlarmID    |

        @param alarm_id: (int) the alarm id to trigger
        @param field_descriptor_1: (int) alarm data 1 type
        @param data_field_1: (str) alarm data 1
        @param field_descriptor_2: (int) alarm data 2 type
        @param data_field_2: (str) alarm data 2
        @param priority: (int) alarm priority
        @param rank: (int) alarm rank
        @param clear_top: (int) clear top only
        @return: None
        """
        payload = integer_to_bytearray(alarm_id)

        if field_descriptor_1 == AlarmDataTypes.ALARM_DATA_TYPE_NONE:
            # Without a first data field the second one is not encoded
            # either: both fields are sent as zeros.
            zero = integer_to_bytearray(0)
            payload += zero + zero + zero + zero
        else:
            payload += self._pack_alarm_data_field(field_descriptor_1, data_field_1)
            payload += self._pack_alarm_data_field(field_descriptor_2, data_field_2)

        payload += unsigned_integer_to_bytearray(priority)
        payload += unsigned_integer_to_bytearray(rank)
        payload += unsigned_integer_to_bytearray(clear_top)

        self._send(DenaliChannels.hd_alarm_broadcast_ch_id,
                   MsgIds.MSG_ID_ALARM_TRIGGERED.value,
                   payload)

    def cmd_repeat_broadcast_alarm(self, freq: int = 4, timeout: float = float('inf'), **kwargs):
        """
        Broadcast the specified alarm message at particular frequency.
        This call blocks until the timeout has elapsed, which with the
        default timeout is never.

        @param freq: cycles / s of the broadcast
        @param timeout: How long to broadcast the alarm for
        @param kwargs: arguments to pass to cmd_activate_alarm
        @return: None
        """
        start = time()
        # NOTE: the countdown is decremented once per broadcast, not once
        # per second.
        silence_expires = 60
        while time() - start < timeout:
            self.cmd_activate_alarm(flags=self.flags, silence_expires=silence_expires, **kwargs)
            sleep(1.0 / freq)
            if silence_expires > 0:
                silence_expires -= 1
            else:
                # Silence has expired: drop only the silenced bit so that
                # the other flags (e.g. the ones given to set_flags) are
                # kept.
                self.flags &= ~_ALARMS_SILENCED_MASK

    def set_flags(self, flags):
        """
        Sets the alarm flags

        @param flags: (int) the flags, see 'cmd_make_alarm_flags'
        @return: None
        """
        self.flags = flags

    def _handler_alarm_acknowledge(self) -> None:
        """
        TODO: Remove
        Handles the alarm acknowledge message by setting bit 10 of the flags

        @return: None
        """
        self.logger.debug("Alarm acknowledged")
        self.flags = self.flags | 2 ** 10

    def _handler_alarm_silence(self, message):
        """
        Handles the alarm silence message by setting or clearing the
        silenced bit (bit 9) of the flags

        @param message: the message with 0 = cancel, 1 = silence
        @return: None
        """
        field = message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]
        # Only the first byte of the field is inspected.
        request = field[0]
        if request:
            self.logger.debug("Alarm Silence Request Start {0}".format(request))
            self.flags |= _ALARMS_SILENCED_MASK
        else:
            self.logger.debug("Alarm Silence Request Cancel {0}".format(request))
            self.flags &= ~_ALARMS_SILENCED_MASK

    def cmd_alarm_condition_cleared(self, alarm_id: int):
        """
        Sends the alarm condition cleared message

        @param alarm_id: (int) The alarm ID
        @return: None
        """
        self._send_alarm_id_event(MsgIds.MSG_ID_ALARM_CONDITION_CLEARED.value, alarm_id)

    def _handler_user_action(self, message):
        """
        Called when the user responds to an alarm. The response is logged
        and, when clear_after_user_action is set, the alarms are cleared.

        @param message: the received message, the response is read from
                        field 1
        @return: None
        """
        field = message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]
        response = struct.unpack('i', bytearray(field))[0]
        self.logger.debug("User response to alarm: {0}".format(response))
        if self.clear_after_user_action:
            self.cmd_send_clear_alarms()

    def cmd_send_active_list_response(self, accept: bool, reason: int = 0,
                                      a0: int = 0, a1: int = 0, a2: int = 0, a3: int = 0, a4: int = 0,
                                      a5: int = 0, a6: int = 0, a7: int = 0, a8: int = 0, a9: int = 0) -> None:
        """
        send the list of active alarms

        @param accept: boolean value true if the request accepted
        @param reason: the rejection reason
        @param a0: alarm id 0 in the list - First
        @param a1: alarm id 1 in the list
        @param a2: alarm id 2 in the list
        @param a3: alarm id 3 in the list
        @param a4: alarm id 4 in the list
        @param a5: alarm id 5 in the list
        @param a6: alarm id 6 in the list
        @param a7: alarm id 7 in the list
        @param a8: alarm id 8 in the list
        @param a9: alarm id 9 in the list - Last
        @return: None
        """
        payload = integer_to_bytearray(accept)
        # The order of the fields in the payload is the order of this tuple.
        for value in (reason, a0, a1, a2, a3, a4, a5, a6, a7, a8, a9):
            payload += integer_to_bytearray(value)

        self._send(DenaliChannels.hd_to_ui_ch_id,
                   MsgIds.MSG_ID_HD_ACTIVE_ALARMS_LIST_REQUEST_RESPONSE.value,
                   payload)