###########################################################################
#
#    Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file test_uf.py
#
#    @author (last)      Sean Nash
#    @date   (last)      24-Sep-2022
#    @author (original)  Peter Lucia
#    @date   (original)  14-Jul-2020
#
############################################################################
import sys
sys.path.append("..")  # must run before the project imports below
import threading
from time import sleep

from tests.data_capture_helpers.fp_helpers import (
    fp_float_controller,
    fp_pressure_recorder,
    fp_temperature_recorder,
    fp_level_recorder,
    fp_flow_recorder,
    fp_conductivity_recorder,
    fp_valve_recorder,
    fp_ro_pump_recorder,
    fp_boost_pump_recorder,
    fp_op_mode_recorder,
    fp_combine_logs,
    fp_log_setup,
)
from leahi_dialin.fp.filtration_purification import FP
from leahi_dialin.dd.dialysate_delivery import DD

# Interval handed to every recorder thread; the millisecond value derived from
# it is what gets sent to the FP as the broadcast interval override.
RECORDING_INTERVAL = 0.25
RECORDING_INTERVAL_IN_MS = int(RECORDING_INTERVAL * 1000)
# Only referenced by the (currently disabled) float controller thread.
CONTROL_INTERVAL = 0.5


def _broadcast_override_commands(fp: FP) -> tuple:
    """Return the FP broadcast-interval override commands in send order.

    Keeping the list in one place guarantees that set_broadcast_overrides()
    and reset_broadcast_overrides() always address the same set of publishers.
    """
    return (
        fp.boost_pump.cmd_boost_pump_broadcast_interval_override,
        fp.conductivity.cmd_conductivity_sensor_data_broadcast_interval_override,
        fp.flows.cmd_flow_sensor_broadcast_interval_override,
        fp.levels.cmd_levels_broadcast_interval_override,
        fp.pressures.cmd_pressure_sensors_broadcast_interval_override,
        fp.ro_pump.cmd_ro_pump_broadcast_interval_override,
        fp.temperatures.cmd_temperatures_data_broadcast_interval_override,
        fp.valves.cmd_valve_broadcast_interval_override,
        fp.cmd_op_mode_broadcast_interval_override,
    )


def set_broadcast_overrides(fp: FP, interval: int = RECORDING_INTERVAL_IN_MS) -> None:
    """Send a broadcast-interval override to every FP publisher being recorded.

    :param fp: FP object the override commands are sent through.
    :param interval: value passed to each override command; defaults to
        RECORDING_INTERVAL_IN_MS.
    """
    for send_override in _broadcast_override_commands(fp):
        send_override(interval)


def reset_broadcast_overrides(fp: FP) -> None:
    """Re-issue every broadcast-interval override command with arguments (0, 1).

    NOTE(review): the second positional argument (1) is presumably the dialin
    "reset" flag that restores the firmware default interval - confirm against
    the leahi_dialin API.

    :param fp: FP object the override commands are sent through.
    """
    for send_override in _broadcast_override_commands(fp):
        send_override(0, 1)


def main() -> int:
    """Log in to the FP, start all recorder threads and run until Ctrl-C.

    :return: process exit status - 1 when the FP login fails, 0 after a
        keyboard interrupt. Any other exception propagates to the caller
        after the logs have been combined.
    """
    # Pre-bind so the KeyboardInterrupt handler is safe even if the interrupt
    # arrives while FP() is still being constructed.
    fp = None
    try:
        # create a FP object called fp
        fp = FP(log_level="DEBUG")
        dd = DD()

        # log in to fp as a tester
        if fp.cmd_log_in_to_fp() == 0:
            print("Failed to log into FP")
            # The finally clause still runs; the failure status is preserved
            # because nothing in it exits the process.
            return 1

        print("Sending broadcast overrides.. ")
        set_broadcast_overrides(fp)
        fp_log_setup()

        print("Starting threads...")
        # Float controller is currently disabled:
        # float_control_thread = threading.Thread(target=fp_float_controller, args=(fp, CONTROL_INTERVAL), daemon=True)
        # float_control_thread.start()

        # (recorder function, positional arguments), listed in start order.
        # The valve recorder is the only one that also takes the DD object.
        recorder_specs = (
            (fp_pressure_recorder, (fp, RECORDING_INTERVAL)),
            (fp_temperature_recorder, (fp, RECORDING_INTERVAL)),
            (fp_level_recorder, (fp, RECORDING_INTERVAL)),
            (fp_flow_recorder, (fp, RECORDING_INTERVAL)),
            (fp_conductivity_recorder, (fp, RECORDING_INTERVAL)),
            (fp_valve_recorder, (fp, dd, RECORDING_INTERVAL)),
            (fp_ro_pump_recorder, (fp, RECORDING_INTERVAL)),
            (fp_boost_pump_recorder, (fp, RECORDING_INTERVAL)),
            (fp_op_mode_recorder, (fp, RECORDING_INTERVAL)),
        )
        # Create every thread first, then start them, as daemon threads so
        # they do not keep the interpreter alive once main() returns.
        recorder_threads = [
            threading.Thread(target=recorder, args=recorder_args, daemon=True)
            for recorder, recorder_args in recorder_specs
        ]
        for recorder_thread in recorder_threads:
            recorder_thread.start()

        # Idle until the operator interrupts the capture.
        while True:
            sleep(1)

    except KeyboardInterrupt:
        print("Stopping.")
        if fp is not None:
            reset_broadcast_overrides(fp)

    finally:
        # Deliberately no exit call here: exiting from a finally clause would
        # replace the login-failure status and swallow in-flight exceptions.
        print("Creating log...")
        fp_combine_logs()
        print("Exiting")

    return 0


if __name__ == "__main__":
    # sys.exit rather than the site-provided exit(), which is intended for the
    # interactive shell and is not guaranteed to exist in every runtime.
    sys.exit(main())