Index: DG/DialysateGenerator.py =================================================================== diff -u -r0a66cef524a41670eed7b9204163bb9634a2f919 -r082ea4c8b0c6a3c804453308990d0bb98a0a2156 --- DG/DialysateGenerator.py (.../DialysateGenerator.py) (revision 0a66cef524a41670eed7b9204163bb9634a2f919) +++ DG/DialysateGenerator.py (.../DialysateGenerator.py) (revision 082ea4c8b0c6a3c804453308990d0bb98a0a2156) @@ -13,6 +13,7 @@ # @brief This class allows sending to and receiving from the DG device. # ############################################################################ +# TODO: Needs to be restructured and existing TODO items need to be addressed. from DialIn.CoreCANProtocol import (DenaliCanMessenger, DenaliMessage, Index: HD/Alarms.py =================================================================== diff -u --- HD/Alarms.py (revision 0) +++ HD/Alarms.py (revision 082ea4c8b0c6a3c804453308990d0bb98a0a2156) @@ -0,0 +1,263 @@ +from DialIn.CoreCANProtocol import (DenaliMessage, + DenaliChannels) +import struct +from .utils import integer2ByteArray +from .HemodialysisDevice import HD + + +class HDAlarms: + """ + \class HDAlarms + + \brief Hemodialysis Device (HD) Dialin API sub-class for alarm related commands. 
+ """ + + # Alarms message IDs + MSG_ID_HD_ALARMS_PUBLISHED_STATUS = 0x0002 + MSG_ID_HD_ALARM_ACTIVATE = 0x0003 + MSG_ID_HD_ALARM_CLEAR = 0x0004 + MSG_ID_HD_ALARM_LAMP_OVERRIDE = 0x8004 + MSG_ID_HD_ALARM_STATE_OVERRIDE = 0x8006 + MSG_ID_HD_ALARM_TIME_OVERRIDE = 0x8007 + + # Alarm lamp patterns + HD_ALARM_LAMP_PATTERN_OFF = 0 + HD_ALARM_LAMP_PATTERN_OK = 1 + HD_ALARM_LAMP_PATTERN_FAULT = 2 + HD_ALARM_LAMP_PATTERN_HIGH = 3 + HD_ALARM_LAMP_PATTERN_MEDIUM = 4 + HD_ALARM_LAMP_PATTERN_LOW = 5 + HD_ALARM_LAMP_PATTERN_MANUAL = 6 + + # Alarm status message field positions + START_POS_ALARM_STATE = DenaliMessage.PAYLOAD_START_INDEX + END_POS_ALARM_STATE = START_POS_ALARM_STATE + 4 + START_POS_ALARM_TOP = END_POS_ALARM_STATE + END_POS_ALARM_TOP = START_POS_ALARM_TOP + 4 + START_POS_ALARM_SILENCE_EXPIRES_IN = END_POS_ALARM_TOP + END_POS_ALARM_SILENCE_EXPIRES_IN = START_POS_ALARM_SILENCE_EXPIRES_IN + 4 + START_POS_ALARM_ESCALATES_IN = END_POS_ALARM_SILENCE_EXPIRES_IN + END_POS_ALARM_ESCALATES_IN = START_POS_ALARM_ESCALATES_IN + 4 + START_POS_ALARMS_FLAGS = END_POS_ALARM_ESCALATES_IN + END_POS_ALARMS_FLAGS = START_POS_ALARMS_FLAGS + 2 + + START_POS_ALARM_ID = DenaliMessage.PAYLOAD_START_INDEX + END_POS_ALARM_ID = START_POS_ALARM_ID + 2 + + def __init__(self, can_interface): + """ + HDAlarms constructor + + \param can_interface: CAN interface used to register handlers for published alarm messages (may be None). + + \returns HDAlarms object. 
+ """ + self.can_interface = can_interface + + if self.can_interface is not None: + channel_id = DenaliChannels.hd_alarm_broadcast_ch_id + msg_id = self.MSG_ID_HD_ALARMS_PUBLISHED_STATUS + self.can_interface.register_receiving_publication_function(channel_id, msg_id, + self.handler_alarms_status_sync) + + channel_id = DenaliChannels.hd_alarm_broadcast_ch_id + msg_id = self.MSG_ID_HD_ALARM_ACTIVATE + self.can_interface.register_receiving_publication_function(channel_id, msg_id, + self.handler_alarm_activate) + channel_id = DenaliChannels.hd_alarm_broadcast_ch_id + msg_id = self.MSG_ID_HD_ALARM_CLEAR + self.can_interface.register_receiving_publication_function(channel_id, msg_id, + self.handler_alarm_clear) + + # composite alarm status based on latest HD alarm status broadcast message + self.alarmsState = 0 + self.alarmTop = 0 + self.alarmsSilenceExpiresIn = 0 + self.alarmsEscalatesIn = 0 + self.alarmsFlags = 0 + + # alarm states based on received HD alarm activation and alarm clear messages + self.alarmStates = [False] * 50 + + + def handler_alarms_status_sync(self, message): + """ + Handles published alarms status messages. Alarms status data are captured + for reference. 
+ + \param message: published alarms status message + \returns none + """ + + self.alarmsState = int.from_bytes(bytearray( + message['message'][self.START_POS_ALARM_STATE:self.END_POS_ALARM_STATE]), + byteorder=DenaliMessage.BYTE_ORDER) + self.alarmTop = int.from_bytes(bytearray( + message['message'][self.START_POS_ALARM_TOP:self.END_POS_ALARM_TOP]), + byteorder=DenaliMessage.BYTE_ORDER) + self.alarmsSilenceExpiresIn = int.from_bytes(bytearray( + message['message'][self.START_POS_ALARM_SILENCE_EXPIRES_IN:self.END_POS_ALARM_SILENCE_EXPIRES_IN]), + byteorder=DenaliMessage.BYTE_ORDER) + self.alarmsEscalatesIn = int.from_bytes(bytearray( + message['message'][self.START_POS_ALARM_ESCALATES_IN:self.END_POS_ALARM_ESCALATES_IN]), + byteorder=DenaliMessage.BYTE_ORDER) + self.alarmsFlags = int.from_bytes(bytearray( + message['message'][self.START_POS_ALARMS_FLAGS:self.END_POS_ALARMS_FLAGS]), + byteorder=DenaliMessage.BYTE_ORDER) + + def handler_alarm_activate(self, message): + """ + Handles published HD alarm activation messages. + + \param message: published HD alarm activation message + \returns none + """ + + alarmID = struct.unpack('