###########################################################################
#
#    Copyright (c) 2025-2026 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   mixer_ctrl.py
#
#    @author (last)      Zoltan Miskolci
#    @date   (last)      0406-June-2026
#    @author (original)  Sameer Poyil
#    @date   (original)  06-Jun-2026
#
############################################################################

# Module imports
from dataclasses import dataclass
from logging import Logger

# Project imports
from leahi_dialin.common.constants import NO_RESET , RESET
from leahi_dialin.common.generic_defs import DataTypes
from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions
from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override
from leahi_dialin.protocols.CAN import CanMessenger, CanChannels
from leahi_dialin.utils.abstract_classes import AbstractSubSystem
from leahi_dialin.utils.base import publish
from leahi_dialin.utils.conversions import integer_to_bytearray, float_to_bytearray
from leahi_dialin.common.closed_loop_signals import ControllerSignals


# Attribute names written on each ControllerSignals object, in the order they are
# decoded from the dry bicart data message. The spelling 'control_singal_control'
# is kept exactly as the original code used it.
# NOTE(review): confirm these names against the ControllerSignals definition.
_CONTROLLER_SIGNAL_FIELDS = (
    'control_signal_reference',
    'control_signal_measured',
    'control_signal_error',
    'control_signal_error_sum',
    'control_signal_error_sum_after_windup',
    'control_signal_proportional_output',
    'control_signal_integral_output',
    'control_signal_feed_forward_output',
    'control_singal_control',
)


class DDDialysateCompositionClosedLoopMixing(AbstractSubSystem):
    """
    Dialysate Delivery (DD) Dialin API sub-class for Dry Bicart related commands.

    Stores the values decoded from the DD dry bicart data broadcast and provides
    the override commands for the bicarb / acid dose volume control.
    """

    def __init__(self, can_interface: CanMessenger, logger: Logger):
        """
        Initializes the stored values and registers the dry bicart data handler.

        @param can_interface: Can Messenger object (may be None, in which case no handler is registered)
        @param logger: Logger object handed to the override command templates
        """
        super().__init__()

        self.can_interface = can_interface
        self.logger = logger
        self.bcarb_ctlrl_signal = ControllerSignals()   #: PI bicarb control signals
        self.acid_ctrl_signal = ControllerSignals()     #: PI acid control signals

        if self.can_interface is not None:
            self.can_interface.register_receiving_publication_function(
                channel_id=CanChannels.dd_sync_broadcast_ch_id,
                message_id=MsgIds.MSG_ID_DD_DRY_BICART_DATA.value,
                function=self._handler_dry_bicart_sync)

        self.dd_dry_bicart_timestamp = 0            #: The timestamp of the last message
        self.dd_dry_bicart_bicarb_mix_vol = 0.0
        self.dd_dry_bicart_acid_mix_vol = 0.0

        # TODO remove after final testing
        # Zero every decoded field of the bicarb signals first, then the acid signals.
        for signals in (self.bcarb_ctlrl_signal, self.acid_ctrl_signal):
            for field_name in _CONTROLLER_SIGNAL_FIELDS:
                setattr(signals, field_name, 0.0)

        # TODO remove after final testing
        self.dd_dry_bicart_bicarb_kp_gain = 0.0
        self.dd_dry_bicart_bicarb_ki_gain = 0.0

        # TODO remove after final testing
        self.dd_dry_bicart_acid_kp_gain = 0.0
        self.dd_dry_bicart_acid_ki_gain = 0.0

    # NOTE(review): the key "bicarb_error " carries a trailing space. It is kept
    # byte-identical here because consumers of the published keys may match on it;
    # confirm whether it is intentional.
    @publish(["msg_id_dd_dry_bicart_data",
              "bicarb_dose_vol",
              "acid_dose_vol",
              "bicarb_reference",
              "bicarb_measured",
              "bicarb_error ",
              "bicarb_error_sum",
              "bicarb_error_sum_after_windup",
              "bicarb_proportional_output",
              "bicarb_integral_output",
              "bicarb_feed_forward_output",
              "bicarb_control",
              "bicarb_kp_gain",
              "bicarb_ki_gain",
              "dd_dry_bicart_timestamp"])
    def _handler_dry_bicart_sync(self, message, timestamp=0.0):
        """
        Handles published dry bicart data messages.

        Every field is decoded as F32, in this order: bicarb mix volume, acid mix
        volume, the bicarb controller signals, the acid controller signals, then
        the bicarb kp/ki gains and the acid kp/ki gains.

        @param message: published dry bicart data message
        @param timestamp: timestamp stored as the time of the last message
        @return: None
        """
        msg_list = [
            ('self.dd_dry_bicart_bicarb_mix_vol', DataTypes.F32),
            ('self.dd_dry_bicart_acid_mix_vol', DataTypes.F32),
        ]

        # Generating the names from the shared tuple keeps both controllers in the
        # same field order and avoids stray whitespace in the variable names.
        for signal_name in ('bcarb_ctlrl_signal', 'acid_ctrl_signal'):
            for field_name in _CONTROLLER_SIGNAL_FIELDS:
                msg_list.append((f'self.{signal_name}.{field_name}', DataTypes.F32))

        msg_list.append(('self.dd_dry_bicart_bicarb_kp_gain', DataTypes.F32))
        msg_list.append(('self.dd_dry_bicart_bicarb_ki_gain', DataTypes.F32))
        msg_list.append(('self.dd_dry_bicart_acid_kp_gain', DataTypes.F32))
        msg_list.append(('self.dd_dry_bicart_acid_ki_gain', DataTypes.F32))

        self.process_into_vars(decoder_list = msg_list, message = message)
        self.dd_dry_bicart_timestamp = timestamp

    def _cmd_float_override(self, value: float, reset: int, msg_id, entity_name: str) -> int:
        """
        Builds and sends a single-float override request on the Dialin-to-DD channel.

        The payload is the reset flag bytes followed by the float value bytes.

        @param value: float - value to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @param msg_id: MsgIds member identifying the override request
        @param entity_name: name of the overridden entity, passed to the generic override template
        @return: the result of cmd_generic_override
        """
        payload = integer_to_bytearray(reset) + float_to_bytearray(value)

        return cmd_generic_override(
            payload=payload,
            reset=reset,
            channel_id=CanChannels.dialin_to_dd_ch_id,
            msg_id=msg_id,
            entity_name=entity_name,
            override_text=str(value),
            logger=self.logger,
            can_interface=self.can_interface)

    def cmd_dry_bicart_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the dry bicart data broadcast interval override command
        Constraints:
        Must be logged into DD.
        Given interval must be non-zero and a multiple of the DD general task interval (50 ms).

        @param ms: integer - interval (in ms) to override with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        return cmd_generic_broadcast_interval_override(
            ms=ms,
            reset=reset,
            channel_id=CanChannels.dialin_to_dd_ch_id,
            msg_id=MsgIds.MSG_ID_DD_DRY_BICART_DATA_PUBLISH_INTERVAL_OVERRIDE_REQUEST,
            module_name='DD Dry Bicart',
            logger=self.logger,
            can_interface=self.can_interface)

    def cmd_bicart_bicarb_dose_vol_control_override(self, bicarb_dose_vol: float, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the dry bicart bicarb dose vol override command
        Constraints:
        Must be logged into DD.

        @param bicarb_dose_vol: float - value to override the bicarb dose vol with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        return self._cmd_float_override(
            value=bicarb_dose_vol,
            reset=reset,
            msg_id=MsgIds.MSG_ID_DD_BICARB_DOSE_VOL_CONTROL_OVERRIDE_REQUEST,
            entity_name='bicarb dose vol')

    def cmd_bicart_acid_dose_vol_control_override(self, acid_dose_vol: float, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the dry bicart acid dose vol override command
        Constraints:
        Must be logged into DD.

        @param acid_dose_vol: float - value to override the acid dose vol with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        return self._cmd_float_override(
            value=acid_dose_vol,
            reset=reset,
            msg_id=MsgIds.MSG_ID_DD_ACID_DOSE_VOL_CONTROL_OVERRIDE_REQUEST,
            entity_name='acid dose vol')

    def cmd_bicart_bicarb_dose_vol_control_kp_gain_coeff_override(self, bicarb_kp_coeff: float, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the dry bicart bicarb kp coeff override command
        Constraints:
        Must be logged into DD.

        @param bicarb_kp_coeff: float - value to override the bicarb kp gain coeff with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        return self._cmd_float_override(
            value=bicarb_kp_coeff,
            reset=reset,
            msg_id=MsgIds.MSG_ID_DD_BICARB_DOSE_VOL_CONTROL_KP_GAIN_COEFF_OVERRIDE_REQUEST,
            entity_name='bicarb kp gain coeff')

    def cmd_bicart_bicarb_dose_vol_control_ki_gain_coeff_override(self, bicarb_ki_coeff: float, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the dry bicart bicarb ki coeff override command
        Constraints:
        Must be logged into DD.

        @param bicarb_ki_coeff: float - value to override the bicarb ki gain coeff with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        return self._cmd_float_override(
            value=bicarb_ki_coeff,
            reset=reset,
            msg_id=MsgIds.MSG_ID_DD_BICARB_DOSE_VOL_CONTROL_KI_GAIN_COEFF_OVERRIDE_REQUEST,
            entity_name='bicarb ki gain coeff')

    def cmd_bicart_acid_dose_vol_control_kp_gain_coeff_override(self, acid_kp_coeff: float, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the dry bicart acid kp coeff override command
        Constraints:
        Must be logged into DD.

        @param acid_kp_coeff: float - value to override the acid kp gain coeff with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        return self._cmd_float_override(
            value=acid_kp_coeff,
            reset=reset,
            msg_id=MsgIds.MSG_ID_DD_ACID_DOSE_VOL_CONTROL_KP_GAIN_COEFF_OVERRIDE_REQUEST,
            entity_name='acid kp gain coeff')

    def cmd_bicart_acid_dose_vol_control_ki_gain_coeff_override(self, acid_ki_coeff: float, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the dry bicart acid ki coeff override command
        Constraints:
        Must be logged into DD.

        @param acid_ki_coeff: float - value to override the acid ki gain coeff with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        return self._cmd_float_override(
            value=acid_ki_coeff,
            reset=reset,
            msg_id=MsgIds.MSG_ID_DD_ACID_DOSE_VOL_CONTROL_KI_GAIN_COEFF_OVERRIDE_REQUEST,
            entity_name='acid ki gain coeff')

    def cmd_bicart_target_conductivity_override(self, conductivity: float, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the dry bicart target conductivity override command
        Constraints:
        Must be logged into DD.

        @param conductivity: float - value to override the target conductivity with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        return self._cmd_float_override(
            value=conductivity,
            reset=reset,
            msg_id=MsgIds.MSG_ID_DD_BICARB_TARGET_CONDUCTIVITY_OVERRIDE_REQUEST,
            entity_name='target conductivity')

    def cmd_bicart_delta_conductivity_override(self, conductivity: float, reset: int = NO_RESET) -> int:
        """
        Constructs and sends the dry bicart delta conductivity override command
        Constraints:
        Must be logged into DD.

        @param conductivity: float - value to override the bicarb delta conductivity with
        @param reset: integer - 1 to reset a previous override, 0 to override
        @return: 1 if successful, zero otherwise
        """
        return self._cmd_float_override(
            value=conductivity,
            reset=reset,
            msg_id=MsgIds.MSG_ID_DD_BICARB_DELTA_CONDUCTIVITY_OVERRIDE_REQUEST,
            entity_name='delta conductivity')