"""Data-logging test script for the DG (dialysate generator).

Each ``get_*_info`` helper formats one group of DG/HD readings as a
``'label, value, '`` fragment.  The ``run_*`` functions start a DG mode through
the HD proxy, then poll once per second, concatenating the fragments into one
record that is printed and appended to a log file.

The helpers read the module-level ``dg`` and ``hd`` objects that are created
under the ``__main__`` guard at the bottom of this file.
"""
import sys
sys.path.append("..")
from dialin.dg.dialysate_generator import DG
from dialin.common.dg_defs import HeatStates, HeatDisinfectUIStates
from dialin.dg.heat_disinfect import HeatCancellationModes
from dialin.common.dg_defs import ChemicalDisinfectStates, ChemDisinfectUIStates
from dialin.dg.chemical_disinfect import ChemCancellationModes
from dialin.dg.drain_pump import DrainPumpStates
from dialin.dg.thermistors import ThermistorsNames
from dialin.dg.temperature_sensors import TemperatureSensorsNames
from dialin.dg.dialysate_generator import DGOperationModes
from dialin.hd.hemodialysis_device import HD
from time import sleep


def get_chemical_disinfect_mode_info():
    """Return the chemical disinfect state, timers, levels and rinse counts as a record fragment."""
    chem = dg.chemical_disinfect
    info = ('State, {}, Overall_elapsed_time, {}, State_elapsed_time, {}, Disinfect_elapsed_time, {}, '
            'Cancellation_mode, {}, R1_level, {:5.3f}, R2_level, {:5.3f}, Current_rinse_count, {}, '
            'Total_rinse_count, {}, UI_state, {}, '
            .format(ChemicalDisinfectStates(chem.chemical_disinfect_state).name,
                    chem.overall_elapsed_time,
                    chem.state_elapsed_time,
                    chem.chemical_disinfect_elapsed_time,
                    ChemCancellationModes(chem.cancellation_mode).name,
                    chem.r1_level,
                    chem.r2_level,
                    chem.current_post_rinse_count,
                    chem.target_post_rinse_count,
                    ChemDisinfectUIStates(chem.chemical_disinfect_ui_state).name))
    return info


def get_concentrate_pumps_info():
    """Return the CP2 concentrate pump speeds and the CPo/CD1/CD2 conductivity readings."""
    info = ('Bicarb_tgt_speed, {:5.3f}, Bicarb_speed, {:5.3f}, CPo, {:5.3f}, CD1, {:5.3f}, CD2, {:5.3f}, '
            .format(dg.concentrate_pumps.concentrate_pump_cp2_current_set_speed,
                    dg.concentrate_pumps.concentrate_pump_cp2_measured_speed,
                    dg.conductivity_sensors.conductivity_sensor_cpo,
                    dg.conductivity_sensors.conductivity_sensor_cd1,
                    dg.conductivity_sensors.conductivity_sensor_cd2))
    return info


def get_heat_disinfect_mode_info():
    """Return the heat disinfect state, timers, reservoir levels, top HD alarm and UI state."""
    heat = dg.heat_disinfect
    # NOTE(review): the original called HeatDisinfectStates, which this file never imports
    # (NameError on the first call).  HeatStates is the name this file does import from
    # dg_defs - confirm it is the heat disinfect state enum.
    # NOTE(review): the UI state used to be read from dg.chemical_disinfect (copy-paste);
    # heat_disinfect_ui_state is assumed to exist on dg.heat_disinfect - confirm.
    # The record now ends with ', ' like every other fragment so that the UI state does not
    # run into the next column.
    info = ('State, {}, Overall_elapsed_time, {}, State_elapsed_time, {}, Disinfect_elapsed_time, {}, '
            'R1_level, {:5.3f}, R2_level, {:5.3f}, Top_alarm, {}, UI_state, {}, '
            .format(HeatStates(heat.heat_disinfect_state).name,
                    heat.overall_elapsed_time,
                    heat.state_elapsed_time,
                    heat.heat_disinfect_elapsed_time,
                    heat.r1_level,
                    heat.r2_level,
                    hd.alarms.alarm_top,
                    HeatDisinfectUIStates(heat.heat_disinfect_ui_state).name))
    return info


def get_dg_run_info():
    """Return the DG operation mode name and the raw operation sub-mode."""
    info = ('DG_op_mode, {}, DG_sub_mode, {}, '.format(DGOperationModes(dg.dg_operation_mode).name,
                                                       dg.dg_operation_sub_mode))
    return info


def get_dg_valves_states():
    """Return the state of each DG valve, in the column order of the format string."""
    valves = dg.valves
    states = valves.valve_states_enum
    info = ('VPi, {}, VSP, {}, VPd, {}, VBf, {}, VPo, {}, VDr, {}, VRc, {}, VRo, {}, VRd, {}, VRi, {}, VRf, {}, '
            'VRD1, {}, VRD2, {}, '
            .format(states[valves.VALVE_PRESSURE_INLET],
                    states[valves.VALVE_SAMPLING_PORT],
                    states[valves.VALVE_PRODUCTION_DRAIN],
                    states[valves.VALVE_BYPASS_FILTER],
                    states[valves.VALVE_PRESSURE_OUTLET],
                    states[valves.VALVE_DRAIN],
                    states[valves.VALVE_RECIRCULATE],
                    states[valves.VALVE_RESERVOIR_OUTLET],
                    states[valves.VALVE_RESERVOIR_DRAIN],
                    states[valves.VALVE_RESERVOIR_INLET],
                    states[valves.VALVE_RESERVOIR_FILL],
                    states[valves.VALVE_RESERVOIR_DRAIN_1],
                    states[valves.VALVE_RESERVOIR_DRAIN_2]))
    return info


def get_drain_states_info():
    """Return the drain pump state, DAC value, target/current RPM and drain pump pressures."""
    info = ('Drain, {}, DAC, {}, Tgt_RPM, {}, Curr_RPM, {}, PRd, {:5.3f}, PDr, {:5.3f}, '
            .format(DrainPumpStates(dg.drain_pump.drain_pump_state).name,
                    dg.drain_pump.dac_value,
                    dg.drain_pump.target_drain_pump_rpm,
                    dg.drain_pump.current_drain_pump_rpm,
                    dg.pressures.drain_pump_inlet_pressure,
                    dg.pressures.drain_pump_outlet_pressure))
    return info


def get_load_cells_info():
    """Return the four load cell readings (A1, A2, B1, B2)."""
    info = ('A1, {:5.3f}, A2, {:5.3f}, B1, {:5.3f}, B2, {:5.3f}, '
            .format(dg.load_cells.load_cell_A1,
                    dg.load_cells.load_cell_A2,
                    dg.load_cells.load_cell_B1,
                    dg.load_cells.load_cell_B2))
    return info


def get_ro_info():
    """Return the RO pump state, inlet/outlet pressures, PWM duty cycle and measured/target flow."""
    info = ('RO, {}, PPi, {:5.3f}, PPo, {:5.3f}, PWM, {:5.3f}, Flow, {:5.3f}, Tgt_flow, {:5.3f}, '
            .format(dg.ro_pump.ro_pump_state,
                    dg.pressures.ro_pump_inlet_pressure,
                    dg.pressures.ro_pump_outlet_pressure,
                    dg.ro_pump.pwm_duty_cycle_pct,
                    dg.ro_pump.measured_flow_rate_lpm,
                    dg.ro_pump.target_flow_lpm))
    return info


def get_heaters_info():
    """Return heater duty cycles, heater temperature sensors, raw thermocouple values and targets."""
    temps = dg.temperature_sensors.temperature_sensors
    names = TemperatureSensorsNames
    info = ('Pri_main_DC, {:5.3f}, Pri_small_DC, {:5.3f}, Pri_int_temp, {:5.3f}, Prim_CJ_temp, {:5.3f}, '
            'Prim_TC_temp, {:5.3f}, Trimmer_DC, {:5.3f}, Trim_int_temp, {:5.3f}, Trim_CJ_temp, {:5.3f}, '
            'Trim_TC_temp, {:5.3f}, Prim_TC_raw, {:5.3f}, Prim_CJ_raw, {:5.3f}, Trimmer_TC_raw, {:5.3f}, '
            'Trimmer_CJ_raw, {:5.3f}, Primary_target_temp, {:5.3f}, Trimmer_target_temp, {:5.3f}, '
            .format(dg.heaters.main_primary_heater_duty_cycle,
                    dg.heaters.small_primary_heater_duty_cycle,
                    temps[names.PRIMARY_HEATER_INTERNAL.name],
                    temps[names.PRIMARY_HEATER_COLD_JUNCTION.name],
                    temps[names.PRIMARY_HEATER_THERMOCOUPLE.name],
                    dg.heaters.trimmer_heater_duty_cycle,
                    temps[names.TRIMMER_HEATER_INTERNAL.name],
                    temps[names.TRIMMER_HEATER_COLD_JUNCTION.name],
                    temps[names.TRIMMER_HEATER_THERMOCOUPLE.name],
                    dg.temperature_sensors.primary_raw_thermo_couple,
                    dg.temperature_sensors.primary_raw_cold_junc,
                    dg.temperature_sensors.trimmer_raw_thermo_couple,
                    dg.temperature_sensors.trimmer_raw_cold_junc,
                    dg.heaters.primary_heaters_target_temperature,
                    dg.heaters.trimmer_heater_target_temperature))
    return info


def get_temperature_sensors_info():
    """Return the TPi, TPo, TD1, TD2, TRo and TDi temperature sensor readings."""
    temps = dg.temperature_sensors.temperature_sensors
    names = TemperatureSensorsNames
    info = ('TPi, {:5.3f}, TPo, {:5.3f}, TD1, {:5.3f}, TD2, {:5.3f}, TRo, {:5.3f}, TDi, {:5.3f}, '
            .format(temps[names.INLET_PRIMARY_HEATER.name],
                    temps[names.OUTLET_PRIMARY_HEATER.name],
                    temps[names.CONDUCTIVITY_SENSOR_1.name],
                    temps[names.CONDUCTIVITY_SENSOR_2.name],
                    temps[names.OUTLET_DIALYSATE_REDUNDANT.name],
                    temps[names.INLET_DIALYSATE.name]))
    return info


def get_fans_info():
    """Return the fans target duty cycle, fan RPMs and the board/FPGA/load cell temperatures."""
    temps = dg.temperature_sensors.temperature_sensors
    names = TemperatureSensorsNames
    # The Outlet1_RPM column used to be filled with inlet_2_rpm (so inlet 2 was logged twice and
    # outlet 1 never); it now reads outlet_1_rpm, matching the outlet_2/outlet_3 attributes.
    info = ('Target_fans_DC, {:5.3f}, Inlet1_RPM, {:5.3f}, Outlet1_RPM, {:5.3f}, '
            'Inlet2_RPM, {:5.3f}, Outlet2_RPM, {:5.3f}, Inlet3_RPM, {:5.3f}, Outlet3_RPM, {:5.3f}, Board_temp, {:5.3f},'
            ' FPGA_temp, {:5.3f}, Load_cell_A1_B1, {:5.3f}, Load_cell_A2_B2, {:5.3f}, '
            .format(dg.fans.target_duty_cycle,
                    dg.fans.inlet_1_rpm,
                    dg.fans.outlet_1_rpm,
                    dg.fans.inlet_2_rpm,
                    dg.fans.outlet_2_rpm,
                    dg.fans.inlet_3_rpm,
                    dg.fans.outlet_3_rpm,
                    dg.thermistors.thermistors[ThermistorsNames.THERMISTOR_ONBOARD_NTC.name],
                    temps[names.FPGA_BOARD_SENSOR.name],
                    temps[names.LOAD_CELL_A1_B1.name],
                    temps[names.LOAD_CELL_A2_B2.name]))
    return info


def run_dg():
    """Start the DG, cycle it through switch-reservoir / drain / fill steps and log every poll.

    One cycle is driven by ``counter`` (1: switch reservoirs, 2: drain, 3: wait then fill,
    4: cycle done).  A step only advances while the DG reports recirculate mode with
    sub-mode 2.  Once more than three cycles are done and the sub-mode is 2, the DG is
    stopped and the function returns.  Ctrl-C also stops the DG.
    """
    counter = 1
    timer = 1
    sleep_time = 1
    run_number = 1

    # The context manager closes the log on every exit path, not only on Ctrl-C.
    with open("/home/fw/projects/dialin/tests/run_dg.log", "w") as f:
        # Set the broadcast to the fastest for studying a phenomenon
        # dg.valves.cmd_valve_broadcast_interval_override(50)
        # sleep(0.25)
        # dg.pressures.cmd_pressure_broadcast_interval_override(50)
        # sleep(0.25)
        # dg.drain_pump.cmd_drain_pump_data_broadcast_interval_override(50)
        # sleep(0.25)
        # dg.ro_pump.cmd_ro_pump_data_broadcast_interval_override(50)
        # sleep(0.25)

        dg.hd_proxy.cmd_start_stop_dg()
        sleep(0.1)

        try:
            while True:
                dg_run = get_dg_run_info()
                temperature = get_temperature_sensors_info()
                heaters = get_heaters_info()
                load_cell = get_load_cells_info()
                drain = get_drain_states_info()
                ro = get_ro_info()
                valves = get_dg_valves_states()

                var = (str(run_number) + ', ' + str(timer) + ', ' + dg_run + temperature + heaters
                       + load_cell + drain + ro + valves + '\r')
                print(var)
                f.write(var)

                # Evaluate the shared gating conditions once per poll.
                sub_mode_ready = dg.dg_operation_sub_mode == 2
                recirculating = (DGOperationModes(dg.dg_operation_mode).name
                                 == DGOperationModes.DG_OP_MODE_RECIRCULATE.name)

                if run_number > 3 and sub_mode_ready:
                    dg.hd_proxy.cmd_start_stop_dg(start=False)
                    break

                elif recirculating and sub_mode_ready:
                    if counter == 1:
                        # NOTE(review): timer is not reset when a cycle completes, so after the
                        # first cycle this command appears to be skipped - confirm intent.
                        if timer == 1:
                            dg.hd_proxy.cmd_switch_reservoirs(reservoirID=1)

                        timer += 1
                        if timer > ((1/sleep_time) * 1):
                            timer = 1
                            counter += 1

                    elif counter == 2:
                        dg.hd_proxy.cmd_drain(tare_load_cell=True)
                        counter += 1
                        timer = 1

                    elif counter == 3:
                        # A gate on the primary heater outlet temperature (TPo > 39.0) was
                        # disabled here in the original; only the timer delays the fill.
                        timer += 1
                        if timer > 4:
                            dg.hd_proxy.cmd_fill(volume=1700)
                            counter += 1

                    elif counter == 4:
                        counter = 1
                        run_number += 1

                sleep(sleep_time)

        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg(start=False)


def run_heat_disinfect():
    """Start heat disinfect and log one record per second until the DG is back in mode 3 or 4.

    On the first poll that sees operation mode 3 or 4 (standby / standby solo) a stop command
    is sent; logging then continues for two more such polls before returning.  Ctrl-C sends
    the stop command and returns.
    """
    complete_counter = 1

    # The context manager closes the log on every exit path, not only on Ctrl-C.
    with open("/home/fw/projects/dialin/tests/Heat_disinfect.log", "w") as f:
        dg.hd_proxy.cmd_start_stop_heat_disinfect()

        try:
            while True:
                disinfect = get_heat_disinfect_mode_info()
                drain = get_drain_states_info()
                load_cell = get_load_cells_info()
                valves = get_dg_valves_states()
                ro = get_ro_info()
                temp = get_temperature_sensors_info()
                heaters = get_heaters_info()
                fans = get_fans_info()

                var = disinfect + load_cell + drain + ro + temp + heaters + fans + valves + '\r'
                print(var)
                f.write(var)
                sleep(1)

                # If the mode came back to standby or standby solo
                if dg.dg_operation_mode in (3, 4):
                    # If it is the first call, stop heat disinfect
                    if complete_counter == 1:
                        dg.hd_proxy.cmd_start_stop_heat_disinfect(start=False)
                    # Write a few more complete states to make sure the complete state items are recorded
                    elif complete_counter == 3:
                        break

                    complete_counter += 1

        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_heat_disinfect(start=False)


def run_chemical_disinfect():
    """Start chemical disinfect and log one record per second until the DG is back in mode 3 or 4.

    Logging continues until the third poll that sees operation mode 3 or 4 (standby /
    standby solo).  Ctrl-C sends the stop command and returns.
    """
    complete_counter = 1

    # The context manager closes the log on every exit path, not only on Ctrl-C.
    with open("/home/fw/projects/dialin/tests/chemical_disinfect.log", "w") as f:
        dg.hd_proxy.cmd_start_stop_dg_chemical_disinfect()

        try:
            while True:
                disinfect = get_chemical_disinfect_mode_info()
                drain = get_drain_states_info()
                load_cell = get_load_cells_info()
                conc = get_concentrate_pumps_info()
                valves = get_dg_valves_states()
                ro = get_ro_info()
                temp = get_temperature_sensors_info()
                heaters = get_heaters_info()
                fans = get_fans_info()

                var = disinfect + load_cell + conc + drain + ro + temp + heaters + fans + valves + '\r'
                print(var)
                f.write(var)
                sleep(1)

                # If the mode came back to standby or standby solo
                if dg.dg_operation_mode in (3, 4):
                    # Write a few more complete states to make sure the complete state items are recorded
                    if complete_counter == 3:
                        break

                    complete_counter += 1

        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg_chemical_disinfect(start=False)


def cmd_set_disinfect_ui_screen():
    """Put the HD UI standby sub-mode into disinfect and print the disinfect modes every second.

    Uses its own HD connection and loops until interrupted.
    """
    hd = HD()
    sleep(0.5)
    hd.ui.cmd_ui_set_standby_submode_to_disinfect()

    while True:
        print(hd.ui.disinfects_hd_submode, hd.ui.disinfects_dg_mode)
        sleep(1)


if __name__ == "__main__":

    dg = DG(log_level='DEBUG')
    dg.cmd_log_in_to_dg()
    sleep(1)
    hd = HD(log_level='DEBUG')
    hd.cmd_log_in_to_hd()
    sleep(1)

    run_heat_disinfect()

    #run_chemical_disinfect()

    #run_dg()

    #cmd_set_disinfect_ui_screen()