###########################################################################
#
#    Copyright (c) 2022-2022 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file test_conductivity.py
#
#    @author (last)      Hung Nguyen
#    @date   (last)      30-Mar-2022
#    @author (original)  Hung Nguyen
#    @date   (original)  08-Mar-2022
#
############################################################################
"""Log DG conductivity, temperature, flow, pressure and load-cell readings.

The script logs in to the DG, writes a short header (date, sampling rate,
notes, firmware/FPGA versions) to a time-stamped CSV file, and then appends
one record every ``sampling_rate`` seconds until interrupted with Ctrl-C.
"""
import csv
import os
import sys
import time
from datetime import datetime

from dialin.dg.dialysate_generator import DG

# ---------------------------------------------------------------------------
# User-editable test parameters
# ---------------------------------------------------------------------------
directory = 'DG Cond'   # output folder, created under the current working dir
file_name = 'Test 1'    # prefix of the result file name
notes = 'None'          # free text stored in the CSV header
sampling_rate = 1       # seconds slept between two consecutive samples

cwd = os.getcwd()
# exist_ok avoids the check-then-create race of an isdir()/mkdir() pair.
os.makedirs(os.path.join(cwd, directory), exist_ok=True)

current_time = datetime.now().strftime('%b-%d-%Y %H-%M-%S')
result_file_name = f'{directory}/{file_name} - {current_time}.csv'

dg = DG()
if dg.cmd_log_in_to_dg() == 0:
    sys.exit('Unable to login into DG')

dg.cmd_ui_request_dg_version()
# Pause before reading the versions back; presumably gives the DG time to
# answer the version request -- TODO confirm the required delay.
time.sleep(1)
dg_fw_version = dg.get_version()
dg_fpga_version = dg.get_fpga_version()


def get_dg_mixing_data():
    """Read the current DG values and format them as one CSV-style record.

    Uses the module-level ``dg`` object. The record is a single string of
    ``label, value`` pairs separated by ``', '``; it covers the operation
    modes, conductivity sensors (processed and raw), temperatures, RO pump
    flow/PWM, concentrate pump speeds, pressures and load cells.

    Returns:
        str: the formatted record, without a trailing separator or newline.
    """
    # Conductivity cells: the first four entries are read as CPi, CPo, CD1,
    # CD2 and the next four as their raw counterparts.
    cond_data = dg.conductivity_sensors.get_conductivity_sensors()
    cpi, cpo, cd1, cd2 = cond_data[0], cond_data[1], cond_data[2], cond_data[3]
    cpi_raw, cpo_raw = cond_data[4], cond_data[5]
    cd1_raw, cd2_raw = cond_data[6], cond_data[7]

    # Temperature
    # dg.temperatures.cmd_temperatures_value_override(0, 38.0)
    tpi = dg.temperatures.get_temperatures_values(0)
    tpo = dg.temperatures.get_temperatures_values(2)
    td1 = dg.temperatures.get_temperatures_values(3)
    td2 = dg.temperatures.get_temperatures_values(4)

    # DG Op modes
    dg_mode = dg.get_operation_mode()
    dg_sub_mode = dg.get_operation_sub_mode()
    # log the gen idle bad fill sub-states
    dg_sub_sub_mode = dg.gen_idle.get_sub_state()

    # Flows
    fmp = dg.ro_pump.get_measured_flow_rate()
    raw_fmp = dg.ro_pump.get_ro_pump_measured_raw_flow_rate_mlp()
    ro_pwm = dg.ro_pump.get_pwm_duty_cycle_pct()
    cp1_setpoint = dg.concentrate_pumps.concentrate_pump_cp1_current_set_speed
    cp1_measured = dg.concentrate_pumps.concentrate_pump_cp1_measured_speed
    cp2_setpoint = dg.concentrate_pumps.concentrate_pump_cp2_current_set_speed
    cp2_measured = dg.concentrate_pumps.concentrate_pump_cp2_measured_speed

    # Pressure
    ppi = dg.pressures.ro_pump_inlet_pressure
    ppo = dg.pressures.ro_pump_outlet_pressure
    prd = dg.pressures.drain_pump_inlet_pressure
    pdr = dg.pressures.drain_pump_outlet_pressure

    # Load cells
    lc_a1 = dg.load_cells.load_cell_A1
    lc_a2 = dg.load_cells.load_cell_A2
    lc_b1 = dg.load_cells.load_cell_B1
    lc_b2 = dg.load_cells.load_cell_B2

    # CP2 measured speed uses the same 4-decimal format as CP1 so both pump
    # columns are reported consistently.
    data_string = (
        f'DG Mode, {dg_mode}, '
        f'SubMode, {dg_sub_mode}, '
        f'SubSubMode, {dg_sub_sub_mode}, '
        f'CPi, {cpi:.4f}, '
        f'CPo, {cpo:.4f}, '
        f'CD1, {cd1:.4f}, '
        f'CD2, {cd2:.4f}, '
        f'CPi raw, {cpi_raw:.4f}, '
        f'CPo raw, {cpo_raw:.4f}, '
        f'CD1 raw, {cd1_raw:.4f}, '
        f'CD2 raw, {cd2_raw:.4f}, '
        f'TPi, {tpi:.4f}, '
        f'TPo, {tpo:.4f}, '
        f'TD1, {td1:.4f}, '
        f'TD2, {td2:.4f}, '
        f'RO_PWM, {ro_pwm:.4f}, '
        f'FMP, {fmp:.4f}, '
        f'Raw FMP, {raw_fmp:.4f}, '
        f'CP1_setpoint, {cp1_setpoint}, '
        f'CP1_measured_spd, {cp1_measured:.4f}, '
        f'CP2_setpoint, {cp2_setpoint}, '
        f'CP2_measured_spd, {cp2_measured:.4f}, '
        f'PPi, {ppi:.4f}, '
        f'PPo, {ppo:.4f}, '
        f'PRd, {prd:.4f}, '
        f'PDr, {pdr:.4f}, '
        f'LCA1, {lc_a1:.4f}, '
        f'LCA2, {lc_a2:.4f}, '
        f'LCB1, {lc_b1:.4f}, '
        f'LCB2, {lc_b2:.4f}'
    )
    return data_string


try:
    # newline='' is required by the csv module; without it, platforms that
    # translate line endings emit a blank row after every record.
    with open(result_file_name, 'w', newline='') as f:
        writer = csv.writer(f, delimiter=',')
        writer.writerows([
            [f'Date: {datetime.now()}'],
            [f'Sampling Rate: {sampling_rate} (s)'],
            [f'Notes: {notes}'],
            [f'DG FW version: {dg_fw_version}'],
            [f'DG FPGA version: {dg_fpga_version}'],
        ])
        # use this for a tag to find when data collection start using Pandas
        writer.writerow(['Data:'])

        start_time = time.perf_counter()
        while True:
            time.sleep(sampling_rate)
            # Elapsed time since logging started, in seconds.
            timer = time.perf_counter() - start_time
            data = f'time, {timer:.4f}, ' + get_dg_mixing_data()
            # Splitting on ',' keeps the space after each comma inside the
            # field, so the row written to the file is identical to `data`.
            writer.writerow(data.split(','))
            print(data)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to end the logging session.
    print('end script')
finally:
    # Always issued on exit, including after an error; presumably 0 requests
    # the DG to stop -- TODO confirm against the hd_proxy API.
    dg.hd_proxy.cmd_start_stop_dg(0)