###########################################################################
#
#    Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   blood_leak.py
#
#    @author (last)      Zoltan Miskolci
#    @date   (last)      07-Jan-2026
#    @author (original)  Dara Navaei
#    @date   (original)  21-Aug-2025
#
############################################################################
import struct
from logging import Logger
from enum import unique

from leahi_dialin.common.constants import NO_RESET
from leahi_dialin.common.dd_defs import DDBloodLeakStates
from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions
from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override
from leahi_dialin.protocols.CAN import DenaliChannels
from leahi_dialin.utils.base import AbstractSubSystem, publish, DialinEnum
from leahi_dialin.utils.conversions import integer_to_bytearray, bytearray_to_byte, bytearray_to_integer, \
    unsigned_short_to_bytearray, byte_to_bytearray, float_to_bytearray


@unique
class EmbModeCommands(DialinEnum):
    """
    Blood leak sensor embedded mode commands. The values are contiguous (0..10), which the
    range checks in DDBloodLeak rely on.
    """
    NU = 0      # NULL command
    CS = 1      # Control S command (this is handled automatically by using the embedded mode command)
    SP = 2      # Set set point command
    T = 3       # Self test command
    G = 4       # Get self-test drive
    I = 5       # Display intensity
    V = 6       # Display blood detection level
    Z = 7       # Zero command
    Q = 8       # Zero confirm command
    D = 9       # Display set point
    C = 10      # Calibration command


class DDBloodLeak(AbstractSubSystem):
    """
    DDBloodLeak

    Dialysate Delivery (DD) Dialin API sub-class for blood leak related commands.
    """

    # Blood leak detector status
    BLOOD_LEAK_DETECTED = 0     # Blood detected
    NO_BLOOD_LEAK_DETECTED = 1  # No blood detected

    def __init__(self, can_interface, logger: Logger):
        """
        Initializes the cached blood leak data and, when a CAN interface is given, registers the
        handlers for the blood leak data broadcast and the embedded mode command response.

        @param can_interface: Leahi Can Messenger object (may be None, in which case nothing is registered)
        @param logger: Logger object used by this sub-system
        """
        super().__init__()
        self.can_interface = can_interface
        self.logger = logger

        # The message IDs are named in the @publish lists of the handlers, so they are always defined.
        self.msg_id_dd_blood_leak_data = MsgIds.MSG_ID_DD_BLOOD_LEAK_DATA.value
        self.msg_id_dd_send_blood_leak_emb_mode_response = MsgIds.MSG_ID_DD_SEND_BLOOD_LEAK_EMB_MODE_RESPONSE.value

        if self.can_interface is not None:
            channel_id = DenaliChannels.dd_sync_broadcast_ch_id
            self.can_interface.register_receiving_publication_function(channel_id,
                                                                       self.msg_id_dd_blood_leak_data,
                                                                       self._handler_blood_leak_sync)

            channel_id = DenaliChannels.dd_to_dialin_ch_id
            self.can_interface.register_receiving_publication_function(channel_id,
                                                                       self.msg_id_dd_send_blood_leak_emb_mode_response,
                                                                       self._handler_blood_leak_emb_mode_cmd_resp)

        self.dd_blood_leak_status_timestamp = 0.0
        self.dd_blood_leak_emb_mode_response_timestamp = 0.0
        self.blood_leak_status = self.NO_BLOOD_LEAK_DETECTED
        self.blood_leak_state = DDBloodLeakStates.BLOOD_LEAK_INIT_STATE.value
        self.blood_leak_error_persistent_ctr = 0
        self.blood_leak_serial_comm_state = 0
        self.blood_leak_intensity = 0
        self.blood_leak_blood_detect = 0
        self.blood_leak_intensity_moving_average = 0.0
        self.blood_leak_in_range_drift_status = 0
        self.blood_leak_upper_range_drift_status = 0
        self.blood_leak_time_elapsed_since_last_drift_zero_ms = 0

        # Most recent embedded mode response text; it is named in the @publish list of the
        # embedded mode response handler, so it has to exist as an attribute.
        self.blood_leak_emb_mode_cmd_response = ''

        # One (initially empty) response slot per embedded mode command, keyed by the command name
        self.blood_leak_emb_mode_cmds = {cmd_name: '' for cmd_name in EmbModeCommands.__members__}

    def get_blood_leak_status(self):
        """
        Gets the current blood leak status

        @return: integer - the most recently received blood leak status
                 (BLOOD_LEAK_DETECTED = 0, NO_BLOOD_LEAK_DETECTED = 1)
        """
        return self.blood_leak_status

    def get_blood_leak_state(self):
        """
        Gets the current blood leak state
        (0: wait for POST, 1: check set point, 2: init, 3: zero and self test, 4: verify zero, 5: normal,
        6: recover blood detect state). See DDBloodLeakStates for the authoritative values.

        @return: integer - blood leak state
        """
        return self.blood_leak_state

    def get_blood_leak_emb_mode_command_response(self, emb_mod_cmd: int) -> str:
        """
        Gets the most recent embedded mode command response for a given command

        @param emb_mod_cmd: the command (value of an EmbModeCommands member) to get its response
        @return: string - embedded mode command response, or None if the command is not valid
        """
        # The lower bound matters: a negative value would pass a plain "< len" check and then
        # fail in the EmbModeCommands lookup.
        if not 0 <= emb_mod_cmd < len(EmbModeCommands):
            self.logger.debug("Invalid command!")
            return None

        return self.blood_leak_emb_mode_cmds[EmbModeCommands(emb_mod_cmd).name]

    @publish(['msg_id_dd_blood_leak_data',
              'blood_leak_status',
              'blood_leak_state',
              'blood_leak_error_persistent_ctr',
              'blood_leak_serial_comm_state',
              'blood_leak_intensity',
              'blood_leak_blood_detect',
              'blood_leak_intensity_moving_average',
              'blood_leak_time_elapsed_since_last_drift_zero_ms',
              'blood_leak_in_range_drift_status',
              'blood_leak_upper_range_drift_status',
              'dd_blood_leak_status_timestamp',])
    def _handler_blood_leak_sync(self, message, timestamp=0.0):
        """
        Handles published blood leak data messages. Ten payload fields are unpacked and cached:
        fields 1-6 and 8-10 as integers ('i') and field 7 (intensity moving average) as a float ('f').

        @param message: published blood leak data message
        @param timestamp: timestamp stored as the time of this update
        @return: None
        """
        data = message['message']
        pos = MsgFieldPositions

        def field(fmt, start, end):
            # Unpack a single value of the given struct format from data[start:end]
            return struct.unpack(fmt, bytearray(data[start:end]))[0]

        self.blood_leak_status = field('i', pos.START_POS_FIELD_1, pos.END_POS_FIELD_1)
        self.blood_leak_state = field('i', pos.START_POS_FIELD_2, pos.END_POS_FIELD_2)
        self.blood_leak_error_persistent_ctr = field('i', pos.START_POS_FIELD_3, pos.END_POS_FIELD_3)
        self.blood_leak_serial_comm_state = field('i', pos.START_POS_FIELD_4, pos.END_POS_FIELD_4)
        self.blood_leak_intensity = field('i', pos.START_POS_FIELD_5, pos.END_POS_FIELD_5)
        self.blood_leak_blood_detect = field('i', pos.START_POS_FIELD_6, pos.END_POS_FIELD_6)
        self.blood_leak_intensity_moving_average = field('f', pos.START_POS_FIELD_7, pos.END_POS_FIELD_7)
        self.blood_leak_time_elapsed_since_last_drift_zero_ms = field('i', pos.START_POS_FIELD_8,
                                                                      pos.END_POS_FIELD_8)
        self.blood_leak_in_range_drift_status = field('i', pos.START_POS_FIELD_9, pos.END_POS_FIELD_9)
        self.blood_leak_upper_range_drift_status = field('i', pos.START_POS_FIELD_10, pos.END_POS_FIELD_10)
        self.dd_blood_leak_status_timestamp = timestamp

    @publish(['msg_id_dd_send_blood_leak_emb_mode_response',
              'blood_leak_emb_mode_cmd_response',
              'dd_blood_leak_emb_mode_response_timestamp'])
    def _handler_blood_leak_emb_mode_cmd_resp(self, message, timestamp=0.0):
        """
        Handles blood leak embedded mode command response messages. The payload is read as a
        command byte, an integer length and then that many response characters. The response text
        is stored per command and as the most recent response.

        @param message: blood leak embedded mode command response message
        @param timestamp: timestamp stored as the time of this response
        @return: None
        """
        payload = message['message']
        index = MsgFieldPositions.START_POS_FIELD_1
        cmd, index = bytearray_to_byte(payload, index, False)
        length, index = bytearray_to_integer(payload, index, False)

        # Read the response one character at a time, starting right after the length field
        response = ''.join(chr(bytearray_to_byte(payload, index + offset, False)[0])
                           for offset in range(length))

        # Kept on the instance because the @publish list above refers to it by name
        self.blood_leak_emb_mode_cmd_response = response

        if 0 <= cmd < len(EmbModeCommands):
            self.blood_leak_emb_mode_cmds[EmbModeCommands(cmd).name] = response
        else:
            # Do not let an unexpected command id raise out of the receive handler
            self.logger.debug(f"Unknown blood leak embedded mode command in response: {cmd}")

        self.dd_blood_leak_emb_mode_response_timestamp = timestamp

    def cmd_blood_leak_data_broadcast_interval_override(self, ms, reset=NO_RESET):
        """
        Constructs and sends the blood leak data broadcast interval override command
        Constraints:
            Must be logged into DD.
            Given interval must be non-zero and a multiple of the DD general task interval (50 ms).

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        return cmd_generic_broadcast_interval_override(
            ms=ms,
            reset=reset,
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            msg_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_DATA_PUBLISH_INTERVAL_OVERRIDE_REQUEST,
            module_name='DD Blood Leak',
            logger=self.logger,
            can_interface=self.can_interface)

    def cmd_blood_leak_detector_override(self, detected: int, reset=NO_RESET):
        """
        Constructs and sends the blood leak detector state override command
        Constraints:
            Must be logged into DD.

        @param detected: unsigned int - detected (0=detected, 1=undetected) to override detector with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        # Payload layout: reset flag followed by the detector status
        payload = integer_to_bytearray(reset) + integer_to_bytearray(detected)

        return cmd_generic_override(
            payload=payload,
            reset=reset,
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            msg_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_STATUS_OVERRIDE_REQUEST,
            entity_name='DD Blood Leak Detector state',
            override_text=str(detected),
            logger=self.logger,
            can_interface=self.can_interface)

    def cmd_blood_leak_zero_request(self):
        """
        Request blood leak zeroing (sent without a payload)
        Constraints:
            Must be logged into DD.

        @return: 1 if successful, zero otherwise
        """
        return cmd_generic_override(
            payload=None,
            reset=NO_RESET,
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            msg_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_ZERO_REQUEST,
            entity_name='DD Blood Leak Zeroing',
            override_text='Active',
            logger=self.logger,
            can_interface=self.can_interface)

    def cmd_blood_leak_set_to_embedded_mode(self):
        """
        Constructs and sends switching to embedded mode command (sent without a payload)
        Constraints:
            Must be logged into DD.

        @return: non-zero integer if successful, False otherwise
        """
        return cmd_generic_override(
            payload=None,
            reset=NO_RESET,
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            msg_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_SET_TO_EMBEDDED_MODE_REQUEST,
            entity_name='DD Blood Leak Embedded mode',
            override_text='Active',
            logger=self.logger,
            can_interface=self.can_interface)

    def cmd_blood_leak_set_embedded_mode_command(self, command: int, msg_payload: int = None):
        """
        Constructs and sends an embedded mode command to the blood leak sensor. The stored response
        of the command is cleared before the request is sent.
        Constraints:
            Must be logged into DD.
            It is recommended to have delays in between sending each command to make sure the firmware's queue is
            not overflown.
            The sensor must be in embedded mode. The firmware sets the sensor into the embedded mode by default.

        @param command: integer - the embedded mode command to send (value of an EmbModeCommands member)
        @param msg_payload: integer - optional data sent with the command as an unsigned short (0 if not given)
        @return: non-zero integer if successful, False otherwise
        """
        # Reject values outside the enum (including negatives) instead of silently falling through
        if not 0 <= command < len(EmbModeCommands):
            self.logger.debug("Invalid command!")
            return False

        cmd_name = EmbModeCommands(command).name
        command_bytes = byte_to_bytearray(command)

        # Clear the previous response so that a stale value is not mistaken for the new one
        self.blood_leak_emb_mode_cmds[cmd_name] = ''

        data = unsigned_short_to_bytearray(0 if msg_payload is None else msg_payload)
        payload = command_bytes + data

        return cmd_generic_override(
            payload=payload,
            reset=NO_RESET,
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            msg_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_SET_EMBEDDED_MODE_CMD_REQUEST,
            entity_name=f'DD {cmd_name} to the blood leak sensor',
            override_text='Active',
            logger=self.logger,
            can_interface=self.can_interface)

    def cmd_blood_leak_emb_mode_info_cmds_override(self, cmd: int, value_to_override: int, reset=NO_RESET):
        """
        Constructs and sends the blood leak embedded mode info value override command
        Constraints:
            Must be logged into DD.
            Can only override Intensity (I) and Blood Detect Level (V)
            (this restriction is not checked here)

        @param cmd: unsigned int - the command that its info value is overridden (only I and V)
        @param value_to_override: unsigned int - value to override the selected info value to
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        emb_cmd = EmbModeCommands(cmd)

        # Payload layout: reset flag, override value, command index
        payload = (integer_to_bytearray(reset)
                   + integer_to_bytearray(value_to_override)
                   + integer_to_bytearray(emb_cmd.value))

        return cmd_generic_override(
            payload=payload,
            reset=reset,
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            msg_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_EMBEDDED_MODE_INFO_OVERRIDE_REQUEST,
            entity_name=f'DD Blood Leak {emb_cmd.name}',
            override_text=str(value_to_override),
            logger=self.logger,
            can_interface=self.can_interface)

    def cmd_blood_leak_intensity_moving_average_override(self, value: float, reset=NO_RESET):
        """
        Constructs and sends the blood leak intensity moving average override command
        Constraints:
            Must be logged into DD.

        @param value: float - blood leak intensity moving average
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        # Payload layout: reset flag followed by the moving average as a float
        payload = integer_to_bytearray(reset) + float_to_bytearray(value)

        return cmd_generic_override(
            payload=payload,
            reset=reset,
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            msg_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_INTENSITY_MOVING_AVERAGE_OVERRIDE_REQUEST,
            entity_name='DD Blood Leak Intensity Moving Average',
            override_text=str(value),
            logger=self.logger,
            can_interface=self.can_interface)

    def cmd_blood_leak_zeroing_interval_in_minutes(self, upper_interval: int, value_mins: int, reset=NO_RESET):
        """
        Constructs and sends the blood leak zeroing interval in minutes override command
        Constraints:
            Must be logged into DD.

        @param upper_interval: unsigned int - to override upper range or drift intervals
                               (0 = drift interval, 1 = upper interval)
        @param value_mins: unsigned int - blood leak zeroing interval in minutes
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        # Payload layout: reset flag, interval in minutes, interval selector
        payload = (integer_to_bytearray(reset)
                   + integer_to_bytearray(value_mins)
                   + integer_to_bytearray(upper_interval))

        if upper_interval == 1:
            text = 'upper intensity'
        else:
            text = 'intensity drift'

        # NOTE(review): this is the same message ID that cmd_blood_leak_intensity_moving_average_override
        # sends, which looks like a copy-paste slip - confirm the dedicated zeroing interval override ID in
        # MsgIds and use it here.
        return cmd_generic_override(
            payload=payload,
            reset=reset,
            channel_id=DenaliChannels.dialin_to_dd_ch_id,
            msg_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_INTENSITY_MOVING_AVERAGE_OVERRIDE_REQUEST,
            entity_name=f'DD Blood Leak Zeroing {text} interval',
            override_text=f'{str(value_mins)} minutes',
            logger=self.logger,
            can_interface=self.can_interface)