########################################################################### # # Copyright (c) 2025-2026 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file mixing_ctrl.py # # @author (last) Zoltan Miskolci # @date (last) 06-June-2026 # @author (original) Sameer Poyil # @date (original) 06-Jun-2026 # ############################################################################ # Module imports from logging import Logger # Project imports from leahi_dialin.common.constants import NO_RESET from leahi_dialin.common.generic_defs import DataTypes from leahi_dialin.common.msg_defs import MsgIds from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override from leahi_dialin.protocols.CAN import CanMessenger, CanChannels from leahi_dialin.utils.abstract_classes import AbstractSubSystem from leahi_dialin.utils.base import publish from leahi_dialin.utils.conversions import integer_to_bytearray, float_to_bytearray from leahi_dialin.common.dataclass.closed_loop_signals import ControllerSignals class DDDialysateMixing(AbstractSubSystem): """ DialysateMixing Dialysate Delivery (DD) Dialin API sub-class for Dialysate Mixing Control related commands. """ def __init__(self, can_interface: CanMessenger, logger: Logger): """ @param can_interface: Can Messenger object """ super().__init__() self.can_interface = can_interface self.logger = logger self.bcarb_ctlrl_signal = ControllerSignals() #: PI bicarb control signals self.acid_ctrl_signal = ControllerSignals() #: PI acid control signals if self.can_interface is not None: self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, message_id = MsgIds.MSG_ID_DD_MIXING_CONTROL_DATA.value, function = self._handler_mixing_control_sync) self.bicarb_mixing_state = 0 self.bicarb_control_interval = 0 self.acid_mixing_state = 0 self.acid_control_interval = 0 self.current_bicarb_mix_volume = 0.0 self.last_bicarb_mix_volume = 0.0 self.current_acid_mix_volume = 0.0 self.last_acid_mix_volume = 0.0 #TODO remove after final testing self.bcarb_ctlrl_signal.control_signal_reference = 0.0 self.bcarb_ctlrl_signal.control_signal_measured = 0.0 self.bcarb_ctlrl_signal.control_signal_error = 0.0 self.bcarb_ctlrl_signal.control_signal_error_sum = 0.0 self.bcarb_ctlrl_signal.control_signal_error_sum_after_windup = 0.0 self.bcarb_ctlrl_signal.control_signal_proportional_output = 0.0 self.bcarb_ctlrl_signal.control_signal_integral_output = 0.0 self.bcarb_ctlrl_signal.control_signal_feed_forward_output = 0.0 self.bcarb_ctlrl_signal.control_singal_control = 0.0 #TODO remove after final testing self.acid_ctrl_signal.control_signal_reference = 0.0 self.acid_ctrl_signal.control_signal_measured = 0.0 self.acid_ctrl_signal.control_signal_error = 0.0 self.acid_ctrl_signal.control_signal_error_sum = 0.0 self.acid_ctrl_signal.control_signal_error_sum_after_windup = 0.0 self.acid_ctrl_signal.control_signal_proportional_output = 0.0 self.acid_ctrl_signal.control_signal_integral_output = 0.0 self.acid_ctrl_signal.control_signal_feed_forward_output = 0.0 self.acid_ctrl_signal.control_singal_control = 0.0 #TODO remove after final testing self.dd_bicarb_kp_gain = 0.0 self.dd_bicarb_ki_gain = 0.0 #TODO remove after final testing self.dd_acid_kp_gain = 0.0 self.dd_acid_ki_gain = 0.0 self.dd_mixing_control_timestamp = 0 #: The timestamp of the last message @publish(["msg_id_dd_mixing_control_data", "bicarb_mixing_state", "bicarb_control_interval", "acid_mixing_state", "acid_control_interval", "bicarb_reference", "bicarb_measured", "bicarb_error ", "bicarb_error_sum", "bicarb_error_sum_after_windup", "bicarb_proportional_output", "bicarb_integral_output", "bicarb_feed_forward_output", "bicarb_control", "acid_reference", "acid_measured", "acid_error ", "acid_error_sum", "acid_error_sum_after_windup", "acid_proportional_output", "acid_integral_output", "acid_feed_forward_output", "acid_control", "bicarb_kp_gain", "bicarb_ki_gain","acid_kp_gain", "acid_ki_gain", "dd_mixing_control_timestamp"]) def _handler_mixing_control_sync(self, message, timestamp=0.0): """ Handles published mixing control data messages. @param message: published mixing control data message @return: None """ msg_list = [] msg_list.append(('self.bicarb_mixing_state', DataTypes.U32)) msg_list.append(('self.bicarb_control_interval', DataTypes.U32)) msg_list.append(('self.acid_mixing_state', DataTypes.U32)) msg_list.append(('self.acid_control_interval', DataTypes.U32)) msg_list.append(('self.current_bicarb_mix_volume', DataTypes.F32)) msg_list.append(('self.last_bicarb_mix_volume', DataTypes.F32)) msg_list.append(('self.current_acid_mix_volume', DataTypes.F32)) msg_list.append(('self.last_acid_mix_volume', DataTypes.F32)) msg_list.append(('self.bcarb_ctlrl_signal.control_signal_reference', DataTypes.F32)) msg_list.append(('self.bcarb_ctlrl_signal.control_signal_measured', DataTypes.F32)) msg_list.append(('self.bcarb_ctlrl_signal.control_signal_error', DataTypes.F32)) msg_list.append(('self.bcarb_ctlrl_signal.control_signal_error_sum', DataTypes.F32)) msg_list.append(('self.bcarb_ctlrl_signal.control_signal_error_sum_after_windup', DataTypes.F32)) msg_list.append(('self.bcarb_ctlrl_signal.control_signal_proportional_output', DataTypes.F32)) msg_list.append(('self.bcarb_ctlrl_signal.control_signal_integral_output ', DataTypes.F32)) msg_list.append(('self.bcarb_ctlrl_signal.control_signal_feed_forward_output', DataTypes.F32)) msg_list.append(('self.bcarb_ctlrl_signal.control_singal_control', DataTypes.F32)) msg_list.append(('self.acid_ctrl_signal.control_signal_reference', DataTypes.F32)) msg_list.append(('self.acid_ctrl_signal.control_signal_measured', DataTypes.F32)) msg_list.append(('self.acid_ctrl_signal.control_signal_error', DataTypes.F32)) msg_list.append(('self.acid_ctrl_signal.control_signal_error_sum', DataTypes.F32)) msg_list.append(('self.acid_ctrl_signal.control_signal_error_sum_after_windup', DataTypes.F32)) msg_list.append(('self.acid_ctrl_signal.control_signal_proportional_output', DataTypes.F32)) msg_list.append(('self.acid_ctrl_signal.control_signal_integral_output ', DataTypes.F32)) msg_list.append(('self.acid_ctrl_signal.control_signal_feed_forward_output', DataTypes.F32)) msg_list.append(('self.acid_ctrl_signal.control_singal_control', DataTypes.F32)) msg_list.append(('self.dd_bicarb_kp_gain', DataTypes.F32)) msg_list.append(('self.dd_bicarb_ki_gain', DataTypes.F32)) msg_list.append(('self.dd_acid_kp_gain', DataTypes.F32)) msg_list.append(('self.dd_acid_ki_gain', DataTypes.F32)) self.process_into_vars(decoder_list = msg_list, message = message) self.dd_mixing_control_timestamp = timestamp def cmd_dialysate_mixing_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: """ Constructs and sends the mixing control data broadcast interval override command Constraints: Must be logged into DD. Given interval must be non-zero and a multiple of the DD general task interval (50 ms). @param ms: integer - interval (in ms) to override with @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ return cmd_generic_broadcast_interval_override( ms=ms, reset=reset, channel_id=CanChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_MIXING_CONTROL_DATA_PUBLISH_INTERVAL_OVERRIDE_REQUEST, module_name='DD Mixing Control', logger=self.logger, can_interface=self.can_interface) def cmd_mixing_control_bicarb_mix_vol_override(self, bicarb_mix_vol: float, reset: int = NO_RESET) -> int: """ Constructs and sends the bicarb mix vol override command Constraints: Must be logged into DD. @param bicarb_mix_vol: float - bicarb_mix_vol value to override bicarb mix vol @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ bicarb_mix = float_to_bytearray(bicarb_mix_vol) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + bicarb_mix return cmd_generic_override( payload=payload, reset=reset, channel_id=CanChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICARB_MIX_VOL_OVERRIDE_REQUEST, entity_name='DD Bicarb Mix Vol', override_text=str(bicarb_dose_vol), logger=self.logger, can_interface=self.can_interface) def cmd_mixing_control_acid_mix_vol_override(self, acid_mix_vol: float, reset: int = NO_RESET) -> int: """ Constructs and sends the mixing control acid mix vol override command Constraints: Must be logged into DD. @param acid_mix_vol: float - acid_mix_vol value to override acid mix vol @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ acid_mix = float_to_bytearray(acid_mix_vol) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + acid_mix return cmd_generic_override( payload=payload, reset=reset, channel_id=CanChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_ACID_MIX_VOL_OVERRIDE_REQUEST, entity_name='DD Acid Mix Vol', override_text=str(acid_mix_vol), logger=self.logger, can_interface=self.can_interface) def cmd_mixing_control_bicarb_mix_vol_kp_gain_coeff_override(self, bicarb_kp_coeff: float, reset: int = NO_RESET) -> int: """ Constructs and sends the mixing control bicarb kp coeff override command Constraints: Must be logged into DD. @param bicarb_kp_coeff: float - bicarb_kp_coeff value to override acid dose vol @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ kp_gain_coeff = float_to_bytearray(bicarb_kp_coeff) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + kp_gain_coeff return cmd_generic_override( payload=payload, reset=reset, channel_id=CanChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICARB_MIX_VOL_KP_GAIN_COEFF_OVERRIDE_REQUEST, entity_name='DD Bicarb KP Gain Coeff', override_text=str(bicarb_kp_coeff), logger=self.logger, can_interface=self.can_interface) def cmd_mixing_control_bicarb_mix_vol_ki_gain_coeff_override(self, bicarb_ki_coeff: float, reset: int = NO_RESET) -> int: """ Constructs and sends the mixing control bicarb ki coeff override command Constraints: Must be logged into DD. @param bicarb_ki_coeff: float - bicarb_ki_coeff value to override acid dose vol @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ ki_gain_coeff = float_to_bytearray(bicarb_ki_coeff) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + ki_gain_coeff return cmd_generic_override( payload=payload, reset=reset, channel_id=CanChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICARB_MIX_VOL_KI_GAIN_COEFF_OVERRIDE_REQUEST, entity_name='DD Bicarb KI Gain Coeff', override_text=str(bicarb_ki_coeff), logger=self.logger, can_interface=self.can_interface) def cmd_mixing_control_acid_mix_vol_control_kp_gain_coeff_override(self, acid_kp_coeff: float, reset: int = NO_RESET) -> int: """ Constructs and sends the Mixing Control acid kp coeff override command Constraints: Must be logged into DD. @param acid_kp_coeff: float - acid_kp_coeff value to override acid kp coeff @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ kp_gain_coeff = float_to_bytearray(acid_kp_coeff) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + kp_gain_coeff return cmd_generic_override( payload=payload, reset=reset, channel_id=CanChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_ACID_MIX_VOL_KP_GAIN_COEFF_OVERRIDE_REQUEST, entity_name='DD Acid KP Gain Coeff', override_text=str(acid_kp_coeff), logger=self.logger, can_interface=self.can_interface) def cmd_mixing_control_acid_mix_vol_control_ki_gain_coeff_override(self, acid_ki_coeff: float, reset: int = NO_RESET) -> int: """ Constructs and sends the Mixing Control acid ki coeff override command Constraints: Must be logged into DD. @param acid_ki_coeff: float - acid_ki_coeff value to override acid dose vol @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ ki_gain_coeff = float_to_bytearray(acid_ki_coeff) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + ki_gain_coeff return cmd_generic_override( payload=payload, reset=reset, channel_id=CanChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_ACID_MIX_VOL_KI_GAIN_COEFF_OVERRIDE_REQUEST, entity_name='DD Acid KI Gain Coeff', override_text=str(acid_ki_coeff), logger=self.logger, can_interface=self.can_interface) def cmd_mixing_control_bicarb_target_conductivity_override(self, bicarb_conductivity: float, reset: int = NO_RESET) -> int: """ Constructs and sends the Mixing Control bicarb target conductivity override command Constraints: Must be logged into DD. @param bicarb_conductivity: float - conductivity value to override target conductivity @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ target_conductivity = float_to_bytearray(bicarb_conductivity) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + target_conductivity return cmd_generic_override( payload=payload, reset=reset, channel_id=CanChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICARB_TARGET_CONDUCTIVITY_OVERRIDE_REQUEST, entity_name='DD Bicarb Target Conductivity', override_text=str(bicarb_conductivity), logger=self.logger, can_interface=self.can_interface) def cmd_mixing_control_bicarb_delta_conductivity_override(self, bicarb_delta_conductivity: float, reset: int = NO_RESET) -> int: """ Constructs and sends the Mixing Control bicart delta conductivity override command Constraints: Must be logged into DD. @param bicarb_delta_conductivity: float - bicarb delta conductivity value to override delta conductivity @param reset: integer - 1 to reset a previous override, 0 to override @return: 1 if successful, zero otherwise """ delta_conductivity = float_to_bytearray(bicarb_delta_conductivity) reset_byte_array = integer_to_bytearray(reset) payload = reset_byte_array + delta_conductivity return cmd_generic_override( payload=payload, reset=reset, channel_id=CanChannels.dialin_to_dd_ch_id, msg_id=MsgIds.MSG_ID_DD_BICARB_DELTA_CONDUCTIVITY_OVERRIDE_REQUEST, entity_name='DD Bicarb Delta Conductivity', override_text=str(bicarb_delta_conductivity), logger=self.logger, can_interface=self.can_interface)