import struct

from DialIn.CoreCANProtocol import (DenaliMessage,
                                    DenaliChannels)
from utils import integer_to_byte_array, RESET


class HDAlarms:
    r"""
    \class HDAlarms

    \brief Hemodialysis Device (HD) Dialin API sub-class for alarm related commands.
    """
    # Docstrings in this class are raw strings: they use Doxygen-style
    # backslash commands, and in a normal string literal "\b" and "\r" would be
    # turned into backspace / carriage-return characters.

    # alarms message IDs
    MSG_ID_HD_ALARMS_PUBLISHED_STATUS = 0x0002
    MSG_ID_HD_ALARM_ACTIVATE = 0x0003
    MSG_ID_HD_ALARM_CLEAR = 0x0004
    MSG_ID_HD_ALARM_LAMP_OVERRIDE = 0x8004
    MSG_ID_HD_ALARM_STATE_OVERRIDE = 0x8006
    MSG_ID_HD_ALARM_TIME_OVERRIDE = 0x8007

    # Alarm lamp patterns
    HD_ALARM_LAMP_PATTERN_OFF = 0
    HD_ALARM_LAMP_PATTERN_OK = 1
    HD_ALARM_LAMP_PATTERN_FAULT = 2
    HD_ALARM_LAMP_PATTERN_HIGH = 3
    HD_ALARM_LAMP_PATTERN_MEDIUM = 4
    HD_ALARM_LAMP_PATTERN_LOW = 5
    HD_ALARM_LAMP_PATTERN_MANUAL = 6

    # Alarm status message field positions. Each field starts where the
    # previous one ends: four 4-byte fields followed by one 2-byte flags field.
    START_POS_ALARM_STATE = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_ALARM_STATE = START_POS_ALARM_STATE + 4
    START_POS_ALARM_TOP = END_POS_ALARM_STATE
    END_POS_ALARM_TOP = START_POS_ALARM_TOP + 4
    START_POS_ALARM_SILENCE_EXPIRES_IN = END_POS_ALARM_TOP
    END_POS_ALARM_SILENCE_EXPIRES_IN = START_POS_ALARM_SILENCE_EXPIRES_IN + 4
    START_POS_ALARM_ESCALATES_IN = END_POS_ALARM_SILENCE_EXPIRES_IN
    END_POS_ALARM_ESCALATES_IN = START_POS_ALARM_ESCALATES_IN + 4
    START_POS_ALARMS_FLAGS = END_POS_ALARM_ESCALATES_IN
    END_POS_ALARMS_FLAGS = START_POS_ALARMS_FLAGS + 2

    # Alarm ID field position: a 2-byte field at the start of the payload.
    START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX
    END_POS_ALARM_ID = START_POS_ALARM_ID + 2

    def __init__(self, can_interface):
        r"""
        HDAlarms constructor.

        Initialises the cached alarm data and, when a CAN interface is given,
        registers this object's handlers for the HD alarm status, alarm
        activation and alarm clear messages on the HD alarm broadcast channel.

        \param can_interface: object providing
               register_receiving_publication_function(channel_id, msg_id,
               handler); may be None, in which case no handler is registered.
        \returns HDAlarms object.
        """
        self.can_interface = can_interface

        # State is set up before any handler is registered so that a handler
        # invoked right after registration never finds these attributes missing
        # and never has its captured values reset by the rest of __init__.
        # NOTE(review): whether callbacks can fire while __init__ is still
        # running depends on the can_interface implementation - confirm.

        # composite alarm status based on latest HD alarm status broadcast message
        self.alarms_state = 0
        self.alarm_top = 0
        self.alarms_silence_expires_in = 0
        self.alarms_escalates_in = 0
        self.alarms_flags = 0

        # alarm states based on received HD alarm activation and alarm clear messages
        # NOTE(review): presumably one entry per alarm ID (50 entries) - confirm.
        self.alarm_states = [False] * 50

        if self.can_interface is not None:
            channel_id = DenaliChannels.hd_alarm_broadcast_ch_id
            # NOTE(review): handler_alarm_clear is referenced here but its
            # definition is not within the visible part of this file.
            subscriptions = (
                (self.MSG_ID_HD_ALARMS_PUBLISHED_STATUS, self.handler_alarms_status_sync),
                (self.MSG_ID_HD_ALARM_ACTIVATE, self.handler_alarm_activate),
                (self.MSG_ID_HD_ALARM_CLEAR, self.handler_alarm_clear),
            )
            for msg_id, handler in subscriptions:
                self.can_interface.register_receiving_publication_function(
                    channel_id, msg_id, handler)

    @staticmethod
    def _read_unsigned(payload, start, end):
        r"""
        Decodes payload[start:end] as an unsigned integer using
        DenaliMessage.BYTE_ORDER.

        \param payload: indexable sequence of byte values
        \param start: index of the first byte of the field
        \param end: index one past the last byte of the field
        \returns the decoded integer
        """
        return int.from_bytes(bytearray(payload[start:end]),
                              byteorder=DenaliMessage.BYTE_ORDER)

    def handler_alarms_status_sync(self, message):
        r"""
        Handles published alarms status messages. Alarms status data are
        captured for reference in alarms_state, alarm_top,
        alarms_silence_expires_in, alarms_escalates_in and alarms_flags.

        \param message: published HD alarms status message; the raw bytes are
               read from message['message']
        \returns none
        """
        payload = message['message']

        self.alarms_state = self._read_unsigned(
            payload, self.START_POS_ALARM_STATE, self.END_POS_ALARM_STATE)
        self.alarm_top = self._read_unsigned(
            payload, self.START_POS_ALARM_TOP, self.END_POS_ALARM_TOP)
        self.alarms_silence_expires_in = self._read_unsigned(
            payload, self.START_POS_ALARM_SILENCE_EXPIRES_IN,
            self.END_POS_ALARM_SILENCE_EXPIRES_IN)
        self.alarms_escalates_in = self._read_unsigned(
            payload, self.START_POS_ALARM_ESCALATES_IN,
            self.END_POS_ALARM_ESCALATES_IN)
        self.alarms_flags = self._read_unsigned(
            payload, self.START_POS_ALARMS_FLAGS, self.END_POS_ALARMS_FLAGS)

    def handler_alarm_activate(self, message):
        r"""
        Handles published HD alarm activation messages.

        \param message: published HD alarm activation message
        \returns none
        """
        # NOTE(review): the reviewed source is cut off in the middle of this
        # method's first statement; the visible text ends with:
        #     alarm_id = struct.unpack('
        # The remainder of this handler is not visible and has deliberately not
        # been reconstructed - restore it from the original file. Until then
        # the handler fails loudly rather than silently dropping messages.
        raise NotImplementedError(
            "handler_alarm_activate body is truncated in the reviewed source")