########################################################################### # # Copyright (c) 2025-2026 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file drybicart.py # # @author (last) Sameer Poyil # @date (last) 19-Nov-2025 # @author (original) Sameer Poyil # @date (original) 19-Nov-2025 # ############################################################################ import struct from logging import Logger from leahi_dialin.common.constants import NO_RESET , RESET from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override from leahi_dialin.protocols.CAN import DenaliChannels from leahi_dialin.utils.base import AbstractSubSystem, publish from leahi_dialin.utils.conversions import integer_to_bytearray, float_to_bytearray from dataclasses import dataclass @dataclass class ControllerSignals: """Equivalent to PI_CONTROLLER_SIGNALS_DATA""" control_signal_reference: float = 0.0 # ///< Reference value control_signal_measured: float = 0.0 # ///< Measured value control_signal_error: float = 0.0 # ///< Error value control_signal_error_sum: float = 0.0 # ///< Error sum before anti-windup control_signal_error_sum_after_windup: float = 0.0 # ///< Error sum after anti-windup control_signal_proportional_output: float = 0.0 # ///< P portion control_signal_integral_output: float = 0.0 # ///< I portion control_signal_feed_forward_output: float = 0.0 # ///< Feed forward portion control_singal_control: float = 0.0 # ///< Controller output signal class DDDryBicart(AbstractSubSystem): """ DryBicart Dialysate Delivery (DD) Dialin API sub-class for Dry Bicart related commands. """ def __init__(self, can_interface, logger: Logger): """ @param can_interface: Denali Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger self.signal = ControllerSignals() #: PI bicarb control signals self.acid_pi_control_signal = ControllerSignals() #: PI acid control signals if self.can_interface is not None: channel_id = DenaliChannels.dd_sync_broadcast_ch_id self.msg_id_dd_dry_bicart_data = MsgIds.MSG_ID_DD_DRY_BICART_DATA.value self.can_interface.register_receiving_publication_function(channel_id, self.msg_id_dd_dry_bicart_data, self._handler_dry_bicart_sync) self.dd_dry_bicart_timestamp = 0 #: The timestamp of the last message self.dd_dry_bicart_fill_execution_state = 0 #: The Dry Bicart fill execution state self.dd_bicarb_chamber_fill_execution_state = 0 #: The Bicarb chamber fill execution state self.dd_dry_bicart_drain_execution_state = 0 #: The Dry Bicart drain execution state self.dd_dry_bicart_fill_cycle_counter = 0 #: The Dry Bicart fill cycle counter self.dd_dry_bicart_max_fill_cycle_count = 0 #: The Dry Bicart max fill cycle state self.dd_dry_bicart_fill_request = 0 #: Is fill requested for Dry Bicart or not self.dd_bicarb_chamber_fill_request = 0 #: Is fill requested for Bicarb Chamber or not self.dd_dry_bicart_drain_request = 0 #: Is drain requested for Dry Bicart or not self.dd_dry_bicart_last_fill_time = 0 #: The Dry Bicart last fill time self.dd_dry_bicart_current_fill_time = 0 #: The Dry Bicart current fill time self.dd_dryBiCartType = 0 #: The Dry Bicart concentrate option index self.dd_dryBiCartDrainTimePeriod = 0 #: The Dry Bicart drain time period in sec self.signal.control_signal_reference = 0.0 self.signal.control_signal_measured = 0.0 self.signal.control_signal_error = 0.0 self.signal.control_signal_error_sum = 0.0 self.signal.control_signal_error_sum_after_windup = 0.0 self.signal.control_signal_proportional_output = 0.0 self.signal.control_signal_integral_output = 0.0 self.signal.control_signal_feed_forward_output = 0.0 self.signal.control_singal_control = 0.0 self.dd_dry_bicart_acid_dose_vol = 0.0 self.dd_dry_bicart_bicarb_dose_vol = 0.0 self.dd_dry_bicarb_kp_gain = 0.0 self.dd_dry_bicarb_ki_gain = 0.0 @publish(["msg_id_dd_dry_bicart_data", "dd_dry_bicart_fill_execution_state", "dd_bicarb_chamber_fill_execution_state", "dd_dry_bicart_drain_execution_state", "dd_dry_bicart_fill_cycle_counter", "dd_dry_bicart_max_fill_cycle_count", "dd_dry_bicart_fill_request", "dry_bicarb_chamber_fill_request", "dd_dry_bicart_drain_request", "dd_dry_bicart_last_fill_time", "dd_dry_bicart_current_fill_time", "dd_dryBiCartType", "dd_dryBiCartDrainTimePeriod", "bicarb_reference", "bicarb_measured", "bicarb_error ", "bicarb_error_sum", "bicarb_error_sum_after_windup", "bicarb_proportional_output", "bicarb_integral_output", "bicarb_feed_forward_output", "bicarb_control", "acid_dose_vol", "bicarb_dose_vol", "bicarb_kp_gain", "bicarb_ki_gain", "dd_dry_bicart_timestamp"]) def _handler_dry_bicart_sync(self, message, timestamp=0.0): """ Handles published dry bicart data messages. @param message: published dry bicart da`ta message @return: None """ self.dd_dry_bicart_fill_execution_state = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_1:MsgFieldPositions.END_POS_FIELD_1]))[0] self.dd_bicarb_chamber_fill_execution_state = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_2:MsgFieldPositions.END_POS_FIELD_2]))[0] self.dd_dry_bicart_drain_execution_state = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_3:MsgFieldPositions.END_POS_FIELD_3]))[0] self.dd_dry_bicart_fill_cycle_counter = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_4:MsgFieldPositions.END_POS_FIELD_4]))[0] self.dd_dry_bicart_max_fill_cycle_count = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_5:MsgFieldPositions.END_POS_FIELD_5]))[0] self.ddd_dry_bicart_fill_request = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_6:MsgFieldPositions.END_POS_FIELD_6]))[0] self.dd_bicarb_chamber_fill_request = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_7:MsgFieldPositions.END_POS_FIELD_7]))[0] self.ddd_dry_bicart_drain_request = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_8:MsgFieldPositions.END_POS_FIELD_8]))[0] self.dd_dry_bicart_last_fill_time = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_9:MsgFieldPositions.END_POS_FIELD_9]))[0] self.dd_dry_bicart_current_fill_time = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_10:MsgFieldPositions.END_POS_FIELD_10]))[0] self.dd_dryBiCartType = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_11:MsgFieldPositions.END_POS_FIELD_11]))[0] self.dd_dryBiCartDrainTimePeriod = struct.unpack('I', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_12:MsgFieldPositions.END_POS_FIELD_12]))[0] self.signal.control_signal_reference = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_13:MsgFieldPositions.END_POS_FIELD_13]))[0] self.signal.control_signal_measured = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_14:MsgFieldPositions.END_POS_FIELD_14]))[0] self.signal.control_signal_error = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_15:MsgFieldPositions.END_POS_FIELD_15]))[0] self.signal.control_signal_error_sum = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_16:MsgFieldPositions.END_POS_FIELD_16]))[0] self.signal.control_signal_error_sum_after_windup =struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_17:MsgFieldPositions.END_POS_FIELD_17]))[0] self.signal.control_signal_proportional_output = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_18:MsgFieldPositions.END_POS_FIELD_18]))[0] self.signal.control_signal_integral_output = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_19:MsgFieldPositions.END_POS_FIELD_19]))[0] self.signal.control_signal_feed_dorward_output = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_20:MsgFieldPositions.END_POS_FIELD_20]))[0] self.signal.control_singal_control =struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_21:MsgFieldPositions.END_POS_FIELD_21]))[0] self.dd_dry_bicart_acid_dose_vol = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_22:MsgFieldPositions.END_POS_FIELD_22]))[0] self.dd_dry_bicart_bicarb_dose_vol = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_23:MsgFieldPositions.END_POS_FIELD_23]))[0] self.dd_dry_bicarb_kp_gain = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_24:MsgFieldPositions.END_POS_FIELD_24]))[0] self.dd_dry_bicarb_ki_gain = struct.unpack('f', bytearray( message['message'][MsgFieldPositions.START_POS_FIELD_25:MsgFieldPositions.END_POS_FIELD_25]))[0] self.dd_dry_bicart_timestamp = timestamp def cmd_dry_bicart_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart data broadcast interval override command Constraints: Must be logged into DD. Given interval must be non-zero and a multiple of the DD general task interval (50 ms). @param ms: integer - interval (in ms) to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ return cmd_generic_broadcast_interval_override( ms=ms, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_DRY_BICART_DATA_PUBLISH_INTERVAL_OVERRIDE_REQUEST, module_name='DD Dry Bicart', logger=self.logger, can_interface=self.can_interface) def cmd_bicart_max_fill_cycle_count_override(self, count: int, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart max cycle count override command Constraints: Must be logged into DD. @param count: int - count value to override max fill cycle count @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ cycle_count = integer_to_bytearray(count) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + cycle_count return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_DRY_BICART_FILL_CYCLE_MAX_OVERRIDE_REQUEST, entity_name='max fill cycle count', override_text=str(count), logger=self.logger, can_interface=self.can_interface) def cmd_dry_bicart_fill_request_override(self, start_stop: int, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart fill request override command Constraints: Must be logged into DD. @param start_stop: int - value to start or stop dry bicart fill ( start = 1, stop = 0 @return: 1 if successful, zero otherwise """ request = integer_to_bytearray(start_stop) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + request return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_DRY_BICART_FILL_REQUEST_OVERRIDE_REQUEST, entity_name='Bicart Fill Request', override_text=str(start_stop), logger=self.logger, can_interface=self.can_interface) def cmd_dry_bicart_bicarb_chamber_fill_request_override(self, start_stop: int, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart bicarbonate chamber F fill request override command Constraints: Must be logged into DD. @param start_stop: int - value to start or stop dry bicart fill ( start = 1, stop = 0 @return: 1 if successful, zero otherwise """ request = integer_to_bytearray(start_stop) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + request return cmd_generic_override( payload=payload, reset=NO_RESET, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICARB_CHAMBER_FILL_REQUEST_OVERRIDE_REQUEST, entity_name='Bicarb Fill Request', override_text=str(start_stop), logger=self.logger, can_interface=self.can_interface) def cmd_dry_bicart_drain_request_override(self, start_stop: int, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart drain request override command Constraints: Must be logged into DD. @param start_stop: int - value to start or stop dry bicart drain ( start = 1, stop = 0 @return: 1 if successful, zero otherwise """ request = integer_to_bytearray(start_stop) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + request return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICART_DRAIN_REQUEST_OVERRIDE_REQUEST, entity_name='Bicart Drain Request', override_text=str(start_stop), logger=self.logger, can_interface=self.can_interface) def cmd_dry_bicart_cartridge_size_override(self, small_large: int, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart cartridge size override command Constraints: Must be logged into DD. @param small_large: int - value for small or large dry bicart ( small = 0, large = 1 @return: 1 if successful, zero otherwise """ cartridge_size = integer_to_bytearray(small_large) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + cartridge_size return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICART_CARTRIDGE_SELECT_OVERRIDE_REQUEST, entity_name='Bicart Cartridge Size', override_text=str(small_large), logger=self.logger, can_interface=self.can_interface) def cmd_bicart_bicarb_dose_vol_control_override(self, bicarb_dose_vol: float, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart bicarb dose vol override command Constraints: Must be logged into DD. @param bicarb_dose_vol: float - bicarb_dose_vol value to override bicarb dose vol @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ bicarb_dose = float_to_bytearray(bicarb_dose_vol) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + bicarb_dose return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICARB_DOSE_VOL_CONTROL_OVERRIDE_REQUEST, entity_name='bicarb dose vol', override_text=str(bicarb_dose_vol), logger=self.logger, can_interface=self.can_interface) def cmd_bicart_acid_dose_vol_control_override(self, acid_dose_vol: float, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart acid dose vol override command Constraints: Must be logged into DD. @param acid_dose_vol: float - acid_dose_vol value to override acid dose vol @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ acid_dose = float_to_bytearray(acid_dose_vol) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + acid_dose return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_ACID_DOSE_VOL_CONTROL_OVERRIDE_REQUEST, entity_name='acid dose vol', override_text=str(acid_dose_vol), logger=self.logger, can_interface=self.can_interface) def cmd_bicart_bicarb_dose_vol_control_kp_gain_coeff_override(self, bicarb_kp_coeff: float, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart bicarb kp coeff override command Constraints: Must be logged into DD. @param bicarb_kp_coeff: float - bicarb_kp_coeff value to override acid dose vol @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ kp_gain_coeff = float_to_bytearray(bicarb_kp_coeff) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + kp_gain_coeff return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICARB_DOSE_VOL_CONTROL_KP_GAIN_COEFF_OVERRIDE_REQUEST, entity_name='bicarb kp gain coeff', override_text=str(bicarb_kp_coeff), logger=self.logger, can_interface=self.can_interface) def cmd_bicart_bicarb_dose_vol_control_ki_gain_coeff_override(self, bicarb_ki_coeff: float, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart bicarb ki coeff override command Constraints: Must be logged into DD. @param bicarb_ki_coeff: float - bicarb_ki_coeff value to override acid dose vol @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ ki_gain_coeff = float_to_bytearray(bicarb_ki_coeff) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + ki_gain_coeff return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICARB_DOSE_VOL_CONTROL_KI_GAIN_COEFF_OVERRIDE_REQUEST, entity_name='bicarb ki gain coeff', override_text=str(bicarb_ki_coeff), logger=self.logger, can_interface=self.can_interface) def cmd_bicart_acid_dose_vol_control_kp_gain_coeff_override(self, acid_kp_coeff: float, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart acid kp coeff override command Constraints: Must be logged into DD. @param acid_kp_coeff: float - acid_kp_coeff value to override acid kp coeff @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ kp_gain_coeff = float_to_bytearray(acid_kp_coeff) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + kp_gain_coeff return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_ACID_DOSE_VOL_CONTROL_KP_GAIN_COEFF_OVERRIDE_REQUEST, entity_name='acid kp gain coeff', override_text=str(acid_kp_coeff), logger=self.logger, can_interface=self.can_interface) def cmd_bicart_acid_dose_vol_control_ki_gain_coeff_override(self, acid_ki_coeff: float, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart acid ki coeff override command Constraints: Must be logged into DD. @param acid_ki_coeff: float - acid_ki_coeff value to override acid dose vol @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ ki_gain_coeff = float_to_bytearray(acid_ki_coeff) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + ki_gain_coeff return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_ACID_DOSE_VOL_CONTROL_KI_GAIN_COEFF_OVERRIDE_REQUEST, entity_name='acid ki gain coeff', override_text=str(acid_ki_coeff), logger=self.logger, can_interface=self.can_interface) def cmd_bicart_target_conductivity_override(self, conductivity: float, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart target conductivity override command Constraints: Must be logged into DD. @param conductivity: float - onductivity value to override target conductivity @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ target_conductivity = float_to_bytearray(conductivity) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + target_conductivity return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICARB_TARGET_CONDUCTIVITY_OVERRIDE_REQUEST, entity_name='target conductivity', override_text=str(conductivity), logger=self.logger, can_interface=self.can_interface) def cmd_bicart_delta_conductivity_override(self, conductivity: float, reset: int = NO_RESET) -> int: """ Constructs and sends the dry bicart target conductivity override command Constraints: Must be logged into DD. @param conductivity: float - conductivity value to override delta conductivity @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ delta_conductivity = float_to_bytearray(conductivity) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + delta_conductivity return cmd_generic_override( payload=payload, reset=reset, channel_id=DenaliChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICARB_DELTA_CONDUCTIVITY_OVERRIDE_REQUEST, entity_name='delta conductivity', override_text=str(conductivity), logger=self.logger, can_interface=self.can_interface)