Index: leahi_dialin/dd/modules/blood_leak.py =================================================================== diff -u -r776be9fb446685fb492a2b7bd6faf4eac7dfe714 -r18c90a1b2b6c7339bdd192a2d2fac32f2b8a35df --- leahi_dialin/dd/modules/blood_leak.py (.../blood_leak.py) (revision 776be9fb446685fb492a2b7bd6faf4eac7dfe714) +++ leahi_dialin/dd/modules/blood_leak.py (.../blood_leak.py) (revision 18c90a1b2b6c7339bdd192a2d2fac32f2b8a35df) @@ -1,12 +1,28 @@ +########################################################################### +# +# Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file blood_leak.py +# +# @author (last) Zoltan Miskolci +# @date (last) 07-Jan-2026 +# @author (original) Dara Navaei +# @date (original) 21-Aug-2025 +# +############################################################################ import struct from logging import Logger from enum import unique -from .constants import RESET, NO_RESET -from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions +from leahi_dialin.common.constants import NO_RESET from leahi_dialin.common.dd_defs import DDBloodLeakStates -from leahi_dialin.protocols.CAN import DenaliMessage, DenaliChannels +from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions +from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override +from leahi_dialin.protocols.CAN import DenaliChannels from leahi_dialin.utils.base import AbstractSubSystem, publish, DialinEnum from leahi_dialin.utils.conversions import integer_to_bytearray, bytearray_to_byte, bytearray_to_integer, \ unsigned_short_to_bytearray, byte_to_bytearray, float_to_bytearray @@ -39,7 +55,6 @@ def __init__(self, can_interface, logger: Logger): """ - @param can_interface: Leahi Can Messenger object """ super().__init__() @@ -75,6 +90,7 @@ # Initialize all the embedded mode commands self.blood_leak_emb_mode_cmds[cmd] = '' + def get_blood_leak_status(self): """ Gets the current blood leak status @@ -83,6 +99,7 @@ """ return self.blood_leak_status + def get_blood_leak_state(self): """ Gets the current blood leak state @@ -93,6 +110,7 @@ """ return self.blood_leak_state + def get_blood_leak_emb_mode_command_response(self, emb_mod_cmd: int) -> str: """ Gets the most recent embedded mode command response for a given command @@ -106,6 +124,7 @@ else: self.logger.debug("Invalid command!") + @publish(['msg_id_dd_blood_leak_data', 'blood_leak_status', 'blood_leak_state', 'blood_leak_error_persistent_ctr', 'blood_leak_serial_comm_state', 'blood_leak_intensity', 'blood_leak_blood_detect', 'blood_leak_intensity_moving_average', @@ -114,7 +133,7 @@ 'dd_blood_leak_status_timestamp',]) def _handler_blood_leak_sync(self, message, timestamp=0.0): """ - Handles published blood leak status messages. Blood leak status is captured + Handles published blood leak status messages. Blood leak status is captured for reference. @param message: published blood leak status message @@ -142,99 +161,98 @@ message['message'][MsgFieldPositions.START_POS_FIELD_10:MsgFieldPositions.END_POS_FIELD_10]))[0] self.dd_blood_leak_status_timestamp = timestamp - def cmd_blood_leak_detector_override(self, detected: int, reset=NO_RESET): + + @publish(['msg_id_dd_send_blood_leak_emb_mode_response', 'blood_leak_emb_mode_cmd_response', + 'dd_blood_leak_emb_mode_response_timestamp']) + def _handler_blood_leak_emb_mode_cmd_resp(self, message, timestamp=0.0): """ - Constructs and sends the blood leak detector state override command - Constraints: - Must be logged into DD. + Handles published blood leak status messages. Blood leak status is captured + for reference. - @param detected: unsigned int - detected (0=detected, 1=undetected) to override detector with - @param reset: integer - 1 to reset a previous override, 0 to override - @return: 1 if successful, zero otherwise + @param message: published blood leak status message + @return: None """ - rst = integer_to_bytearray(reset) - det = integer_to_bytearray(detected) - payload = rst + det + # Clear the variable for the next read + blood_leak_emb_mode_cmd_response = '' + payload = message['message'] + index = MsgFieldPositions.START_POS_FIELD_1 + cmd, index = bytearray_to_byte(payload, index, False) + length, index = bytearray_to_integer(payload, index, False) - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dd_ch_id, - message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_STATUS_OVERRIDE_REQUEST.value, - payload=payload) + for i in range(0, length): + # Loop through the length and get the + char, char_index = bytearray_to_byte(payload, index + i, False) + blood_leak_emb_mode_cmd_response += chr(char) - self.logger.debug("Override blood leak detector state value") + self.blood_leak_emb_mode_cmds[EmbModeCommands(cmd).name] = blood_leak_emb_mode_cmd_response + self.dd_blood_leak_emb_mode_response_timestamp = timestamp - # Send message - received_message = self.can_interface.send(message) - # If there is content... - if received_message is not None: - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("Timeout!!!!") - return False - - def cmd_blood_leak_zero_request(self): + def cmd_blood_leak_data_broadcast_interval_override(self, ms, reset=NO_RESET): """ - Request blood leak zeroing + Constructs and sends the blood leak data broadcast interval override command Constraints: Must be logged into DD. + Given interval must be non-zero and a multiple of the DD general task interval (50 ms). + @param ms: integer - interval (in ms) to override with + @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dd_ch_id, - message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_ZERO_REQUEST.value) + return cmd_generic_broadcast_interval_override( + ms = ms, + reset = reset, + channel_id = DenaliChannels.dialin_to_dd_ch_id, + msg_id = MsgIds.MSG_ID_DD_BLOOD_LEAK_DATA_PUBLISH_INTERVAL_OVERRIDE_REQUEST, + module_name = 'DD Blood Leak', + logger = self.logger, + can_interface = self.can_interface) - self.logger.debug("Request blood leak zeroing") - # Send message - received_message = self.can_interface.send(message) - - # If there is content... - if received_message is not None: - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("Timeout!!!!") - return False - - def cmd_blood_leak_data_broadcast_interval_override(self, ms, reset=NO_RESET): + def cmd_blood_leak_detector_override(self, detected: int, reset=NO_RESET): """ - Constructs and sends the blood leak data broadcast interval override command + Constructs and sends the blood leak detector state override command Constraints: Must be logged into DD. - Given interval must be non-zero and a multiple of the DD general task interval (50 ms). - @param ms: integer - interval (in ms) to override with + @param detected: unsigned int - detected (0=detected, 1=undetected) to override detector with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ - rst = integer_to_bytearray(reset) - mis = integer_to_bytearray(ms) - payload = rst + mis - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dd_ch_id, - message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_DATA_PUBLISH_INTERVAL_OVERRIDE_REQUEST.value, - payload=payload) + det = integer_to_bytearray(detected) + payload = rst + det - self.logger.debug("Override DD blood leak data broadcast interval") + return cmd_generic_override( + payload = payload, + reset = reset, + channel_id = DenaliChannels.dialin_to_dd_ch_id, + msg_id = MsgIds.MSG_ID_DD_BLOOD_LEAK_STATUS_OVERRIDE_REQUEST, + entity_name = 'DD Blood Leak Detector state', + override_text = str(detected), + logger = self.logger, + can_interface = self.can_interface) - # Send message - received_message = self.can_interface.send(message) - # If there is content... - if received_message is not None: - if reset == RESET: - str_res = "reset back to normal: " - else: - str_res = str(ms) + " ms: " - self.logger.debug("Blood leak data broadcast interval overridden to " + str_res + - str(received_message['message'][DenaliMessage.PAYLOAD_START_INDEX])) - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("Timeout!!!!") - return False + def cmd_blood_leak_zero_request(self): + """ + Request blood leak zeroing + Constraints: + Must be logged into DD. + @return: 1 if successful, zero otherwise + """ + return cmd_generic_override( + payload = None, + reset = NO_RESET, + channel_id = DenaliChannels.dialin_to_dd_ch_id, + msg_id = MsgIds.MSG_ID_DD_BLOOD_LEAK_ZERO_REQUEST, + entity_name = 'DD Blood Leak Zeroing', + override_text = 'Active', + logger = self.logger, + can_interface = self.can_interface) + + def cmd_blood_leak_set_to_embedded_mode(self): """ Constructs and sends switching to embedded mode command @@ -243,21 +261,17 @@ @return: non-zero integer if successful, False otherwise """ - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dd_ch_id, - message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_SET_TO_EMBEDDED_MODE_REQUEST.value) + return cmd_generic_override( + payload = None, + reset = NO_RESET, + channel_id = DenaliChannels.dialin_to_dd_ch_id, + msg_id = MsgIds.MSG_ID_DD_BLOOD_LEAK_SET_TO_EMBEDDED_MODE_REQUEST, + entity_name = 'DD Blood Leak Embedded mode', + override_text = 'Active', + logger = self.logger, + can_interface = self.can_interface) - self.logger.debug("Setting the blood leak to embedded mode") - received_message = self.can_interface.send(message) - - # If there is content... - if received_message is not None: - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("Timeout!!!!") - return False - def cmd_blood_leak_set_embedded_mode_command(self, command: int, msg_payload: int = None): """ Constructs and sends switching to embedded mode command @@ -279,49 +293,18 @@ data = unsigned_short_to_bytearray(data) payload = command_bytes + data - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dd_ch_id, - message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_SET_EMBEDDED_MODE_CMD_REQUEST.value, - payload=payload) - self.logger.debug("Sending " + str(EmbModeCommands(command).name) + " to the blood leak sensor") + return cmd_generic_override( + payload = payload, + reset = NO_RESET, + channel_id = DenaliChannels.dialin_to_dd_ch_id, + msg_id = MsgIds.MSG_ID_DD_BLOOD_LEAK_SET_EMBEDDED_MODE_CMD_REQUEST, + entity_name = f'DD {str(EmbModeCommands(command).name)} to the blood leak sensor', + override_text = 'Active', + logger = self.logger, + can_interface = self.can_interface) - received_message = self.can_interface.send(message) - # If there is content... - if received_message is not None: - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("Timeout!!!!") - return False - else: - self.logger.debug("Invalid command!") - - @publish(['msg_id_dd_send_blood_leak_emb_mode_response', 'blood_leak_emb_mode_cmd_response', - 'dd_blood_leak_emb_mode_response_timestamp']) - def _handler_blood_leak_emb_mode_cmd_resp(self, message, timestamp=0.0): - """ - Handles published blood leak status messages. Blood leak status is captured - for reference. - - @param message: published blood leak status message - @return: None - """ - # Clear the variable for the next read - blood_leak_emb_mode_cmd_response = '' - payload = message['message'] - index = MsgFieldPositions.START_POS_FIELD_1 - cmd, index = bytearray_to_byte(payload, index, False) - length, index = bytearray_to_integer(payload, index, False) - - for i in range(0, length): - # Loop through the length and get the - char, char_index = bytearray_to_byte(payload, index + i, False) - blood_leak_emb_mode_cmd_response += chr(char) - - self.blood_leak_emb_mode_cmds[EmbModeCommands(cmd).name] = blood_leak_emb_mode_cmd_response - self.dd_blood_leak_emb_mode_response_timestamp = timestamp - def cmd_blood_leak_emb_mode_info_cmds_override(self, cmd: int, value_to_override: int, reset=NO_RESET): """ Constructs and sends the blood leak intensity override command @@ -338,26 +321,18 @@ value = integer_to_bytearray(value_to_override) index = integer_to_bytearray(EmbModeCommands(cmd).value) payload = rst + value + index - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dd_ch_id, - message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_EMBEDDED_MODE_INFO_OVERRIDE_REQUEST.value, - payload=payload) - if reset == NO_RESET: - self.logger.debug("Override blood leak {} to {}".format(EmbModeCommands(cmd).name, value_to_override)) - else: - self.logger.debug("Reset override blood leak {} ".format(EmbModeCommands(cmd).name)) + return cmd_generic_override( + payload = payload, + reset = reset, + channel_id = DenaliChannels.dialin_to_dd_ch_id, + msg_id = MsgIds.MSG_ID_DD_BLOOD_LEAK_EMBEDDED_MODE_INFO_OVERRIDE_REQUEST, + entity_name = f'DD Blood Leak {EmbModeCommands(cmd).name}', + override_text = str(value_to_override), + logger = self.logger, + can_interface = self.can_interface) - # Send message - received_message = self.can_interface.send(message) - # If there is content... - if received_message is not None: - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("Timeout!!!!") - return False - def cmd_blood_leak_intensity_moving_average_override(self, value: float, reset=NO_RESET): """ Constructs and sends the blood leak intensity moving average @@ -371,23 +346,18 @@ rst = integer_to_bytearray(reset) avg = float_to_bytearray(value) payload = rst + avg - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dd_ch_id, - message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_INTENSITY_MOVING_AVERAGE_OVERRIDE_REQUEST.value, - payload=payload) - self.logger.debug("Override blood leak intensity moving average to {:5.3f}".format(value)) + return cmd_generic_override( + payload = payload, + reset = reset, + channel_id = DenaliChannels.dialin_to_dd_ch_id, + msg_id = MsgIds.MSG_ID_DD_BLOOD_LEAK_INTENSITY_MOVING_AVERAGE_OVERRIDE_REQUEST, + entity_name = 'DD Blood Leak Intensity Moving Average', + override_text = str(value), + logger = self.logger, + can_interface = self.can_interface) - # Send message - received_message = self.can_interface.send(message) - # If there is content... - if received_message is not None: - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("Timeout!!!!") - return False - def cmd_blood_leak_zeroing_interval_in_minutes(self, upper_interval: int, value_mins: int, reset=NO_RESET): """ Constructs and sends the blood leak zeroing interval in minutes @@ -404,22 +374,13 @@ upper = integer_to_bytearray(upper_interval) payload = rst + mins + upper - message = DenaliMessage.build_message(channel_id=DenaliChannels.dialin_to_dd_ch_id, - message_id=MsgIds.MSG_ID_DD_BLOOD_LEAK_ZEROING_INTERVAL_IN_MS_OVERRIDE_REQUEST.value, - payload=payload) - text = 'intensity drift' - if upper_interval == 1: - text = 'upper intensity' - - self.logger.debug("Override blood leak zeroing {} interval to {} minute(s)".format(text,mins)) - - # Send message - received_message = self.can_interface.send(message) - - # If there is content... - if received_message is not None: - # response payload is OK or not OK - return received_message['message'][DenaliMessage.PAYLOAD_START_INDEX] - else: - self.logger.debug("Timeout!!!!") - return False + text = 'upper intensity' if upper_interval == 1 else 'intensity drift' + return cmd_generic_override( + payload = payload, + reset = reset, + channel_id = DenaliChannels.dialin_to_dd_ch_id, + msg_id = MsgIds.MSG_ID_DD_BLOOD_LEAK_INTENSITY_MOVING_AVERAGE_OVERRIDE_REQUEST, + entity_name = f'DD Blood Leak Zeroing {text} interval', + override_text = f'{str(value_mins)} minutes', + logger = self.logger, + can_interface = self.can_interface)