Index: leahi_dialin/common/closed_loop_signals.py =================================================================== diff -u --- leahi_dialin/common/closed_loop_signals.py (revision 0) +++ leahi_dialin/common/closed_loop_signals.py (revision 3c8e7ea66c42cb56094aab65ad46d1a7b75ec904) @@ -0,0 +1,15 @@ +from dataclasses import dataclass + +@dataclass +class ControllerSignals: + """Equivalent to PI_CONTROLLER_SIGNALS_DATA""" + control_signal_reference: float = 0.0 # ///< Reference value + control_signal_measured: float = 0.0 # ///< Measured value + control_signal_error: float = 0.0 # ///< Error value + control_signal_error_sum: float = 0.0 # ///< Error sum before anti-windup + control_signal_error_sum_after_windup: float = 0.0 # ///< Error sum after anti-windup + control_signal_proportional_output: float = 0.0 # ///< P portion + control_signal_integral_output: float = 0.0 # ///< I portion + control_signal_feed_forward_output: float = 0.0 # ///< Feed forward portion + control_singal_control: float = 0.0 # ///< Controller output bcarb_ctlrl_signal + Index: leahi_dialin/dd/modules/mixing_cntrl.py =================================================================== diff -u --- leahi_dialin/dd/modules/mixing_cntrl.py (revision 0) +++ leahi_dialin/dd/modules/mixing_cntrl.py (revision 3c8e7ea66c42cb56094aab65ad46d1a7b75ec904) @@ -0,0 +1,353 @@ +########################################################################### +# +# Copyright (c) 2025-2026 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file mixer_ctrl.py +# +# @author (last) Zoltan Miskolci +# @date (last) 0406-June-2026 +# @author (original) Sameer Poyil +# @date (original) 06-Jun-2026 +# +############################################################################ + +# Module imports +from logging import Logger + +# Project imports +from leahi_dialin.common.constants import NO_RESET , RESET +from leahi_dialin.common.generic_defs import DataTypes +from leahi_dialin.common.msg_defs import MsgIds, MsgFieldPositions +from leahi_dialin.common.override_templates import cmd_generic_broadcast_interval_override, cmd_generic_override +from dataclasses import dataclass +from leahi_dialin.protocols.CAN import CanMessenger, CanChannels +from leahi_dialin.utils.abstract_classes import AbstractSubSystem +from leahi_dialin.utils.base import publish +from leahi_dialin.utils.conversions import integer_to_bytearray, float_to_bytearray +from leahi_dialin.common.closed_loop_signals import ControllerSignals + +class DDDialysateCompositionClosedLoopMixing(AbstractSubSystem): + """ + DryBicart + + Dialysate Delivery (DD) Dialin API sub-class for Dry Bicart related commands. + """ + + def __init__(self, can_interface: CanMessenger, logger: Logger): + """ + + @param can_interface: Can Messenger object + """ + super().__init__() + + self.can_interface = can_interface + self.logger = logger + self.bcarb_ctlrl_signal = ControllerSignals() #: PI bicarb control signals + self.acid_ctrl_signal = ControllerSignals() #: PI acid control signals + + if self.can_interface is not None: + self.can_interface.register_receiving_publication_function(channel_id = CanChannels.dd_sync_broadcast_ch_id, + message_id = MsgIds.MSG_ID_DD_DRY_BICART_DATA.value, + function = self._handler_dry_bicart_sync) + + self.dd_dry_bicart_timestamp = 0 #: The timestamp of the last message + + self.dd_dry_bicart_bicarb_mix_vol = 0.0 + self.dd_dry_bicart_acid_mix_vol = 0.0 + + #TODO remove after final testing + self.bcarb_ctlrl_signal.control_signal_reference = 0.0 + self.bcarb_ctlrl_signal.control_signal_measured = 0.0 + self.bcarb_ctlrl_signal.control_signal_error = 0.0 + self.bcarb_ctlrl_signal.control_signal_error_sum = 0.0 + self.bcarb_ctlrl_signal.control_signal_error_sum_after_windup = 0.0 + self.bcarb_ctlrl_signal.control_signal_proportional_output = 0.0 + self.bcarb_ctlrl_signal.control_signal_integral_output = 0.0 + self.bcarb_ctlrl_signal.control_signal_feed_forward_output = 0.0 + self.bcarb_ctlrl_signal.control_singal_control = 0.0 + + #TODO remove after final testing + self.acid_ctrl_signal.control_signal_reference = 0.0 + self.acid_ctrl_signal.control_signal_measured = 0.0 + self.acid_ctrl_signal.control_signal_error = 0.0 + self.acid_ctrl_signal.control_signal_error_sum = 0.0 + self.acid_ctrl_signal.control_signal_error_sum_after_windup = 0.0 + self.acid_ctrl_signal.control_signal_proportional_output = 0.0 + self.acid_ctrl_signal.control_signal_integral_output = 0.0 + self.acid_ctrl_signal.control_signal_feed_forward_output = 0.0 + self.acid_ctrl_signal.control_singal_control = 0.0 + + #TODO remove after final testing + self.dd_dry_bicart_bicarb_kp_gain = 0.0 + self.dd_dry_bicart_bicarb_ki_gain = 0.0 + + #TODO remove after final testing + self.dd_dry_bicart_acid_kp_gain = 0.0 + self.dd_dry_bicart_acid_ki_gain = 0.0 + + + @publish(["msg_id_dd_dry_bicart_data", + "bicarb_dose_vol", "acid_dose_vol", + "bicarb_reference", "bicarb_measured", "bicarb_error ", "bicarb_error_sum", "bicarb_error_sum_after_windup", "bicarb_proportional_output", + "bicarb_integral_output", "bicarb_feed_forward_output", "bicarb_control", + "bicarb_kp_gain", "bicarb_ki_gain", + "dd_dry_bicart_timestamp"]) + + def _handler_dry_bicart_sync(self, message, timestamp=0.0): + """ + Handles published dry bicart data messages. + + @param message: published dry bicart data message + @return: None + """ + msg_list = [] + + msg_list.append(('self.dd_dry_bicart_bicarb_mix_vol', DataTypes.F32)) + msg_list.append(('self.dd_dry_bicart_acid_mix_vol', DataTypes.F32)) + + msg_list.append(('self.bcarb_ctlrl_signal.control_signal_reference', DataTypes.F32)) + msg_list.append(('self.bcarb_ctlrl_signal.control_signal_measured', DataTypes.F32)) + msg_list.append(('self.bcarb_ctlrl_signal.control_signal_error', DataTypes.F32)) + msg_list.append(('self.bcarb_ctlrl_signal.control_signal_error_sum', DataTypes.F32)) + msg_list.append(('self.bcarb_ctlrl_signal.control_signal_error_sum_after_windup', DataTypes.F32)) + msg_list.append(('self.bcarb_ctlrl_signal.control_signal_proportional_output', DataTypes.F32)) + msg_list.append(('self.bcarb_ctlrl_signal.control_signal_integral_output ', DataTypes.F32)) + msg_list.append(('self.bcarb_ctlrl_signal.control_signal_feed_forward_output', DataTypes.F32)) + msg_list.append(('self.bcarb_ctlrl_signal.control_singal_control', DataTypes.F32)) + + msg_list.append(('self.acid_ctrl_signal.control_signal_reference', DataTypes.F32)) + msg_list.append(('self.acid_ctrl_signal.control_signal_measured', DataTypes.F32)) + msg_list.append(('self.acid_ctrl_signal.control_signal_error', DataTypes.F32)) + msg_list.append(('self.acid_ctrl_signal.control_signal_error_sum', DataTypes.F32)) + msg_list.append(('self.acid_ctrl_signal.control_signal_error_sum_after_windup', DataTypes.F32)) + msg_list.append(('self.acid_ctrl_signal.control_signal_proportional_output', DataTypes.F32)) + msg_list.append(('self.acid_ctrl_signal.control_signal_integral_output ', DataTypes.F32)) + msg_list.append(('self.acid_ctrl_signal.control_signal_feed_forward_output', DataTypes.F32)) + msg_list.append(('self.acid_ctrl_signal.control_singal_control', DataTypes.F32)) + + msg_list.append(('self.dd_dry_bicart_bicarb_kp_gain', DataTypes.F32)) + msg_list.append(('self.dd_dry_bicart_bicarb_ki_gain', DataTypes.F32)) + + msg_list.append(('self.dd_dry_bicart_acid_kp_gain', DataTypes.F32)) + msg_list.append(('self.dd_dry_bicart_acid_ki_gain', DataTypes.F32)) + + self.process_into_vars(decoder_list = msg_list, + message = message) + + self.dd_dry_bicart_timestamp = timestamp + + def cmd_dry_bicart_broadcast_interval_override(self, ms: int, reset: int = NO_RESET) -> int: + """ + Constructs and sends the dry bicart data broadcast interval override command + Constraints: + Must be logged into DD. + Given interval must be non-zero and a multiple of the DD general task interval (50 ms). + + @param ms: integer - interval (in ms) to override with + @param reset: integer - 1 to reset a previous override, 0 to override + @return: 1 if successful, zero otherwise + """ + return cmd_generic_broadcast_interval_override( + ms=ms, + reset=reset, + channel_id=CanChannels.dialin_to_dd_ch_id, + msg_id=MsgIds.MSG_ID_DD_DRY_BICART_DATA_PUBLISH_INTERVAL_OVERRIDE_REQUEST, + module_name='DD Dry Bicart', + logger=self.logger, + can_interface=self.can_interface) + + + def cmd_bicart_bicarb_dose_vol_control_override(self, bicarb_dose_vol: float, reset: int = NO_RESET) -> int: + """ + Constructs and sends the dry bicart bicarb dose vol override command + Constraints: + Must be logged into DD. + + @param bicarb_dose_vol: float - bicarb_dose_vol value to override bicarb dose vol + @param reset: integer - 1 to reset a previous override, 0 to override + @return: 1 if successful, zero otherwise + """ + bicarb_dose = float_to_bytearray(bicarb_dose_vol) + reset_byte_array = integer_to_bytearray(reset) + payload = reset_byte_array + bicarb_dose + + return cmd_generic_override( + payload=payload, + reset=reset, + channel_id=DenaliChannels.dialin_to_dd_ch_id, + msg_id=MsgIds.MSG_ID_DD_BICARB_DOSE_VOL_CONTROL_OVERRIDE_REQUEST, + entity_name='bicarb dose vol', + override_text=str(bicarb_dose_vol), + logger=self.logger, + can_interface=self.can_interface) + + def cmd_bicart_acid_dose_vol_control_override(self, acid_dose_vol: float, reset: int = NO_RESET) -> int: + """ + Constructs and sends the dry bicart acid dose vol override command + Constraints: + Must be logged into DD. + + @param acid_dose_vol: float - acid_dose_vol value to override acid dose vol + @param reset: integer - 1 to reset a previous override, 0 to override + @return: 1 if successful, zero otherwise + """ + acid_dose = float_to_bytearray(acid_dose_vol) + reset_byte_array = integer_to_bytearray(reset) + payload = reset_byte_array + acid_dose + + return cmd_generic_override( + payload=payload, + reset=reset, + channel_id=DenaliChannels.dialin_to_dd_ch_id, + msg_id=MsgIds.MSG_ID_DD_ACID_DOSE_VOL_CONTROL_OVERRIDE_REQUEST, + entity_name='acid dose vol', + override_text=str(acid_dose_vol), + logger=self.logger, + can_interface=self.can_interface) + + def cmd_bicart_bicarb_dose_vol_control_kp_gain_coeff_override(self, bicarb_kp_coeff: float, reset: int = NO_RESET) -> int: + """ + Constructs and sends the dry bicart bicarb kp coeff override command + Constraints: + Must be logged into DD. + + @param bicarb_kp_coeff: float - bicarb_kp_coeff value to override acid dose vol + @param reset: integer - 1 to reset a previous override, 0 to override + @return: 1 if successful, zero otherwise + """ + kp_gain_coeff = float_to_bytearray(bicarb_kp_coeff) + reset_byte_array = integer_to_bytearray(reset) + payload = reset_byte_array + kp_gain_coeff + + return cmd_generic_override( + payload=payload, + reset=reset, + channel_id=DenaliChannels.dialin_to_dd_ch_id, + msg_id=MsgIds.MSG_ID_DD_BICARB_DOSE_VOL_CONTROL_KP_GAIN_COEFF_OVERRIDE_REQUEST, + entity_name='bicarb kp gain coeff', + override_text=str(bicarb_kp_coeff), + logger=self.logger, + can_interface=self.can_interface) + + def cmd_bicart_bicarb_dose_vol_control_ki_gain_coeff_override(self, bicarb_ki_coeff: float, reset: int = NO_RESET) -> int: + """ + Constructs and sends the dry bicart bicarb ki coeff override command + Constraints: + Must be logged into DD. + + @param bicarb_ki_coeff: float - bicarb_ki_coeff value to override acid dose vol + @param reset: integer - 1 to reset a previous override, 0 to override + @return: 1 if successful, zero otherwise + """ + ki_gain_coeff = float_to_bytearray(bicarb_ki_coeff) + reset_byte_array = integer_to_bytearray(reset) + payload = reset_byte_array + ki_gain_coeff + + return cmd_generic_override( + payload=payload, + reset=reset, + channel_id=DenaliChannels.dialin_to_dd_ch_id, + msg_id=MsgIds.MSG_ID_DD_BICARB_DOSE_VOL_CONTROL_KI_GAIN_COEFF_OVERRIDE_REQUEST, + entity_name='bicarb ki gain coeff', + override_text=str(bicarb_ki_coeff), + logger=self.logger, + can_interface=self.can_interface) + + def cmd_bicart_acid_dose_vol_control_kp_gain_coeff_override(self, acid_kp_coeff: float, reset: int = NO_RESET) -> int: + """ + Constructs and sends the dry bicart acid kp coeff override command + Constraints: + Must be logged into DD. + + @param acid_kp_coeff: float - acid_kp_coeff value to override acid kp coeff + @param reset: integer - 1 to reset a previous override, 0 to override + @return: 1 if successful, zero otherwise + """ + kp_gain_coeff = float_to_bytearray(acid_kp_coeff) + reset_byte_array = integer_to_bytearray(reset) + payload = reset_byte_array + kp_gain_coeff + + return cmd_generic_override( + payload=payload, + reset=reset, + channel_id=DenaliChannels.dialin_to_dd_ch_id, + msg_id=MsgIds.MSG_ID_DD_ACID_DOSE_VOL_CONTROL_KP_GAIN_COEFF_OVERRIDE_REQUEST, + entity_name='acid kp gain coeff', + override_text=str(acid_kp_coeff), + logger=self.logger, + can_interface=self.can_interface) + + def cmd_bicart_acid_dose_vol_control_ki_gain_coeff_override(self, acid_ki_coeff: float, reset: int = NO_RESET) -> int: + """ + Constructs and sends the dry bicart acid ki coeff override command + Constraints: + Must be logged into DD. + + @param acid_ki_coeff: float - acid_ki_coeff value to override acid dose vol + @param reset: integer - 1 to reset a previous override, 0 to override + @return: 1 if successful, zero otherwise + """ + ki_gain_coeff = float_to_bytearray(acid_ki_coeff) + reset_byte_array = integer_to_bytearray(reset) + payload = reset_byte_array + ki_gain_coeff + + return cmd_generic_override( + payload=payload, + reset=reset, + channel_id=DenaliChannels.dialin_to_dd_ch_id, + msg_id=MsgIds.MSG_ID_DD_ACID_DOSE_VOL_CONTROL_KI_GAIN_COEFF_OVERRIDE_REQUEST, + entity_name='acid ki gain coeff', + override_text=str(acid_ki_coeff), + logger=self.logger, + can_interface=self.can_interface) + + def cmd_bicart_target_conductivity_override(self, conductivity: float, reset: int = NO_RESET) -> int: + """ + Constructs and sends the dry bicart target conductivity override command + Constraints: + Must be logged into DD. + + @param conductivity: float - conductivity value to override target conductivity + @param reset: integer - 1 to reset a previous override, 0 to override + @return: 1 if successful, zero otherwise + """ + target_conductivity = float_to_bytearray(conductivity) + reset_byte_array = integer_to_bytearray(reset) + payload = reset_byte_array + target_conductivity + + return cmd_generic_override( + payload=payload, + reset=reset, + channel_id=DenaliChannels.dialin_to_dd_ch_id, + msg_id=MsgIds.MSG_ID_DD_BICARB_TARGET_CONDUCTIVITY_OVERRIDE_REQUEST, + entity_name='target conductivity', + override_text=str(conductivity), + logger=self.logger, + can_interface=self.can_interface) + + def cmd_bicart_delta_conductivity_override(self, conductivity: float, reset: int = NO_RESET) -> int: + """ + Constructs and sends the dry bicart target conductivity override command + Constraints: + Must be logged into DD. + + @param conductivity: float - bicarb delta conductivity value to override delta conductivity + @param reset: integer - 1 to reset a previous override, 0 to override + @return: 1 if successful, zero otherwise + """ + delta_conductivity = float_to_bytearray(conductivity) + reset_byte_array = integer_to_bytearray(reset) + payload = reset_byte_array + delta_conductivity + + return cmd_generic_override( + payload=payload, + reset=reset, + channel_id=DenaliChannels.dialin_to_dd_ch_id, + msg_id=MsgIds.MSG_ID_DD_BICARB_DELTA_CONDUCTIVITY_OVERRIDE_REQUEST, + entity_name='delta conductivity', + override_text=str(conductivity), + logger=self.logger, + can_interface=self.can_interface)