import struct
from logging import Logger
from enum import unique

from .constants import RESET, NO_RESET
from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions
from leahi_dialin.common.dd_defs import DDBloodLeakStates
from leahi_dialin.protocols.CAN import DenaliMessage, DenaliChannels
from leahi_dialin.utils.base import AbstractSubSystem, publish, DialinEnum
from leahi_dialin.utils.conversions import integer_to_bytearray, bytearray_to_byte, bytearray_to_integer, \
    unsigned_short_to_bytearray, byte_to_bytearray, float_to_bytearray


@unique
class EmbModeCommands(DialinEnum):
    """
    Blood leak sensor embedded mode commands.
    """
    NU = 0   # NULL command
    CS = 1   # Control S command (this is handled automatically by using the embedded mode command)
    SP = 2   # Set set point command
    T = 3    # Self test command
    G = 4    # Get self-test drive
    I = 5    # Display intensity
    V = 6    # Display blood detection level
    Z = 7    # Zero command
    Q = 8    # Zero confirm command
    D = 9    # Display set point
    C = 10   # Calibration command


class DDBloodLeak(AbstractSubSystem):
    """
    DDBloodLeak

    Dialysate Delivery (DD) Dialin API sub-class for blood leak related commands.
    """

    # Blood leak detector status
    BLOOD_LEAK_DETECTED = 0     # Blood detected
    NO_BLOOD_LEAK_DETECTED = 1  # No blood detected

    def __init__(self, can_interface, logger: Logger):
        """
        Initializes the blood leak state and registers the receive handlers.

        @param can_interface: Leahi Can Messenger object (may be None, in which case no handler is registered)
        @param logger: Logger object used for the debug output of this sub-system
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        # The state is set up BEFORE the handlers are registered so that a handler never
        # runs against an instance that is missing the attributes it writes.
        self.hd_blood_leak_status_timestamp = 0.0
        self.hd_blood_leak_emb_mode_response_timestamp = 0.0
        self.blood_leak_status = self.NO_BLOOD_LEAK_DETECTED
        self.blood_leak_state = DDBloodLeakStates.BLOOD_LEAK_INIT_STATE.value
        self.blood_leak_error_persistent_ctr = 0
        self.blood_leak_serial_comm_state = 0
        self.blood_leak_intensity = 0
        self.blood_leak_blood_detect = 0
        self.blood_leak_intensity_moving_average = 0.0
        self.blood_leak_in_range_drift_status = 0
        self.blood_leak_upper_range_drift_status = 0
        # Attribute written by _handler_blood_leak_sync and listed in its publish keys.
        self.blood_leak_time_since_last_zero_ms = 0
        # Never updated by this class; kept only so existing readers of the old name do not break.
        self.blood_leak_time_elapsed_since_last_drift_zero_ms = 0
        # Listed in the publish keys of _handler_blood_leak_emb_mode_cmd_resp.
        # NOTE(review): presumably publish reads its keys from the instance - confirm.
        self.blood_leak_emb_mode_cmd_response = ''
        # Most recent response of every embedded mode command, keyed by the command name
        self.blood_leak_emb_mode_cmds = {name: '' for name in EmbModeCommands.__members__}

        if self.can_interface is not None:
            self.can_interface.register_receiving_publication_function(
                DenaliChannels.dd_sync_broadcast_ch_id,
                MsgIds.MSG_ID_DD_BLOOD_LEAK_DATA.value,
                self._handler_blood_leak_sync)
            self.can_interface.register_receiving_publication_function(
                DenaliChannels.dd_to_dialin_ch_id,
                MsgIds.MSG_ID_DD_SEND_BLOOD_LEAK_EMB_MODE_RESPONSE.value,
                self._handler_blood_leak_emb_mode_cmd_resp)

    @staticmethod
    def _to_emb_mode_command(value):
        """
        Converts a raw command value (or an EmbModeCommands member) to an EmbModeCommands member

        @param value: the value to convert
        @return: the matching EmbModeCommands member, None if the value is not a valid command
        """
        try:
            return EmbModeCommands(value)
        except ValueError:
            return None

    def _response_result(self, received_message):
        """
        Extracts the result of a command from the message returned by the CAN interface

        @param received_message: the message returned by can_interface.send (None when nothing was received)
        @return: the first payload byte of the response, False if there was no response
        """
        if received_message is None:
            self.logger.debug("Timeout!!!!")
            return False
        # response payload is OK or not OK
        return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]

    def get_blood_leak_status(self):
        """
        Gets the current blood leak status

        @return: integer - blood leak status (0 = blood detected, 1 = no blood detected)
        """
        return self.blood_leak_status

    def get_blood_leak_state(self):
        """
        Gets the current blood leak state
        (0: wait for POST, 1: check set point, 2: init, 3: zero and self test, 4: normal,
        5: recover blood detect state)

        @return: integer - blood leak state
        """
        return self.blood_leak_state

    def get_blood_leak_emb_mode_command_response(self, emb_mod_cmd: int) -> str:
        """
        Gets the most recent embedded mode command response for a given command

        @param emb_mod_cmd: the command to get its response (EmbModeCommands value or member)
        @return: string - embedded mode command response, None if the command is invalid
        """
        command = self._to_emb_mode_command(emb_mod_cmd)
        if command is None:
            # Covers every value that is not a member value, including negative ones
            self.logger.debug("Invalid command!")
            return None
        return self.blood_leak_emb_mode_cmds[command.name]

    @publish(['hd_blood_leak_status_timestamp', 'blood_leak_status', 'blood_leak_state',
              'blood_leak_error_persistent_ctr', 'blood_leak_serial_comm_state', 'blood_leak_intensity',
              'blood_leak_blood_detect', 'blood_leak_intensity_moving_average',
              'blood_leak_time_since_last_zero_ms', 'blood_leak_in_range_drift_status',
              'blood_leak_upper_range_drift_status'])
    def _handler_blood_leak_sync(self, message, timestamp=0.0):
        """
        Handles published blood leak status messages. Blood leak status is captured
        for reference.

        @param message: published blood leak status message
        @param timestamp: timestamp stored as hd_blood_leak_status_timestamp
        @return: None
        """
        payload = message['message']

        def field(fmt, start, end):
            # Decodes the single value stored in payload[start:end] with the given struct format
            return struct.unpack(fmt, bytearray(payload[start:end]))[0]

        pos = MsgFieldPositions
        self.blood_leak_status = field('i', pos.START_POS_FIELD_1, pos.END_POS_FIELD_1)
        self.blood_leak_state = field('i', pos.START_POS_FIELD_2, pos.END_POS_FIELD_2)
        self.blood_leak_error_persistent_ctr = field('i', pos.START_POS_FIELD_3, pos.END_POS_FIELD_3)
        self.blood_leak_serial_comm_state = field('i', pos.START_POS_FIELD_4, pos.END_POS_FIELD_4)
        self.blood_leak_intensity = field('i', pos.START_POS_FIELD_5, pos.END_POS_FIELD_5)
        self.blood_leak_blood_detect = field('i', pos.START_POS_FIELD_6, pos.END_POS_FIELD_6)
        # Field 7 is the only floating point field of the message
        self.blood_leak_intensity_moving_average = field('f', pos.START_POS_FIELD_7, pos.END_POS_FIELD_7)
        self.blood_leak_time_since_last_zero_ms = field('i', pos.START_POS_FIELD_8, pos.END_POS_FIELD_8)
        self.blood_leak_in_range_drift_status = field('i', pos.START_POS_FIELD_9, pos.END_POS_FIELD_9)
        self.blood_leak_upper_range_drift_status = field('i', pos.START_POS_FIELD_10, pos.END_POS_FIELD_10)
        self.hd_blood_leak_status_timestamp = timestamp

    def cmd_blood_leak_detector_override(self, detected: int, reset=NO_RESET):
        """
        Constructs and sends the blood leak detector state override command
        Constraints:
        Must be logged into DD.

        @param detected: unsigned int - detected (0=detected, 1=undetected) to override detector with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if there was no response)
        """
        payload = integer_to_bytearray(reset) + integer_to_bytearray(detected)
        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_STATUS_OVERRIDE_REQUEST.value,
            payload=payload)

        self.logger.debug("Override blood leak detector state value")

        return self._response_result(self.can_interface.send(message))

    def cmd_blood_leak_zero_request(self):
        """
        Request blood leak zeroing
        Constraints:
        Must be logged into DD.

        @return: 1 if successful, zero otherwise (False if there was no response)
        """
        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_ZERO_REQUEST.value)

        self.logger.debug("Request blood leak zeroing")

        return self._response_result(self.can_interface.send(message))

    def cmd_blood_leak_data_broadcast_interval_override(self, ms, reset=NO_RESET):
        """
        Constructs and sends the blood leak data broadcast interval override command
        Constraints:
        Must be logged into DD.
        Given interval must be non-zero and a multiple of the DD general task interval (50 ms).

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if there was no response)
        """
        payload = integer_to_bytearray(reset) + integer_to_bytearray(ms)
        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_DATA_PUBLISH_INTERVAL_OVERRIDE_REQUEST.value,
            payload=payload)

        self.logger.debug("Override DD blood leak data broadcast interval")

        received_message = self.can_interface.send(message)

        # The outcome is only logged when a response was actually received
        if received_message is not None:
            if reset == RESET:
                str_res = "reset back to normal: "
            else:
                str_res = str(ms) + " ms: "
            self.logger.debug("Blood leak data broadcast interval overridden to " + str_res +
                              str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX]))

        return self._response_result(received_message)

    def cmd_blood_leak_set_to_embedded_mode(self):
        """
        Constructs and sends switching to embedded mode command
        Constraints:
        Must be logged into DD.

        @return: non-zero integer if successful, False otherwise
        """
        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_SET_TO_EMBEDDED_MODE_REQUEST.value)

        self.logger.debug("Setting the blood leak to embedded mode")

        return self._response_result(self.can_interface.send(message))

    def cmd_blood_leak_set_embedded_mode_command(self, command: int, msg_payload: int = None):
        """
        Constructs and sends an embedded mode command to the blood leak sensor
        Constraints:
        Must be logged into DD.
        It is recommended to have delays in between sending each command to make sure the firmware's queue is not
        overflown.
        The sensor must be in embedded mode. The firmware sets the sensor into the embedded mode by default.

        @param command: the embedded mode command to send (EmbModeCommands value or member)
        @param msg_payload: unsigned short - data sent along with the command (0 is sent when it is None)
        @return: non-zero integer if successful, False otherwise (including an invalid command)
        """
        member = self._to_emb_mode_command(command)
        if member is None:
            self.logger.debug("Invalid command!")
            return False

        # Clear the previous response so a stale one is not mistaken for the answer to this command
        self.blood_leak_emb_mode_cmds[member.name] = ''

        data = 0 if msg_payload is None else msg_payload
        payload = byte_to_bytearray(member.value) + unsigned_short_to_bytearray(data)
        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_SET_EMBEDDED_MODE_CMD_REQUEST.value,
            payload=payload)

        self.logger.debug("Sending " + str(member.name) + " to the blood leak sensor")

        return self._response_result(self.can_interface.send(message))

    @publish(['hd_blood_leak_emb_mode_response_timestamp', 'blood_leak_emb_mode_cmd_response'])
    def _handler_blood_leak_emb_mode_cmd_resp(self, message, timestamp=0.0):
        """
        Handles published blood leak embedded mode command responses. The response is
        stored under the name of the command it belongs to.

        @param message: published blood leak embedded mode command response message
        @param timestamp: timestamp stored as hd_blood_leak_emb_mode_response_timestamp
        @return: None
        @raise ValueError: if the command in the message is not an EmbModeCommands value
        """
        payload = message['message']
        index = MsgFieldPositions.START_POS_FIELD_1
        # Message layout as read here: command byte, response length, then the response characters
        cmd, index = bytearray_to_byte(payload, index, False)
        length, index = bytearray_to_integer(payload, index, False)

        chars = []
        for offset in range(length):
            char, _ = bytearray_to_byte(payload, index + offset, False)
            chars.append(chr(char))
        response = ''.join(chars)

        self.blood_leak_emb_mode_cmds[EmbModeCommands(cmd).name] = response
        # Stored on the instance because the name is one of the publish keys of this handler
        self.blood_leak_emb_mode_cmd_response = response
        self.hd_blood_leak_emb_mode_response_timestamp = timestamp

    def cmd_blood_leak_emb_mode_info_cmds_override(self, cmd: int, value_to_override: int, reset=NO_RESET):
        """
        Constructs and sends the blood leak embedded mode info value override command
        Constraints:
        Must be logged into DD.
        Can only override Intensity (I) and Blood Detect Level (V)

        @param cmd: unsigned int - the command that its info value is overridden (only I and V)
        @param value_to_override: unsigned int - value to override the info value to
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if there was no response)
        @raise ValueError: if cmd is not an EmbModeCommands value
        """
        member = EmbModeCommands(cmd)
        payload = (integer_to_bytearray(reset) + integer_to_bytearray(value_to_override) +
                   integer_to_bytearray(member.value))
        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_EMBEDDED_MODE_INFO_OVERRIDE_REQUEST.value,
            payload=payload)

        if reset == NO_RESET:
            self.logger.debug("Override blood leak {} to {}".format(member.name, value_to_override))
        else:
            self.logger.debug("Reset override blood leak {} ".format(member.name))

        return self._response_result(self.can_interface.send(message))

    def cmd_blood_leak_intensity_moving_average_override(self, value: float, reset=NO_RESET):
        """
        Constructs and sends the blood leak intensity moving average override command
        Constraints:
        Must be logged into DD.

        @param value: float - blood leak intensity moving average
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if there was no response)
        """
        payload = integer_to_bytearray(reset) + float_to_bytearray(value)
        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_INTENSITY_MOVING_AVERAGE_OVERRIDE_REQUEST.value,
            payload=payload)

        self.logger.debug("Override blood leak intensity moving average to {:5.3f}".format(value))

        return self._response_result(self.can_interface.send(message))

    def cmd_blood_leak_zeroing_interval_in_minutes(self, upper_interval: int, value_mins: int, reset=NO_RESET):
        """
        Constructs and sends the blood leak zeroing interval in minutes override command
        Constraints:
        Must be logged into DD.

        @param upper_interval: unsigned int - to override upper range or drift intervals
                               (0 = drift interval, 1 = upper interval)
        @param value_mins: unsigned int - blood leak zeroing interval in minutes
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise (False if there was no response)
        """
        payload = (integer_to_bytearray(reset) + integer_to_bytearray(value_mins) +
                   integer_to_bytearray(upper_interval))
        message = DenaliMessage.build_message(
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_ZEROING_INTERVAL_IN_MS_OVERRIDE_REQUEST.value,
            payload=payload)

        text = 'upper intensity' if upper_interval == 1 else 'intensity drift'
        # Log the requested number of minutes, not its encoded bytes
        self.logger.debug("Override blood leak zeroing {} interval to {} minute(s)".format(text, value_mins))

        return self._response_result(self.can_interface.send(message))