########################################################################### # # Copyright (c) 2021-2023 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file blood_leak.py # # @author (last) Micahel Garthwaite # @date (last) 07-Mar-2023 # @author (original) Peman Montazemi # @date (original) 15-Apr-2021 # ############################################################################ import struct from logging import Logger from enum import unique from .constants import RESET, NO_RESET from ..common.msg_defs import MsgIds, MsgFieldPositions from ..protocols.CAN import DenaliMessage, DenaliChannels from ..utils.base import AbstractSubSystem, publish, DialinEnum from ..utils.conversions import integer_to_bytearray, bytearray_to_byte, bytearray_to_integer, \ unsigned_short_to_bytearray, byte_to_bytearray @unique class EmbModeCommands(DialinEnum): NU = 0 # NULL command CS = 1 # Control S command (this is handled automatically by using the embedded mode command) SP = 2 # Set set point command T = 3 # Self test command G = 4 # Get self-test drive I = 5 # Display intensity V = 6 # Display blood detection level Z = 7 # Zero command Q = 8 # Zero confirm command D = 9 # Display set point C = 10 # Calibration command @unique class BloodLeakStates(DialinEnum): """ HD blood leak embedded mode state machine states """ BLOOD_LEAK_WAIT_FOR_POST_STATE = 0 BLOOD_LEAK_CHECK_SET_POINT_STATE = 1 BLOOD_LEAK_INIT_STATE = 2 BLOOD_LEAK_CHECK_ZERO_AND_SELF_TEST_STATE = 3 BLOOD_LEAK_NORMAL_STATE = 4 class HDBloodLeak(AbstractSubSystem): """ HDBloodLeak Hemodialysis Delivery (HD) Dialin API sub-class for blood leak related commands. """ # Blood leak detector status BLOOD_LEAK_DETECTED = 0 # Blood detected NO_BLOOD_LEAK_DETECTED = 1 # No blood detected def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger if self.can_interface is not None: channel_id = DenaliChannels.hd_sync_broadcast_ch_id msg_id = MsgIds.MSG_ID_HD_BLOOD_LEAK_DATA.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_blood_leak_sync) channel_id = DenaliChannels.hd_to_dialin_ch_id msg_id = MsgIds.MSG_ID_HD_SEND_BLOOD_LEAK_EMB_MODE_RESPONSE.value self.can_interface.register_receiving_publication_function(channel_id, msg_id, self._handler_blood_leak_emb_mode_cmd_resp) self.hd_blood_leak_status_timestamp = 0.0 self.hd_blood_leak_emb_mode_response_timestamp = 0.0 self.blood_leak_status = self.NO_BLOOD_LEAK_DETECTED self.blood_leak_state = BloodLeakStates.BLOOD_LEAK_INIT_STATE.value self.blood_leak_emb_mode_cmds = dict() for cmd in EmbModeCommands.__members__: # Initialize all the embedded mode commands self.blood_leak_emb_mode_cmds[cmd] = '' def get_blood_leak_status(self): """ Gets the current blood leak status @return: List containing blood leak status: [detected, undetected] """ return self.blood_leak_status def get_blood_leak_state(self): """ Gets the current blood leak state @return: integer - blood leak state (0: init, 1: zeroing, 2: self-test, 3: normal) """ return self.blood_leak_state def get_blood_leak_emb_mode_command_response(self, emb_mod_cmd: int) -> str: """ Gets the most recent embedded mode command response for a given command @param emb_mod_cmd the command to get its response @return: string - embedded mode command response """ if emb_mod_cmd < len(EmbModeCommands): return self.blood_leak_emb_mode_cmds[EmbModeCommands(emb_mod_cmd).name] else: self.logger.debug("Invalid command!") @publish(['hd_blood_leak_status_timestamp', 'blood_leak_status', 'blood_leak_state']) def _handler_blood_leak_sync(self, message, timestamp=0.0): """ Handles published blood leak status messages. Blood leak status is captured for reference. @param message: published blood leak status message @return: None """ self.blood_leak_status = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.blood_leak_state = struct.unpack('i', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] self.hd_blood_leak_status_timestamp = timestamp def cmd_blood_leak_detector_override(self, detected: int, reset=NO_RESET): """ Constructs and sends the blood leak detector state override command Constraints: Must be logged into HD. @param detected: unsigned int - detected (0=detected, 1=undetected) to override detector with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) det = integer_to_bytearray(detected) payload = rst + det message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_BLOOD_LEAK_STATUS_OVERRIDE.value, payload=payload) self.logger.debug("Override blood leak detector state value") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_blood_leak_zero_request(self): """ Request blood leak zeroing Constraints: Must be logged into HD. @return: 1 if successful, zero otherwise """ message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_BLOOD_LEAK_ZERO_REQUEST.value) self.logger.debug("Request blood leak zeroing") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_blood_leak_data_broadcast_interval_override(self, ms, reset=NO_RESET): """ Constructs and sends the blood leak data broadcast interval override command Constraints: Must be logged into HD. Given interval must be non-zero and a multiple of the HD general task interval (50 ms). @param ms: integer - interval (in ms) to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ rst = integer_to_bytearray(reset) mis = integer_to_bytearray(ms) payload = rst + mis message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_BLOOD_LEAK_DATA_SEND_INTERVAL_OVERRIDE.value, payload=payload) self.logger.debug("Override HD blood leak data broadcast interval") # Send message received_message = self.can_interface.send(message) # If there is content... if received_message is not None: if reset == RESET: str_res = "reset back to normal: " else: str_res = str(ms) + " ms: " self.logger.debug("Blood leak data broadcast interval overridden to " + str_res + str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_blood_leak_set_to_embedded_mode(self): """ Constructs and sends switching to embedded mode command Constraints: Must be logged into HD. @return: non-zero integer if successful, False otherwise """ message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_SET_BLOOD_LEAK_2_EMB_MODE.value) self.logger.debug("Setting the blood leak to embedded mode") received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False def cmd_blood_leak_set_embedded_mode_command(self, command: int, msg_payload: int = None): """ Constructs and sends switching to embedded mode command Constraints: Must be logged into HD. It is recommended to have delays in between sending each command to make sure the firmware's queue is not overflown. The sensor must be in embedded mode. The firmware sets the sensor into the embedded mode by default. @return: non-zero integer if successful, False otherwise """ if command < len(EmbModeCommands): command_bytes = byte_to_bytearray(command) self.blood_leak_emb_mode_cmds[EmbModeCommands(command).name] = '' data = 0 if msg_payload is not None: data = msg_payload data = unsigned_short_to_bytearray(data) payload = command_bytes + data message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_hd_ch_id, message_id=MsgIds.MSG_ID_HD_SET_BLOOD_LEAK_EMB_MODE_COMMAND.value, payload=payload) self.logger.debug("Sending " + str(EmbModeCommands(command).name) + " to the blood leak sensor") received_message = self.can_interface.send(message) # If there is content... if received_message is not None: # response payload is OK or not OK return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] else: self.logger.debug("Timeout!!!!") return False else: self.logger.debug("Invalid command!") @publish(['hd_blood_leak_emb_mode_response_timestamp', 'blood_leak_emb_mode_cmd_response']) def _handler_blood_leak_emb_mode_cmd_resp(self, message, timestamp=0.0): """ Handles published blood leak status messages. Blood leak status is captured for reference. @param message: published blood leak status message @return: None """ # Clear the variable for the next read blood_leak_emb_mode_cmd_response = '' payload = message['message'] index = MsgFieldPositions.START_POS_FIELD_1 cmd, index = bytearray_to_byte(payload, index, False) length, index = bytearray_to_integer(payload, index, False) for i in range(0, length): # Loop through the length and get the char, char_index = bytearray_to_byte(payload, index + i, False) blood_leak_emb_mode_cmd_response += chr(char) self.blood_leak_emb_mode_cmds[EmbModeCommands(cmd).name] = blood_leak_emb_mode_cmd_response self.hd_blood_leak_emb_mode_response_timestamp = timestamp