###########################################################################
#
#    Copyright (c) 2022-2022 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   dg_tests.py
#
#    @author (last)      Micahel Garthwaite
#    @date   (last)      06-Oct-2022
#    @author (original)  Dara Navaei
#    @date   (original)  16-Jan-2022
#
############################################################################
"""Bench-test helpers that poll the DG and HD dialin objects and log their state.

Every function in this file reads the module-level ``dg`` and ``hd`` handles
that are created in the ``if __name__ == "__main__":`` block at the bottom, so
the functions are only usable when the file is run as a script.

The ``get_*_info`` helpers each return one ``'Label, value, '`` fragment; the
``run_*``/``collect_*`` functions concatenate fragments into one record,
terminate it with ``'\\r'`` and both print it and append it to a log file.
"""
from dialin.dg.dialysate_generator import DG
from dialin.hd.hemodialysis_device import HD
from dialin.ui.hd_simulator import HDSimulator
from dialin.common.dg_defs import DGHeatDisinfectStates, DGHeatDisinfectUIStates
from dialin.dg.heat_disinfect import HeatCancellationModes
from dialin.common.dg_defs import DGChemicalDisinfectStates, DGChemDisinfectUIStates
from dialin.dg.chemical_disinfect import ChemCancellationModes
from dialin.dg.drain_pump import DrainPumpStates
from dialin.dg.thermistors import ThermistorsNames
from dialin.dg.temperatures import DGTemperaturesNames
from dialin.dg.dialysate_generator import DGOperationModes
from dialin.hd.temperatures import HDTemperaturesNames
from dialin.dg.concentrate_pumps import DGConcentratePumpsStates
from dialin.dg.uv_reactors import ReactorsNames
from dialin.common.hd_defs import HDOpModes, HDOpSubModes, PreTreatmentWetSelfTestStates, PostTreatmentStates
from dialin.hd.post_treatment import HDPostTreatmentDrainStates
from dialin.common.dg_defs import DGEventList
from dialin.common.hd_defs import HDEventList
from dialin.hd.reservoirs import HDReservoirStates
from dialin.dg.reservoirs import DGReservoirsNames
from dialin.common.dg_defs import DGGenIdleModeBadFillSubStates
from dialin.common.alarm_defs import AlarmList
from dialin.dg.fans import DGFansNames
from dialin.dg.pressures import DGPressures
from dialin.dg.conductivity_sensors import ConductivitySensorsEnum
from dialin.dg.voltages import DGMonitoredVoltages
from dialin.hd.valves import HDValves
from dialin.hd.voltages import HDMonitoredVoltages
from dialin.hd.pretreatment import PreTreatmentRsrvrState
from time import sleep
from datetime import datetime
import sys
sys.path.append("..")


def get_chemical_disinfect_mode_info() -> str:
    """Return the DG chemical-disinfect mode fields (plus the HD top alarm) as one log fragment."""
    chem = dg.chemical_disinfect
    template = ('State, {}, Overall_elapsed_time, {}, State_elapsed_time, {}, Disinfect_elapsed_time, {}, '
                'Cancellation_mode, {}, R1_level, {:5.3f}, R2_level, {:5.3f}, Current_rinse_count, {}, '
                'Total_rinse_count, {}, UI_state, {}, Top_alarm, {}, ')
    return template.format(
        DGChemicalDisinfectStates(chem.chemical_disinfect_state).name,
        chem.overall_elapsed_time,
        chem.state_elapsed_time,
        chem.chemical_disinfect_elapsed_time,
        ChemCancellationModes(chem.cancellation_mode).name,
        chem.r1_level,
        chem.r2_level,
        chem.current_post_rinse_count,
        chem.target_post_rinse_count,
        DGChemDisinfectUIStates(chem.chemical_disinfect_ui_state).name,
        hd.alarms.alarm_top)


def get_heat_disinfect_mode_info() -> str:
    """Return the DG heat-disinfect mode fields (plus the HD top alarm) as one log fragment."""
    heat = dg.heat_disinfect
    template = ('State, {}, Overall_elapsed_time, {}, State_elapsed_time, {}, Disinfect_elapsed_time, {}, '
                'R1_level, {:5.3f}, R2_level, {:5.3f}, Top_alarm, {}, UI_state, {} ')
    return template.format(
        DGHeatDisinfectStates(heat.heat_disinfect_state).name,
        heat.overall_elapsed_time,
        heat.state_elapsed_time,
        heat.heat_disinfect_count_down_time,
        heat.r1_level,
        heat.r2_level,
        hd.alarms.alarm_top,
        # NOTE(review): the UI state is read from dg.chemical_disinfect rather than
        # dg.heat_disinfect - looks like a copy/paste slip. Left unchanged because the
        # heat-disinfect attribute name is not visible here; confirm and fix.
        DGHeatDisinfectUIStates(dg.chemical_disinfect.chemical_disinfect_ui_state).name)


def get_hd_run_info() -> str:
    """Return the HD operation mode, top alarm, UF volumes and pre/post-treatment states."""
    template = ('HD_op_mode, {}, HD_sub_mode, {}, Top_alarm, {}, Target_UF_ml, {:5.3f}, Meas_UF_ml, {:5.3f}, '
                'Wet_test_state, {}, Post_tx_state, {}, Post_tx_drain_state, {}, Prime_state, {}, '
                'Pre_Tx_rsrvr_state, {}, ')
    return template.format(
        HDOpModes(hd.hd_operation_mode).name,
        hd.hd_operation_sub_mode,
        hd.alarms.alarm_top,
        hd.dialysate_outlet_flow.reference_dialysate_outlet_uf_volume,
        hd.dialysate_outlet_flow.measured_dialysate_outlet_uf_volume,
        PreTreatmentWetSelfTestStates(hd.pretreatment.pre_treatment_wet_self_test_state).name,
        PostTreatmentStates(hd.post_treatment.post_treatment_sub_mode).name,
        HDPostTreatmentDrainStates(hd.post_treatment.post_treatment_drain_state).name,
        hd.pretreatment.pre_treatment_prime_state,
        PreTreatmentRsrvrState(hd.pretreatment.pre_treatment_reservoir_state).name)


def get_hd_occlusion_pressures_info() -> str:
    """Return the HD arterial/venous pressures and the three pump occlusion readings."""
    occlusion = hd.pressure_occlusion
    template = 'Art_pres, {:5.3f}, Venous_pres, {:5.3f}, Blood_pump_pres, {}, DialIn_pres, {}, DialOut_pres, {}, '
    return template.format(
        occlusion.arterial_pressure,
        occlusion.venous_pressure,
        occlusion.blood_pump_occlusion,
        occlusion.dialysate_inlet_pump_occlusion,
        occlusion.dialysate_outlet_pump_occlusion)


def get_hd_reservoirs_info() -> str:
    """Return the HD reservoir-management fields; the active reservoir name comes from the DG."""
    rsrvrs = hd.hd_reservoirs
    template = ('HD_rsrvr_state, {}, Active_rsrvr, {}, Active_rsrvr_uf, {:5.3f}, Active_rsrvr_vol_spent, {:5.3f}, '
                'Dilution_level_pct, {:5.3f}, Recirc_level_pct, {:5.3f}, Time_depletion, {}, Time_wait_to_fill, {}, '
                'Target_fill_flow, {:5.3f}, ')
    return template.format(
        HDReservoirStates(rsrvrs.reservoir_state).name,
        DGReservoirsNames(dg.reservoirs.active_reservoir).name,
        rsrvrs.active_reservoir_uf_ml,
        rsrvrs.active_reservoir_spent_vol_ml,
        rsrvrs.dilution_level_pct,
        rsrvrs.recirculation_level_pct,
        rsrvrs.time_depletion,
        rsrvrs.time_wait_to_fill,
        rsrvrs.temp_remove_fill_flow)


def get_dg_reservoirs_info() -> str:
    """Return the DG reservoir timing and temperature fields as one log fragment."""
    rsrvrs = dg.reservoirs
    # Temp_rsrvr_end_fill previously used '{:5.3}' (no 'f'), which formats with three
    # significant digits and rejects ints; it now matches the neighbouring '{:5.3f}' fields.
    template = ('Time_rsrvr_cycle, {}, Time_rsrvr_fill_2_switch, {}, Time_uf_decay, {:5.3f}, Temp_uf_fill, {:5.3f}, '
                'Temp_rsrvr_use_actual, {:5.3f}, Temp_rsrvr_end_fill, {:5.3f}, Temp_avg_fill, {:5.3f}, '
                'Temp_last_fill, {:5.3f}, Time_rsrvr_fill, {:5.3f}, ')
    return template.format(
        rsrvrs.time_reservoir_cycle,
        rsrvrs.time_reservoir_fill_2_switch,
        rsrvrs.time_uf_decay,
        rsrvrs.temp_uf_fill,
        rsrvrs.temp_reservoir_use_actual,
        rsrvrs.temp_reservoir_end_fill,
        rsrvrs.temp_avg_fill,
        rsrvrs.temp_last_fill,
        rsrvrs.time_rsrvr_fill)


def get_dg_run_info() -> str:
    """Return the DG operation mode (name and raw number) and sub-mode."""
    # Disabled variant that also logged the DG op/sub-mode events:
    # info = ('DG_op_mode, {}, DG_sub_mode, {}, Op, {}, Sub, {}, '.format(DGOperationModes(dg.dg_operation_mode).name,
    #                                                                    dg.dg_operation_sub_mode,
    #                                                                    dg.events.get_dg_events(DGEventList(1).value, 3),
    #                                                                    dg.events.get_dg_events(DGEventList(2).value, 3)))
    template = 'DG_op_mode, {}, DG_op_mode_num, {}, DG_sub_mode, {}, '
    return template.format(
        DGOperationModes(dg.dg_operation_mode).name,
        dg.dg_operation_mode,
        dg.dg_operation_sub_mode)


def get_dg_valves_states() -> str:
    """Return the state of each DG valve, looked up in ``dg.valves.valve_states_enum``."""
    valves = dg.valves
    # Order must match the labels in the template below.
    valve_ids = (
        valves.VALVE_PRESSURE_INLET,
        valves.VALVE_SAMPLING_PORT,
        valves.VALVE_PRODUCTION_DRAIN,
        valves.VALVE_BYPASS_FILTER,
        valves.VALVE_PRESSURE_OUTLET,
        valves.VALVE_DRAIN,
        valves.VALVE_RECIRCULATE,
        valves.VALVE_RESERVOIR_OUTLET,
        valves.VALVE_RESERVOIR_DRAIN,
        valves.VALVE_RESERVOIR_INLET,
        valves.VALVE_RESERVOIR_FILL,
        valves.VALVE_RESERVOIR_DRAIN_1,
        valves.VALVE_RESERVOIR_DRAIN_2,
    )
    template = ('VPi, {}, VSP, {}, VPd, {}, VBf, {}, VPo, {}, VDr, {}, VRc, {}, VRo, {}, VRd, {}, VRi, {}, VRf, {}, '
                'VRD1, {}, VRD2, {}, ')
    return template.format(*(valves.valve_states_enum[valve_id] for valve_id in valve_ids))


def get_drain_states_info() -> str:
    """Return the drain pump state, DAC, target/current RPM and the drain pump pressures."""
    pump = dg.drain_pump
    template = 'Drain, {}, DAC, {}, Tgt_RPM, {}, Curr_RPM, {}, PRd, {:5.3f}, PDr, {:5.3f}, '
    return template.format(
        DrainPumpStates(pump.drain_pump_state).name,
        pump.dac_value,
        pump.target_drain_pump_rpm,
        pump.current_drain_pump_rpm,
        dg.pressures.drain_pump_inlet_pressure,
        dg.pressures.drain_pump_outlet_pressure)


def get_load_cells_info() -> str:
    """Return the four DG load cell readings (A1, A2, B1, B2)."""
    cells = dg.load_cells
    template = 'A1, {:5.3f}, A2, {:5.3f}, B1, {:5.3f}, B2, {:5.3f}, '
    return template.format(cells.load_cell_A1, cells.load_cell_A2, cells.load_cell_B1, cells.load_cell_B2)


def get_ro_info() -> str:
    """Return the RO pump state, pressures, duty cycles and flow readings.

    ``dg.dialysate_flow_sensor.flow_rate`` is logged twice, under ``Dia_flow`` and
    ``Dialysate_flow``.
    """
    ro_pump = dg.ro_pump
    template = ('RO, {}, PPi, {:5.3f}, PPo, {:5.3f}, PWM, {:5.3f}, Flow, {:5.3f}, Tgt_flow, {:5.3f}, '
                'Dia_flow, {:5.3f}, Feedback_PWM, {:5.3f}, Flow_with_conc_pumps, {:5.3f}, Dialysate_flow, {:5.3f}, ')
    return template.format(
        ro_pump.ro_pump_state,
        dg.pressures.ro_pump_inlet_pressure,
        dg.pressures.ro_pump_outlet_pressure,
        ro_pump.pwm_duty_cycle_pct,
        ro_pump.measured_flow_rate_lpm,
        ro_pump.target_flow_lpm,
        dg.dialysate_flow_sensor.flow_rate,
        ro_pump.feedback_duty_cycle_pct,
        ro_pump.measured_raw_flow_rate_with_conc_pumps_mlp,
        dg.dialysate_flow_sensor.flow_rate)


def get_new_ro_info() -> str:
    """Return the ``new_*`` flow readings from the DG dialysate flow sensor."""
    sensor = dg.dialysate_flow_sensor
    template = ('New_flow, {:5.3f}, Tgt_flow, {:5.3f}, New_flow_with_conc_pumps, {:5.3f}, '
                'New_dialysate_flow, {:5.3f}, ')
    return template.format(
        sensor.new_ro_flow,
        # NOTE(review): 'Tgt_flow' is filled with new_dialysate_flow, the same value as
        # 'New_dialysate_flow' below - probably not the intended source; confirm.
        sensor.new_dialysate_flow,
        sensor.new_ro_flow_with_conc,
        sensor.new_dialysate_flow)


def get_heaters_with_no_temp_info() -> str:
    """Return the primary/trimmer heater duty cycles, states and target temperatures."""
    heaters = dg.heaters
    template = ('Pri_main_DC, {:5.3f}, Pri_state, {}, Trimmer_DC, {:5.3f}, Trim_state, {}, '
                'Primary_target_temp, {:5.3f}, Trimmer_target_temp, {:5.3f}, ')
    return template.format(
        heaters.main_primary_heater_duty_cycle,
        heaters.primary_heater_state,
        heaters.trimmer_heater_duty_cycle,
        heaters.trimmer_heater_state,
        heaters.primary_heaters_target_temperature,
        heaters.trimmer_heater_target_temperature)


def get_heaters_info() -> str:
    """Return the heater duty cycles, states, targets, efficiency and heater supply voltages."""
    heaters = dg.heaters
    voltages = dg.voltages.monitored_voltages
    template = ('Pri_main_DC, {:5.3f}, Pri_state, {}, Trimmer_DC, {:5.3f}, Trimmer_state, {}, '
                'Primary_target_temp, {:5.3f}, Trimmer_target_temp, {:5.3f}, Primary_eff, {:5.3f}, '
                'Primary_calc_temp, {:5.3f}, Trimmer_calc_temp, {:5.3f}, Primary_volt, {:5.3f}, '
                'Primary_sec_volt, {:5.3f}, Trimmer_volt, {:5.3f}, ')
    return template.format(
        heaters.main_primary_heater_duty_cycle,
        heaters.primary_heater_state,
        heaters.trimmer_heater_duty_cycle,
        heaters.trimmer_heater_state,
        heaters.primary_heaters_target_temperature,
        heaters.trimmer_heater_target_temperature,
        heaters.primary_efficiency,
        heaters.primary_calc_target_temperature,
        heaters.trimmer_calc_target_temperature,
        voltages[DGMonitoredVoltages.MONITORED_LINE_24V_PRIM_HTR_V.value],
        voltages[DGMonitoredVoltages.MONITORED_LINE_24V_PRIM_HTR_GND_V.value],
        voltages[DGMonitoredVoltages.MONITORED_LINE_24V_TRIM_HTR_V.value])


def get_temperature_sensors_info() -> str:
    """Return seven DG temperature sensor readings, keyed by ``DGTemperaturesNames`` member name."""
    readings = dg.temperatures.temperatures
    # Order must match the labels in the template below.
    sensors = (
        DGTemperaturesNames.INLET_PRIMARY_HEATER,
        DGTemperaturesNames.HEAT_DISINFECT,
        DGTemperaturesNames.OUTLET_PRIMARY_HEATER,
        DGTemperaturesNames.CONDUCTIVITY_SENSOR_1,
        DGTemperaturesNames.CONDUCTIVITY_SENSOR_2,
        DGTemperaturesNames.OUTLET_DIALYSATE_REDUNDANT,
        DGTemperaturesNames.INLET_DIALYSATE,
    )
    template = 'TPi, {:5.3f}, THd, {:5.3f}, TPo, {:5.3f}, TD1, {:5.3f}, TD2, {:5.3f}, TRo, {:5.3f}, TDi, {:5.3f}, '
    return template.format(*(readings[sensor.name] for sensor in sensors))


def get_dg_fans_info() -> str:
    """Return the DG fan duty cycle and RPMs, board/supply/FPGA/load-cell temperatures and RPM alarm time."""
    fans = dg.fans
    thermistors = dg.thermistors.thermistors
    temperatures = dg.temperatures.temperatures
    template = ('Target_fans_DC, {:5.3f}, Inlet1_RPM, {:5.3f}, Inlet2_RPM, {:5.3f}, Inlet3_RPM, {:5.3f}, '
                'Outlet1_RPM, {:5.3f}, Outlet2_RPM, {:5.3f}, Outlet3_RPM, {:5.3f}, Board_temp, {:5.3f}, '
                'Power_supply_1, {:5.3f}, Power_supply_2, {:5.3f}, FPGA_temp, {:5.3f}, Load_cell_A1_B1, {:5.3f}, '
                'Load_cell_A2_B2, {:5.3f}, timeOffset, {}, ')
    return template.format(
        fans.dg_fans_duty_cycle,
        fans.inlet_1_rpm,
        fans.inlet_2_rpm,
        fans.inlet_3_rpm,
        fans.outlet_1_rpm,
        fans.outlet_2_rpm,
        fans.outlet_3_rpm,
        thermistors[ThermistorsNames.THERMISTOR_ONBOARD_NTC.name],
        thermistors[ThermistorsNames.THERMISTOR_POWER_SUPPLY_1.name],
        thermistors[ThermistorsNames.THERMISTOR_POWER_SUPPLY_2.name],
        temperatures[DGTemperaturesNames.FPGA_BOARD_SENSOR.name],
        temperatures[DGTemperaturesNames.LOAD_CELL_A1_B1.name],
        temperatures[DGTemperaturesNames.LOAD_CELL_A2_B2.name],
        fans.rpm_alarm_time)


def get_hd_fans_info() -> str:
    """Return the HD fan duty cycle, RPMs, HD temperature readings and RPM alarm time."""
    fans = hd.fans
    temperatures = hd.temperatures.hd_temperatures
    template = ('HD_Fan_DC, {:5.3f}, Target_HD_RPM, {:5.3f}, Inlet1_RPM, {:5.3f}, HD_Board_temp, {:5.3f}, '
                'HD_Power_supply, {:5.3f}, HD_FPGA_temp, {:5.3f}, Venous_temp, {:5.3f}, '
                'Arterial_temp, {:5.3f}, RPM_time_offset, {}, ')
    return template.format(
        fans.duty_cycle,
        fans.target_rpm,
        fans.inlet_1_rpm,
        temperatures[HDTemperaturesNames.THERMISTOR_ONBOARD_NTC.name],
        temperatures[HDTemperaturesNames.THERMISTOR_POWER_SUPPLY_1.name],
        temperatures[HDTemperaturesNames.TEMPSENSOR_FPGA_BOARD.name],
        temperatures[HDTemperaturesNames.TEMPSENSOR_VENOUS_PRESS_TEMP.name],
        temperatures[HDTemperaturesNames.TEMPSENSOR_ARTERIAL_PRESS_TEMP.name],
        fans.rpm_alarm_time)


def get_uv_reactors_info() -> str:
    """Return the inlet and outlet UV reactor states."""
    reactors = dg.uv_reactors
    return 'Inlet_status, {}, Outlet_status, {}, '.format(
        reactors.inlet_uv_reactor_state,
        reactors.outlet_uv_reactor_state)


def get_concentrate_pumps_info() -> str:
    """Return concentrate pump speeds/states/pulses and the conductivity sensor values/statuses.

    The bicarb labels are filled from the CP2 attributes and the acid labels from the CP1 attributes.
    """
    pumps = dg.concentrate_pumps
    cond = dg.conductivity_sensors
    template = ('Bicarb_curr_speed, {:5.3f}, Bicarb_speed, {:5.3f}, Acid_curr_speed, {:5.3f}, Acid_speed, {:5.3f}, '
                'CPo, {:5.3f}, CD1, {:5.3f}, CD2, {:5.3f}, CP1_state, {}, CP2_state, {}, CPi_status, {}, '
                'CPo_status, {}, CD1_status, {}, CD2_status, {}, CP1_pulse, {:5.3f}, CP2_pulse, {:5.3f}, '
                'Acid_tgt_speed, {:5.3f}, Bicarb_tgt_speed, {:5.3f}, ')
    return template.format(
        pumps.concentrate_pump_cp2_current_set_speed,
        pumps.concentrate_pump_cp2_measured_speed,
        pumps.concentrate_pump_cp1_current_set_speed,
        pumps.concentrate_pump_cp1_measured_speed,
        cond.conductivity_sensor_cpo,
        cond.conductivity_sensor_cd1,
        cond.conductivity_sensor_cd2,
        DGConcentratePumpsStates(pumps.concentrate_pump_cp1_current_state).name,
        DGConcentratePumpsStates(pumps.concentrate_pump_cp2_current_state).name,
        cond.cpi_sensor_status,
        cond.cpo_sensor_status,
        cond.cd1_sensor_status,
        cond.cd2_sensor_status,
        pumps.cp1_temp_pulse,
        pumps.cp2_temp_pulse,
        pumps.concentrate_pump_cp1_target_speed,
        pumps.concentrate_pump_cp2_target_speed)


def get_blood_leak_info() -> str:
    """Return the HD blood leak detector status, counters, set point, level, intensity and state."""
    leak = hd.blood_leak
    template = ('Blood_leak_status, {}, Blood_leak_zero_status_counter, {}, Blood_leak_zeroed_status, {}, '
                'Blood_leak_detect_set_point, {}, Blood_leak_detect_level, {}, Blood_leak_st_count, {}, '
                'Blood_leak_led_intensity, {}, Blood_leak_state, {}, ')
    return template.format(
        leak.get_blood_leak_status(),
        leak.get_blood_leak_zero_status_counter(),
        leak.get_blood_leak_zeroed_status(),
        leak.get_blood_leak_detect_set_point(),
        leak.get_blood_leak_detect_level(),
        leak.get_blood_leak_st_count(),
        leak.get_blood_leak_led_intensity(),
        leak.blood_leak_state)


def get_hd_pumps_info() -> str:
    """Return the dialysate inlet target/measured flow and the inlet/outlet pump motor currents."""
    inlet = hd.dialysate_inlet_flow
    template = ('DialIn_tgt_flow, {}, DialIn_meas_flow, {:5.3f}, DialIn_current, {:5.3f}, '
                'DialOut_current, {:5.3f}, ')
    return template.format(
        inlet.target_dialysate_inlet_flow_rate,
        inlet.measured_dialysate_inlet_flow_rate,
        inlet.measured_dialysate_inlet_pump_mc_current,
        hd.dialysate_outlet_flow.measured_dialysate_outlet_pump_mc_current)


def get_dg_fill_info() -> str:
    """Return the DG dialysate-fill conductivity averages, first-fill flag and usage volumes."""
    fill = dg.dialysate_fill
    template = ('Avg_acid_cond, {:5.3f}, Avg_bicarb_cond, {:5.3f}, Is_this_first_fill, {}, Diff_cond_pct, {:5.3f}, '
                'Used_acid, {:5.3f}, Used_bicarb, {:5.3f}, Total_vol, {:5.3f}, ')
    return template.format(
        fill.avg_acid,
        fill.avg_bicarb,
        fill.first_fill,
        fill.pctDiffConduct,
        fill.used_acid,
        # NOTE(review): 'Used_bicarb' is filled with used_acid again - looks like a
        # copy/paste slip. Left unchanged because the bicarb attribute name is not
        # visible here; confirm and fix.
        fill.used_acid,
        fill.total_volume)


def get_dg_idle_bad_fill_info() -> str:
    """Return the name of the DG gen-idle bad-fill sub-state."""
    return 'Bad_fill_state, {}, '.format(DGGenIdleModeBadFillSubStates(dg.gen_idle.sub_state).name)


def get_voltages_info() -> str:
    """Return the HD 24V and 24V-regen monitored voltages."""
    voltages = hd.voltages.monitored_voltages
    return 'HD_24, {:5.3f}, HD_24_regen, {:5.3f}, '.format(
        voltages[HDMonitoredVoltages.MONITORED_LINE_24V.value],
        voltages[HDMonitoredVoltages.MONITORED_LINE_24V_REGEN.value])


def run_dg() -> None:
    """Start the DG and primary heater, then log DG state once per second until Ctrl-C.

    While logging, every time the DG is in GEN_IDLE with sub-mode 2 the function steps a
    three-phase cycle: request a drain (taring the load cells), request a fill, then reset
    the cycle. On ``KeyboardInterrupt`` the DG is commanded to stop. The log file is closed
    on every exit path, including unexpected exceptions.
    """
    counter = 1
    sleep_time = 1

    with open("/home/fw/projects/dialin/tests/run_dg.log", "w") as f:
        dg.hd_proxy.cmd_start_stop_dg()
        sleep(1)
        dg.heaters.cmd_start_stop_primary_heater()
        sleep(0.1)

        try:
            while True:
                dg_run = get_dg_run_info()
                temperature = get_temperature_sensors_info()
                heaters = get_heaters_with_no_temp_info()
                load_cell = get_load_cells_info()
                drain = get_drain_states_info()
                ro = get_ro_info()
                dg_rsrvrs = get_dg_reservoirs_info()
                fans = get_dg_fans_info()
                conc = get_concentrate_pumps_info()
                valves = get_dg_valves_states()

                var = ''.join((str(datetime.now()), ', ', dg_run, temperature, heaters, load_cell, drain, ro,
                               dg_rsrvrs, conc, fans, valves, '\r'))
                print(var)
                f.write(var)

                # The same mode test guarded all three phases; evaluate it once per record.
                in_gen_idle = (DGOperationModes(dg.dg_operation_mode).name ==
                               DGOperationModes.DG_OP_MODE_GEN_IDLE.name and
                               dg.dg_operation_sub_mode == 2)
                if in_gen_idle:
                    if counter == 1:
                        dg.hd_proxy.cmd_drain(tare_load_cell=True)
                        counter += 1
                    elif counter == 2:
                        counter += 1
                        dg.hd_proxy.cmd_fill()
                    elif counter == 3:
                        counter = 1

                sleep(sleep_time)
        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg(start=False)


def collect_treatment_data() -> None:
    """Log combined HD and DG treatment data every 50 ms until Ctrl-C.

    Sets the heaters and monitored-voltages broadcast intervals to 50 first. The first time
    ``hd.alarms.get_alarm_state(97)`` is truthy, the CD2 and CD1 conductivity sensors are
    overridden once. On ``KeyboardInterrupt`` the DG is commanded to stop. The log file is
    closed on every exit path.
    """
    with open("/home/fw/projects/dialin/tests/treatment_run.log", "w") as f:
        #dg.conductivity_sensors.cmd_conductivity_sensor_override(ConductivitySensorsEnum.CPI.value, 1500.00)
        #sleep(1)
        # Disabled setup (was kept in a string literal):
        # dg.conductivity_sensors.cmd_conductivity_sensor_override(ConductivitySensorsEnum.CD1.value, 12100.00)
        # sleep(1)SW_CONFIG_DISABLE_WATER_QUALITY_CHECK
        # dg.dialysate_fill.cmd_used_bicarb_volume_override(3700.0)
        # sleep(1)

        counter = 1  # only referenced by the disabled experiment below
        start = False
        sleep_time = 0.05

        dg.heaters.cmd_heaters_broadcast_interval_override(50)
        sleep(1)
        dg.voltages.cmd_monitored_voltages_broadcast_interval_override(50)
        #dg.concentrate_pumps.cmd_concentrate_pump_broadcast_interval_override(50)
        #sleep(0.5)

        try:
            while True:
                hd_run = get_hd_run_info()
                hd_rsrvrs = get_hd_reservoirs_info()
                dg_rsrvrs = get_dg_reservoirs_info()
                dg_run = get_dg_run_info()
                drain = get_drain_states_info()
                load_cell = get_load_cells_info()
                valves = get_dg_valves_states()
                # NOTE(review): these placeholders are bare "0" strings with no ', '
                # separator, so the record contains "00" fused to the next label.
                ro = str(0)  # get_ro_info()
                new_ro = str(0)  # get_new_ro_info()
                temp = get_temperature_sensors_info()
                heaters = get_heaters_info()
                dg_fans = get_dg_fans_info()
                conc_pumps = get_concentrate_pumps_info()
                blood_leak = get_blood_leak_info()
                hd_pumps = get_hd_pumps_info()
                fill_info = get_dg_fill_info()
                idle_bad_fill = get_dg_idle_bad_fill_info()  # only used by the disabled record below

                var = ''.join((str(datetime.now()), ', ', hd_run, dg_run, hd_rsrvrs, dg_rsrvrs, load_cell, drain,
                               ro, new_ro, temp, heaters, conc_pumps, dg_fans, valves, blood_leak, hd_pumps,
                               fill_info, '\r'))

                if hd.alarms.get_alarm_state(97) and not start:
                    dg.conductivity_sensors.cmd_conductivity_sensor_override(
                        ConductivitySensorsEnum.CD2.value, 13700.00)
                    sleep(0.2)
                    dg.conductivity_sensors.cmd_conductivity_sensor_override(
                        ConductivitySensorsEnum.CD1.value, 11600.00)
                    start = True

                # Disabled experiment (was kept in a string literal):
                # var = str(datetime.now()) + ', ' + hd_run + dg_run + load_cell + drain + ro + conc_pumps + valves + \
                #       idle_bad_fill + fill_info + '\r'
                #
                # if dg.dg_operation_mode == 6 and dg.dg_operation_sub_mode == 4 and start is False:
                #     #dg.conductivity_sensors.cmd_conductivity_sensor_override(ConductivitySensorsEnum.CD2.value, 12000.00)
                #     dg.pressures.cmd_pressure_override(dg.pressures.PRESSURE_SENSOR_RO_PUMP_INLET, 14)
                #     start = True
                #
                # if start is True:
                #     counter += 1
                #
                # if 10 < counter < 14:
                #     dg.pressures.cmd_pressure_override(dg.pressures.PRESSURE_SENSOR_RO_PUMP_INLET, 14, reset=1)

                print(var)
                f.write(var)
                sleep(sleep_time)
        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg(start=False)


def collect_hd_treatment() -> None:
    """Log HD run, pump and voltage data every 50 ms until Ctrl-C.

    Sets the dialysate inlet/outlet flow and monitored-voltages broadcast intervals to 50
    first. The log file is closed on every exit path.
    """
    with open("/home/fw/projects/dialin/tests/treatment_run_hd.log", "w") as f:
        #hd.cmd_hd_software_reset_request()
        hd.dialysate_inlet_flow.cmd_dialysate_inlet_flow_broadcast_interval_override(50)
        sleep(1)
        hd.dialysate_outlet_flow.cmd_dialysate_outlet_flow_broadcast_interval_override(50)
        sleep(1)
        hd.voltages.cmd_monitored_voltages_broadcast_interval_override(50)

        try:
            while True:
                hd_run = get_hd_run_info()
                pumps_info = get_hd_pumps_info()
                voltages = get_voltages_info()

                var = hd_run + pumps_info + voltages + '\r'
                print(var)
                f.write(var)
                sleep(0.05)
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop; the 'with' block closes the log.
            pass


def run_heat_disinfect() -> None:
    """Log DG heat-disinfect data once per second until the DG is back in op mode 3 or 4.

    The start command is currently commented out, so the disinfect must already be running.
    Once the DG reports op mode 3 or 4 (described in the original comments as standby /
    standby solo), a stop command is sent and two more records are logged before returning.
    On ``KeyboardInterrupt`` a stop command is sent. The log file is closed on every exit path.
    """
    complete_counter = 1

    with open("/home/fw/projects/dialin/tests/Heat_disinfect.log", "w") as f:
        #dg.hd_proxy.cmd_start_stop_dg_heat_disinfect()
        try:
            while True:
                disinfect = get_heat_disinfect_mode_info()
                drain = get_drain_states_info()
                load_cell = get_load_cells_info()
                valves = get_dg_valves_states()
                ro = get_ro_info()
                temp = get_temperature_sensors_info()
                heaters = get_heaters_info()
                dg_fans = get_dg_fans_info()
                hd_fans = get_hd_fans_info()
                conc = get_concentrate_pumps_info()

                var = ''.join((disinfect, load_cell, drain, ro, temp, heaters, dg_fans, hd_fans, valves, conc,
                               '\r'))
                print(var)
                f.write(var)
                sleep(1)

                # If the mode came back to standby or standby solo
                if dg.dg_operation_mode == 3 or dg.dg_operation_mode == 4:
                    # If it is the first call, stop heat disinfect
                    if complete_counter == 1:
                        dg.hd_proxy.cmd_start_stop_dg_heat_disinfect(start=False)
                    # Write a few more complete states to make sure the complete state items are recorded
                    elif complete_counter == 3:
                        break

                    complete_counter += 1
        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg_heat_disinfect(start=False)


def run_chemical_disinfect() -> None:
    """Start DG chemical disinfect and log once per second until the DG is back in op mode 3 or 4.

    Once the DG reports op mode 3 or 4 (described in the original comments as standby /
    standby solo), two more records are logged before returning. On ``KeyboardInterrupt`` a
    stop command is sent. The log file is closed on every exit path.
    """
    complete_counter = 1

    with open("/home/fw/projects/dialin/tests/chemical_disinfect.log", "w") as f:
        dg.hd_proxy.cmd_start_stop_dg_chemical_disinfect()
        try:
            while True:
                disinfect = get_chemical_disinfect_mode_info()
                drain = get_drain_states_info()
                load_cell = get_load_cells_info()
                conc = get_concentrate_pumps_info()
                valves = get_dg_valves_states()
                ro = get_ro_info()
                temp = get_temperature_sensors_info()
                heaters = get_heaters_info()
                fans = get_dg_fans_info()

                var = ''.join((disinfect, load_cell, conc, drain, ro, temp, heaters, fans, valves, '\r'))
                print(var)
                f.write(var)
                sleep(1)

                # If the mode came back to standby or standby solo
                if dg.dg_operation_mode == 3 or dg.dg_operation_mode == 4:
                    # Write a few more complete states to make sure the complete state items are recorded
                    if complete_counter == 3:
                        break

                    complete_counter += 1
        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg_chemical_disinfect(start=False)


def cmd_set_disinfect_ui_screen() -> None:
    """Create a fresh ``HD`` object, switch the UI standby sub-mode to disinfect and print the
    HD/DG disinfect modes once per second, forever.

    This function builds its own ``HD`` instance instead of using the module-level ``hd``.
    """
    local_hd = HD()
    sleep(0.5)
    local_hd.ui.cmd_ui_set_standby_submode_to_disinfect()

    while True:
        print(local_hd.ui.disinfects_hd_submode, local_hd.ui.disinfects_dg_mode)
        sleep(1)


def test_dg_fans_alarms() -> None:
    """Exercise the DG fan RPM out-of-range alarm once for every member of ``DGFansNames``.

    For each fan: reset the DG, wait for standby, log in, override that fan's RPM to 1000.0
    and log the alarm state every 50 ms. When the alarm is first seen it is acknowledged via
    the HD UI; 50 samples after the acknowledgement the RPM alarm start-time offset is set to
    86390, and the fan is finished once the alarm is seen again. The log file is closed on
    every exit path.

    NOTE(review): the nesting of the ack/counter logic below was chosen so that the counter
    conditions are reachable; confirm it against the intended test procedure.
    """
    with open("/home/fw/projects/dialin/tests/dg_fans_issues.log", "w") as f:
        for fan in DGFansNames.__members__:
            dg.cmd_dg_software_reset_request()
            sleep(1)
            dg.alarms.clear_dialin_alarms()
            i = 0
            ack = False
            counter = 0

            # Spin until the DG reports standby after the reset, then run the test once.
            while True:
                if dg.dg_operation_mode == DGOperationModes.DG_OP_MODE_STANDBY.value:
                    dg.cmd_log_in_to_dg()
                    sleep(1)
                    dg.fans.cmd_fans_data_broadcast_interval_override(50, reset=0)
                    sleep(1)
                    dg.fans.cmd_fans_rpm_override(DGFansNames[fan].value, 1000.0, reset=0)
                    start_time = datetime.now()

                    while True:
                        alarm_state = dg.alarms.get_alarm_state(AlarmList.ALARM_ID_DG_FAN_RPM_OUT_OF_RANGE.value)
                        info = ''.join((str(fan), ', ', str(alarm_state), ', ',
                                        str(datetime.now() - start_time), ', ', get_dg_fans_info(), '\r'))
                        print(info)
                        f.write(info)

                        if alarm_state:
                            if not ack and counter == 0:
                                hd.ui.cmd_ui_user_alarm_response(3)
                                ack = True

                        if ack:
                            if i == 0 and counter == 50:
                                dg.fans.cmd_fans_rpm_alarm_start_time_offset(86390)
                                i += 1
                            elif alarm_state and i > 0:
                                break

                            # Counts samples since the acknowledgement.
                            counter += 1

                        sleep(0.05)
                    break


def test_dvt_drain_pump() -> None:
    """Run the drain pump at 2000 RPM and print drain, temperature and fan data every second.

    On ``KeyboardInterrupt`` the drain pump RPM is set back to 0.
    """
    dg.drain_pump.cmd_drain_pump_set_rpm(2000)

    try:
        while True:
            print(get_drain_states_info(), get_temperature_sensors_info(), get_dg_fans_info())
            sleep(1)
    except KeyboardInterrupt:
        dg.drain_pump.cmd_drain_pump_set_rpm(0)


def test_conc_pumps() -> None:
    """Turn on concentrate pump 0 with a target speed override of 35 and print pump data every second.

    On ``KeyboardInterrupt`` both concentrate pumps (0 and 1) are requested off.
    """
    dg.concentrate_pumps.cmd_concentrate_pump_state_change_request(0, True)
    sleep(1)
    #dg.concentrate_pumps.cmd_concentrate_pump_state_change_request(1, True)
    # sleep(1)
    dg.concentrate_pumps.cmd_concentrate_pump_target_speed_override(0, 35)
    #sleep(1)
    #dg.concentrate_pumps.cmd_concentrate_pump_target_speed_override(1, 20)

    try:
        while True:
            print(get_concentrate_pumps_info())
            sleep(1)
    except KeyboardInterrupt:
        dg.concentrate_pumps.cmd_concentrate_pump_state_change_request(0, False)
        sleep(0.5)
        dg.concentrate_pumps.cmd_concentrate_pump_state_change_request(1, False)


def test_hd_valves() -> None:
    """Cycle HD valve 3 between positions 2 and 1 forever, pausing two seconds between steps."""
    while True:
        #VALVE_POSITION_A_INSERT_EJECT = 1
        #VALVE_POSITION_B_OPEN = 2
        #VALVE_POSITION_C_CLOSE = 3
        # NOTE(review): the position-3 command below is read as commented out while its
        # sleep stays live; confirm that is intended.
        # hd.valves.cmd_set_hd_valve_position(3,3)
        sleep(2)
        hd.valves.cmd_set_hd_valve_position(3, 2)
        sleep(2)
        hd.valves.cmd_set_hd_valve_position(3, 1)
        sleep(2)


if __name__ == "__main__":
    # These two handles are the module-level globals every function above reads.
    dg = DG(log_level='DEBUG')
    dg.cmd_log_in_to_dg()
    sleep(1)
    hd = HD(log_level='DEBUG')
    hd.cmd_log_in_to_hd()
    sleep(1)

    # Uncomment exactly the scenario to run.
    #run_heat_disinfect()

    #run_chemical_disinfect()

    #run_dg()

    # cmd_set_disinfect_ui_screen()

    collect_treatment_data()

    #collect_hd_treatment()

    #while True:
    #    print(get_temperature_sensors_info(), get_dg_fans_info())
    #    sleep(1)

    #while True:
    #    print(get_new_ro_info(), get_ro_info())
    #    sleep(1)

    #while True:
    #    print(dg.rtc.get_rtc_epoch(), hd.rtc.get_rtc_epoch())
    #    sleep(1)

    #test_dg_fans_alarms()

    #test_dvt_drain_pump()

    #test_conc_pumps()

    #ui = HDSimulator()
    #ui.cmd_send_hd_operation_mode(3, 1)

    #test_hd_valves()

    #while True:
    #    print(get_heaters_info())
    #    sleep(1)

    #dg.ro_pump.cmd_set_ro_flow_rate(0.6)