###########################################################################
#
#    Copyright (c) 2022-2025 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file   dg_tests.py
#
#    @author (last)      Dara Navaei
#    @date   (last)      24-Oct-2024
#    @author (original)  Dara Navaei
#    @date   (original)  16-Jan-2022
#
############################################################################
import os
import time
from dialin.dg.dialysate_generator import DG
from dialin.hd.hemodialysis_device import HD
from dialin.dg.load_cells import DGLoadCellNames
from dialin.ui.hd_simulator import HDSimulator
from dialin.common.dg_defs import DGHeatDisinfectStates, DGHeatDisinfectUIStates
from dialin.dg.heat_disinfect import HeatCancellationModes, NelsonSupportModes
from dialin.common.dg_defs import DGChemicalDisinfectStates, DGChemDisinfectUIStates
from dialin.common.dg_defs import DGChemDisinfectFlushStates, DGChemDisinfectFlushUIStates
from dialin.dg.chemical_disinfect import ChemCancellationModes
from dialin.dg.drain_pump import DrainPumpStates, DrainPumpRPMFeedBackSensors
from dialin.dg.thermistors import ThermistorsNames
from dialin.dg.temperatures import DGTemperaturesNames
from dialin.dg.dialysate_generator import DGOperationModes
from dialin.hd.temperatures import HDTemperaturesNames
from dialin.dg.concentrate_pumps import DGConcentratePumpsStates
from dialin.dg.uv_reactors import ReactorsNames
from dialin.common.hd_defs import HDOpModes, HDStandbyStates, PreTreatmentWetSelfTestStates, PostTreatmentStates, \
    PreTreatmentDrySelfTestsStates
from dialin.hd.post_treatment import HDPostTreatmentDrainStates
from dialin.common.dg_defs import DGEventList
from dialin.common.hd_defs import HDEventList
from dialin.hd.reservoirs import HDReservoirStates
from dialin.dg.reservoirs import DGReservoirsNames
from dialin.common.dg_defs import DGGenIdleModeBadFillSubStates
from dialin.common.alarm_defs import AlarmList
from dialin.dg.fans import DGFansNames
from dialin.dg.pressures import DGPressures
from dialin.dg.conductivity_sensors import ConductivitySensorsEnum
from dialin.dg.voltages import DGMonitoredVoltages
from dialin.dg.valves import DGValveNames
from dialin.hd.valves import HDValves
from dialin.hd.voltages import HDMonitoredVoltages
from dialin.hd.pretreatment import PreTreatmentRsrvrState
from dialin.common.dg_defs import DGFlushStates, DGHeatDisinfectActiveCoolStates, DGROPermeateSampleStates
from dialin.dg.ro_permeate_sample import ROPermeateDispenseMsgStatus
from dialin.common.test_config_defs import DGTestConfigOptions, HDTestConfigOptions
from dialin.hd.blood_leak import EmbModeCommands
from time import sleep
from datetime import datetime
import sys
sys.path.append("..")

# NOTE: every function below reads the module-level ``dg`` and ``hd`` objects that are
# created in the ``__main__`` section at the bottom of this file. The ``get_*_info``
# functions each return one string of comma separated "label, value, " pairs so that
# several of them can be concatenated into a single log row.


def get_chemical_disinfect_mode_info():
    """
    Builds the chemical disinfect portion of a log row.

    The state name and the elapsed times are taken from chemical disinfect when the DG operation mode is 10 and
    from chemical disinfect flush when it is 11. In any other mode they are reported as 0.

    @return: (str) comma separated "label, value" pairs
    """
    state = 0
    state_elapsed_time = 0
    overall_elapsed_time = 0

    if dg.dg_operation_mode == 10:
        state = DGChemicalDisinfectStates(dg.chemical_disinfect.chemical_disinfect_state).name
        overall_elapsed_time = dg.chemical_disinfect.overall_elapsed_time
        state_elapsed_time = dg.chemical_disinfect.state_elapsed_time

    elif dg.dg_operation_mode == 11:
        state = DGChemDisinfectFlushStates(dg.chemical_disinfect_flush.flush_state).name
        overall_elapsed_time = dg.chemical_disinfect_flush.overall_elapsed_time
        state_elapsed_time = dg.chemical_disinfect_flush.state_elapsed_time

    # NOTE(review): Current_rinse_count and Total_rinse_count are both fed from rinse_count. The total may have
    # been meant to be chemical_disinfect_flush.target_rinse_count - confirm before changing the column.
    info = ('State, {}, Overall_elapsed_time, {}, State_elapsed_time, {}, Disinfect_elapsed_time, {}, '
            'Cancellation_mode, {}, R1_level, {:5.3f}, R2_level, {:5.3f}, Current_rinse_count, {}, '
            'Total_rinse_count, {}, Top_alarm, {}, chem_acid_avg_cond, {:5.3f}, '
            .format(state,
                    overall_elapsed_time,
                    state_elapsed_time,
                    dg.chemical_disinfect.chemical_disinfect_target_time,
                    ChemCancellationModes(dg.chemical_disinfect.cancellation_mode).name,
                    dg.chemical_disinfect.r1_level,
                    dg.chemical_disinfect.r2_level,
                    dg.chemical_disinfect_flush.rinse_count,
                    dg.chemical_disinfect_flush.rinse_count,
                    hd.alarms.alarm_top,
                    dg.chemical_disinfect.acid_average_cond_us_per_cm))
    return info


def get_chemical_disinfect_flush_mode_info():
    """
    Builds the chemical disinfect flush portion of a log row.

    The state name and the elapsed times are read from chemical disinfect flush when the DG operation mode is 11
    (the same mode number get_chemical_disinfect_mode_info treats as chemical disinfect flush). In any other mode
    they are reported as 0.

    @return: (str) comma separated "label, value" pairs
    """
    state = 0
    state_elapsed_time = 0
    overall_elapsed_time = 0

    # The previous version tested mode 10 with the chemical disinfect timers and mapped mode 11 to the heat
    # disinfect active cool state, which was a leftover from copying the heat disinfect getter.
    if dg.dg_operation_mode == 11:
        state = DGChemDisinfectFlushStates(dg.chemical_disinfect_flush.flush_state).name
        overall_elapsed_time = dg.chemical_disinfect_flush.overall_elapsed_time
        state_elapsed_time = dg.chemical_disinfect_flush.state_elapsed_time

    info = ('State, {}, Overall_elapsed_time, {}, State_elapsed_time, {}, r1, {:5.1f}, r2, {:5.1f}, '
            'Target_rinse_count, {}, Rinse_count, {}, Top_alarm, {}, UI_state, {},'
            .format(state,
                    overall_elapsed_time,
                    state_elapsed_time,
                    dg.chemical_disinfect_flush.r1,
                    dg.chemical_disinfect_flush.r2,
                    dg.chemical_disinfect_flush.target_rinse_count,
                    dg.chemical_disinfect_flush.rinse_count,
                    hd.alarms.alarm_top,
                    dg.chemical_disinfect_flush.flush_UI_state))
    return info


def get_heat_disinfect_mode_info():
    """
    Builds the heat disinfect portion of a log row.

    The state name and the elapsed times are taken from heat disinfect when the DG operation mode is 9 and from
    heat disinfect active cool when it is 12. In any other mode they are reported as 0.

    @return: (str) comma separated "label, value" pairs
    """
    state = 0
    state_elapsed_time = 0
    overall_elapsed_time = 0

    if dg.dg_operation_mode == 9:
        state = DGHeatDisinfectStates(dg.heat_disinfect.heat_disinfect_state).name
        overall_elapsed_time = dg.heat_disinfect.overall_elapsed_time
        state_elapsed_time = dg.heat_disinfect.state_elapsed_time

    elif dg.dg_operation_mode == 12:
        state = DGHeatDisinfectActiveCoolStates(dg.heat_disinfect_active_cool.heat_disinfect_active_cool_state).name
        overall_elapsed_time = dg.heat_disinfect_active_cool.overall_elapsed_time
        state_elapsed_time = dg.heat_disinfect_active_cool.state_elapsed_time

    info = ('State, {}, Overall_elapsed_time, {}, State_elapsed_time, {}, Disinfect_RO_77, {}, Disinfect_RO_82, {}, '
            'Disinfect_R_77, {}, Disinfect_R_82, {}, R1_level, {:5.3f}, R2_level, {:5.3f}, Top_alarm, {}, UI_state, {},'
            ' '
            .format(state,
                    overall_elapsed_time,
                    state_elapsed_time,
                    dg.heat_disinfect.disinfect_ro_77_time_s,
                    dg.heat_disinfect.disinfect_ro_82_time_s,
                    dg.heat_disinfect.disinfect_r_77_time_s,
                    dg.heat_disinfect.disinfect_r_82_time_s,
                    dg.heat_disinfect.r1_level,
                    dg.heat_disinfect.r2_level,
                    hd.alarms.alarm_top,
                    DGHeatDisinfectUIStates(dg.heat_disinfect.heat_disinfect_ui_state).name))
    return info


def get_ro_permeate_sample_mode_info():
    """
    Builds the RO permeate sample portion of a log row (state, elapsed times, dispensed volume, dispense message
    status and the DG operation mode).

    @return: (str) comma separated "label, value" pairs
    """
    state = DGROPermeateSampleStates(dg.ro_permeate_sample.ro_permeate_sample_state).name
    overall_elapsed_time = dg.ro_permeate_sample.overall_elapsed_time
    state_elapsed_time = dg.ro_permeate_sample.state_elapsed_time

    info = ('State, {}, Overall_elapsed_time, {}, State_elapsed_time, {}, dispense_vol_ml, {:5.3f}, dispense_msg, {}, '
            'dg_op_mode, {}, '
            .format(state,
                    overall_elapsed_time,
                    state_elapsed_time,
                    dg.ro_permeate_sample.dispensed_volume_ml,
                    ROPermeateDispenseMsgStatus(dg.ro_permeate_sample.ro_permeate_dispense_message_status).name,
                    dg.dg_operation_mode))
    return info


def get_flush_mode_info():
    """
    Builds the flush mode portion of a log row.

    @return: (str) comma separated "label, value" pairs
    """
    info = ('State, {}, Overall_elapsed_time, {}, State_elapsed_time, {}, Drain_vol, {:5.3f}, Top_alarm, {}, '
            .format(DGFlushStates(dg.flush.flush_state).name,
                    dg.flush.overall_elapsed_time,
                    dg.flush.state_elapsed_time,
                    dg.flush.flush_drain_line_volume_l,
                    hd.alarms.alarm_top))
    return info


def get_hd_run_info():
    """
    Builds the HD operation portion of a log row (op mode, sub mode, top alarm, UF volumes and the pre/post
    treatment states).

    @return: (str) comma separated "label, value" pairs
    """
    info = ('HD_op_mode, {}, HD_sub_mode, {}, Top_alarm, {}, Target_UF_ml, {:5.3f}, Meas_UF_ml, {:5.3f}, '
            'Wet_test_state, {}, Post_tx_state, {}, Post_tx_drain_state, {}, Prime_state, {}, Pre_Tx_rsrvr_state, {}, '
            'Pre_Tx_dry_state, {}, '
            .format(HDOpModes(hd.hd_operation_mode).name,
                    hd.hd_operation_sub_mode,
                    hd.alarms.alarm_top,
                    hd.dialysate_outlet_flow.reference_dialysate_outlet_uf_volume,
                    hd.dialysate_outlet_flow.measured_dialysate_outlet_uf_volume,
                    PreTreatmentWetSelfTestStates(hd.pretreatment.pre_treatment_wet_self_test_state).name,
                    PostTreatmentStates(hd.post_treatment.post_treatment_sub_mode).name,
                    HDPostTreatmentDrainStates(hd.post_treatment.post_treatment_drain_state).name,
                    hd.pretreatment.pre_treatment_prime_state,
                    PreTreatmentRsrvrState(hd.pretreatment.pre_treatment_reservoir_state).name,
                    PreTreatmentDrySelfTestsStates(hd.pretreatment.pre_treatment_dry_self_test_state).name))
    return info


def get_hd_occlusion_pressures_info():
    """
    Builds the HD pressure/occlusion portion of a log row.

    @return: (str) comma separated "label, value" pairs
    """
    info = ('Art_pres, {:5.3f}, Venous_pres, {:5.3f}, Blood_pump_pres, {:5.3f}, DialOut_pres, {}, '
            .format(hd.pressure_occlusion.arterial_pressure,
                    hd.pressure_occlusion.venous_pressure,
                    hd.pressure_occlusion.blood_pump_occlusion,
                    hd.pressure_occlusion.dialysate_outlet_pump_occlusion))
    return info


def get_hd_reservoirs_info():
    """
    Builds the HD reservoirs portion of a log row. The active reservoir name comes from the DG reservoirs object,
    everything else from the HD reservoirs object.

    @return: (str) comma separated "label, value" pairs
    """
    info = ('HD_rsrvr_state, {}, Active_rsrvr, {}, Active_rsrvr_uf, {:5.3f}, Active_rsrvr_vol_spent, {:5.3f}, '
            'Dilution_level_pct, {:5.3f}, Recirc_level_pct, {:5.3f}, Time_depletion, {}, Time_wait_to_fill, {}, '
            'Target_fill_flow, {:5.3f}, '
            .format(HDReservoirStates(hd.hd_reservoirs.reservoir_state).name,
                    DGReservoirsNames(dg.reservoirs.active_reservoir).name,
                    hd.hd_reservoirs.active_reservoir_uf_ml,
                    hd.hd_reservoirs.active_reservoir_spent_vol_ml,
                    hd.hd_reservoirs.dilution_level_pct,
                    hd.hd_reservoirs.recirculation_level_pct,
                    hd.hd_reservoirs.time_depletion,
                    hd.hd_reservoirs.time_wait_to_fill,
                    hd.hd_reservoirs.temp_remove_fill_flow))
    return info


def get_dg_reservoirs_info():
    """
    Builds the DG reservoirs portion of a log row (reservoir timings and fill temperatures).

    @return: (str) comma separated "label, value" pairs
    """
    # Temp_rsrvr_end_fill used '{:5.3}' (3 significant digits) while every other column uses '{:5.3f}'
    # (3 decimals); it is now formatted like the rest.
    info = ('Time_rsrvr_cycle, {}, Time_rsrvr_fill_2_switch, {}, Time_uf_decay, {:5.3f}, Temp_uf_fill, {:5.3f}, '
            'Temp_rsrvr_use_actual, {:5.3f}, Temp_rsrvr_end_fill, {:5.3f}, Temp_avg_fill, {:5.3f}, '
            'Temp_last_fill, {:5.3f}, Time_rsrvr_fill, {:5.3f}, '
            .format(dg.reservoirs.time_reservoir_cycle,
                    dg.reservoirs.time_reservoir_fill_2_switch,
                    dg.reservoirs.time_uf_decay,
                    dg.reservoirs.temp_uf_fill,
                    dg.reservoirs.temp_reservoir_use_actual,
                    dg.reservoirs.temp_reservoir_end_fill,
                    dg.reservoirs.temp_avg_fill,
                    dg.reservoirs.temp_last_fill,
                    dg.reservoirs.time_rsrvr_fill))
    return info


def get_dg_run_info():
    """
    Builds the DG operation mode portion of a log row (mode name, mode number and sub mode).

    @return: (str) comma separated "label, value" pairs
    """
    info = ('DG_op_mode, {}, DG_op_mode_num, {}, DG_sub_mode, {}, '
            .format(DGOperationModes(dg.dg_operation_mode).name,
                    dg.dg_operation_mode,
                    dg.dg_operation_sub_mode))
    return info


def get_dg_valves_states():
    """
    Builds the DG valves portion of a log row, one column per valve.

    @return: (str) comma separated "label, value" pairs
    """
    valve_states = dg.valves.valve_states_enum

    info = ('VPi, {}, VSP, {}, VPd, {}, VBf, {}, VPo, {}, VDr, {}, VRc, {}, VRo, {}, VRi, {}, VRf, {}, '
            'VRD1, {}, VRD2, {}, '
            .format(valve_states[DGValveNames.VALVE_PRESSURE_INLET.value],
                    valve_states[DGValveNames.VALVE_SAMPLING_PORT.value],
                    valve_states[DGValveNames.VALVE_PRODUCTION_DRAIN.value],
                    valve_states[DGValveNames.VALVE_BYPASS_FILTER.value],
                    valve_states[DGValveNames.VALVE_PRESSURE_OUTLET.value],
                    valve_states[DGValveNames.VALVE_DRAIN.value],
                    valve_states[DGValveNames.VALVE_RECIRCULATE.value],
                    valve_states[DGValveNames.VALVE_RESERVOIR_OUTLET.value],
                    valve_states[DGValveNames.VALVE_RESERVOIR_INLET.value],
                    valve_states[DGValveNames.VALVE_RESERVOIR_FILL.value],
                    valve_states[DGValveNames.VALVE_RESERVOIR_DRAIN_1.value],
                    valve_states[DGValveNames.VALVE_RESERVOIR_DRAIN_2.value]))
    return info


def get_drain_states_info():
    """
    Builds the drain pump portion of a log row (pump state, DAC, RPMs, drain/barometric pressures, current,
    direction and target flow).

    @return: (str) comma separated "label, value" pairs
    """
    info = ('Drain, {}, DAC, {}, Tgt_RPM, {}, Curr_RPM, {}, PRd, {:5.3f}, PDr, {:5.3f}, Baro, {:5.3f}, '
            'Drain_curr_A, {:5.3f}, Drain_dir, {}, Target_flow_lpm, {:5.3f}, Maxon_rpm, {}, '
            .format(DrainPumpStates(dg.drain_pump.drain_pump_state).name,
                    dg.drain_pump.dac_value,
                    dg.drain_pump.target_drain_pump_rpm,
                    dg.drain_pump.current_drain_pump_rpm[DrainPumpRPMFeedBackSensors.DRAIN_PUMP_HALL_SNSR_FB.name],
                    dg.pressures.drain_pump_inlet_pressure,
                    dg.pressures.drain_pump_outlet_pressure,
                    dg.pressures.barometric_pressure,
                    dg.drain_pump.drain_pump_current_A,
                    dg.drain_pump.drain_pump_direction,
                    dg.drain_pump.target_drain_pump_outlet_flow_lpm,
                    dg.drain_pump.current_drain_pump_rpm[DrainPumpRPMFeedBackSensors.DRAIN_PUMP_MAXON_SNSR_FB.name]))
    return info


def get_load_cells_info():
    """
    Builds the load cells portion of a log row (A1, A2, B1 and B2).

    @return: (str) comma separated "label, value" pairs
    """
    info = ('A1, {:5.3f}, A2, {:5.3f}, B1, {:5.3f}, B2, {:5.3f}, '
            .format(dg.load_cells.load_cell_A1,
                    dg.load_cells.load_cell_A2,
                    dg.load_cells.load_cell_B1,
                    dg.load_cells.load_cell_B2))
    return info


def get_ro_info():
    """
    Builds the RO pump portion of a log row (pump state, RO pump pressures, duty cycles and flows).

    @return: (str) comma separated "label, value" pairs
    """
    info = ('RO, {}, PPi, {:5.3f}, PPo, {:5.3f}, PWM, {:5.3f}, Flow, {:5.3f}, Tgt_flow, {:5.3f}, '
            'Feedback_PWM, {:5.3f}, Flow_with_conc_pumps, {:5.3f}, Dialysate_flow, {:5.3f}, '
            .format(dg.ro_pump.ro_pump_state,
                    dg.pressures.ro_pump_inlet_pressure,
                    dg.pressures.ro_pump_outlet_pressure,
                    dg.ro_pump.pwm_duty_cycle_pct,
                    dg.flow_sensors.measured_ro_flow_LPM,
                    dg.ro_pump.target_flow_lpm,
                    dg.ro_pump.feedback_duty_cycle_pct,
                    dg.flow_sensors.measured_ro_flow_with_cp_LPM,
                    dg.flow_sensors.measured_dialysate_flow_LPM))
    return info


def get_heaters_with_no_temp_info():
    """
    Builds a short heaters portion of a log row: duty cycles, states and target temperatures only.

    @return: (str) comma separated "label, value" pairs
    """
    info = ('Pri_main_DC, {:5.3f}, Pri_state, {}, Trimmer_DC, {:5.3f}, Trim_state, {}, '
            'Primary_target_temp, {:5.3f}, Trimmer_target_temp, {:5.3f}, '
            .format(dg.heaters.main_primary_heater_duty_cycle,
                    dg.heaters.primary_heater_state,
                    dg.heaters.trimmer_heater_duty_cycle,
                    dg.heaters.trimmer_heater_state,
                    dg.heaters.primary_heaters_target_temperature,
                    dg.heaters.trimmer_heater_target_temperature))
    return info


def get_heaters_info():
    """
    Builds the full heaters portion of a log row: duty cycles, states, target and calculated temperatures,
    efficiency, the heater related monitored voltages and the trimmer control values.

    @return: (str) comma separated "label, value" pairs
    """
    voltages = dg.voltages.monitored_voltages

    info = ('Pri_main_DC, {:5.3f}, Pri_state, {}, Trimmer_DC, {:5.3f}, Trimmer_state, {}, '
            'Primary_target_temp, {:5.3f}, Trimmer_target_temp, {:5.3f}, Primary_eff, {:5.3f}, '
            'Primary_calc_temp, {:5.3f}, Trimmer_calc_temp, {:5.3f}, Primary_power, {:5.3f}, '
            'Primary_volt, {:5.3f}, Primary_sec_volt, {:5.3f}, Trimmer_volt, {:5.3f}, Trimmer_use_last_dc, {}, '
            'previous_flow , {:5.3f}, trimmer_ctrl_cntr, {}, '
            .format(dg.heaters.main_primary_heater_duty_cycle,
                    dg.heaters.primary_heater_state,
                    dg.heaters.trimmer_heater_duty_cycle,
                    dg.heaters.trimmer_heater_state,
                    dg.heaters.primary_heaters_target_temperature,
                    dg.heaters.trimmer_heater_target_temperature,
                    dg.heaters.primary_efficiency,
                    dg.heaters.primary_calc_target_temperature,
                    dg.heaters.trimmer_calc_target_temperature,
                    voltages[DGMonitoredVoltages.MONITORED_LINE_24V_POWER_PRIM_HTR_V.value],
                    voltages[DGMonitoredVoltages.MONITORED_LINE_24V_GND_MAIN_PRIM_HTR_V.value],
                    voltages[DGMonitoredVoltages.MONITORED_LINE_24V_GND_SMALL_PRIM_HTR_V.value],
                    voltages[DGMonitoredVoltages.MONITORED_LINE_24V_GND_TRIM_HTR_V.value],
                    dg.heaters.trimmer_use_last_duty_cycle,
                    dg.heaters.previous_flow,
                    dg.heaters.control_counter))
    return info


def get_temperature_sensors_info():
    """
    Builds the DG temperature sensors portion of a log row, including the two moving averages.

    @return: (str) comma separated "label, value" pairs
    """
    temps = dg.temperatures.temperatures

    info = ('TPi, {:5.3f}, THd, {:5.3f}, TPo, {:5.3f}, TD1, {:5.3f}, TD2, {:5.3f}, TRo, {:5.3f}, TDi, {:5.3f}, '
            'Baro_temp, {:5.3f}, TDi_mving_avg, {:5.3f}, TRo_mving_avg, {:5.3f}, '
            .format(temps[DGTemperaturesNames.INLET_PRIMARY_HEATER.name],
                    temps[DGTemperaturesNames.HEAT_DISINFECT.name],
                    temps[DGTemperaturesNames.OUTLET_PRIMARY_HEATER.name],
                    temps[DGTemperaturesNames.CONDUCTIVITY_SENSOR_1.name],
                    temps[DGTemperaturesNames.CONDUCTIVITY_SENSOR_2.name],
                    temps[DGTemperaturesNames.OUTLET_DIALYSATE_REDUNDANT.name],
                    temps[DGTemperaturesNames.INLET_DIALYSATE.name],
                    temps[DGTemperaturesNames.BARO_TEMP_SENSOR.name],
                    dg.temperatures.dialysate_inlet_moving_avg,
                    dg.temperatures.redundant_outlet_moving_avg))
    return info


def get_dg_fans_info():
    """
    Builds the DG fans portion of a log row: fan duty cycle and RPMs, the thermistor and board temperatures and
    the RPM alarm time.

    @return: (str) comma separated "label, value" pairs
    """
    thermistors = dg.thermistors.thermistors
    temps = dg.temperatures.temperatures

    info = ('Target_fans_DC, {:5.3f}, Inlet1_RPM, {:5.3f}, Inlet2_RPM, {:5.3f}, Inlet3_RPM, {:5.3f}, '
            'Outlet1_RPM, {:5.3f}, Outlet2_RPM, {:5.3f}, Outlet3_RPM, {:5.3f}, Board_temp, {:5.3f}, '
            'Power_supply_1, {:5.3f}, Power_supply_2, {:5.3f}, FPGA_temp, {:5.3f}, Load_cell_A1_B1, {:5.3f}, '
            'Load_cell_A2_B2, {:5.3f}, timeOffset, {}, '
            .format(dg.fans.dg_fans_duty_cycle,
                    dg.fans.inlet_1_rpm,
                    dg.fans.inlet_2_rpm,
                    dg.fans.inlet_3_rpm,
                    dg.fans.outlet_1_rpm,
                    dg.fans.outlet_2_rpm,
                    dg.fans.outlet_3_rpm,
                    thermistors[ThermistorsNames.THERMISTOR_ONBOARD_NTC.name],
                    thermistors[ThermistorsNames.THERMISTOR_POWER_SUPPLY_1.name],
                    thermistors[ThermistorsNames.THERMISTOR_POWER_SUPPLY_2.name],
                    temps[DGTemperaturesNames.FPGA_BOARD_SENSOR.name],
                    temps[DGTemperaturesNames.LOAD_CELL_A1_B1.name],
                    temps[DGTemperaturesNames.LOAD_CELL_A2_B2.name],
                    dg.fans.rpm_alarm_time))
    return info


def get_hd_fans_info():
    """
    Builds the HD fans portion of a log row: fan duty cycle and RPMs, the HD temperatures and the RPM alarm time.

    @return: (str) comma separated "label, value" pairs
    """
    hd_temps = hd.temperatures.hd_temperatures

    info = ('HD_Fan_DC, {:5.3f}, Target_HD_RPM, {:5.3f}, Inlet1_RPM, {:5.3f}, HD_Board_temp, {:5.3f}, '
            'HD_Power_supply, {:5.3f}, HD_FPGA_temp, {:5.3f}, Venous_temp, {:5.3f}, '
            'Arterial_temp, {:5.3f}, RPM_time_offset, {}, '
            .format(hd.fans.duty_cycle,
                    hd.fans.target_rpm,
                    hd.fans.inlet_1_rpm,
                    hd_temps[HDTemperaturesNames.THERMISTOR_ONBOARD_NTC.name],
                    hd_temps[HDTemperaturesNames.THERMISTOR_POWER_SUPPLY_1.name],
                    hd_temps[HDTemperaturesNames.TEMPSENSOR_FPGA_BOARD.name],
                    hd_temps[HDTemperaturesNames.TEMPSENSOR_VENOUS_PRESS_TEMP.name],
                    hd_temps[HDTemperaturesNames.TEMPSENSOR_ARTERIAL_PRESS_TEMP.name],
                    hd.fans.rpm_alarm_time))
    return info


def get_uv_reactors_info():
    """
    Builds the UV reactors portion of a log row (state and health of the inlet and outlet reactors).

    @return: (str) comma separated "label, value" pairs
    """
    info = ('Inlet_status, {}, Outlet_status, {}, Inlet_health, {}, Outlet_health, {}, '
            .format(dg.uv_reactors.inlet_uv_reactor_state,
                    dg.uv_reactors.outlet_uv_reactor_state,
                    dg.uv_reactors.inlet_uv_reactor_health,
                    dg.uv_reactors.outlet_uv_reactor_health))
    return info


def get_concentrate_pumps_info():
    """
    Builds the concentrate pumps and conductivity sensors portion of a log row. The bicarb columns are fed from
    the CP2 values and the acid columns from the CP1 values.

    @return: (str) comma separated "label, value" pairs
    """
    pumps = dg.concentrate_pumps
    cond = dg.conductivity_sensors

    info = ('Bicarb_curr_speed, {:5.3f}, Bicarb_speed, {:5.3f}, Acid_curr_speed, {:5.3f}, Acid_speed, {:5.3f}, '
            'CPi, {:5.3f}, CPo, {:5.3f}, CD1, {:5.3f}, CD2, {:5.3f}, CP1_state, {}, CP2_state, {}, CPi_status, {}, '
            'CPo_status, {}, CD1_status, {}, CD2_status, {}, CP1_pulse, {:5.3f}, CP2_pulse, {:5.3f}, '
            'Acid_tgt_speed, {:5.3f}, Bicarb_tgt_speed, {:5.3f}, CP1_parked, {}, CP1_park_fault, {}, CP2_parked, {}, '
            'CP2_park_fault, {}, '
            .format(pumps.concentrate_pump_cp2_current_set_speed,
                    pumps.concentrate_pump_cp2_measured_speed,
                    pumps.concentrate_pump_cp1_current_set_speed,
                    pumps.concentrate_pump_cp1_measured_speed,
                    cond.conductivity_sensor_cpi,
                    cond.conductivity_sensor_cpo,
                    cond.conductivity_sensor_cd1,
                    cond.conductivity_sensor_cd2,
                    DGConcentratePumpsStates(pumps.concentrate_pump_cp1_current_state).name,
                    DGConcentratePumpsStates(pumps.concentrate_pump_cp2_current_state).name,
                    cond.cpi_sensor_status,
                    cond.cpo_sensor_status,
                    cond.cd1_sensor_status,
                    cond.cd2_sensor_status,
                    pumps.cp1_temp_pulse,
                    pumps.cp2_temp_pulse,
                    pumps.concentrate_pump_cp1_target_speed,
                    pumps.concentrate_pump_cp2_target_speed,
                    pumps.concentrate_pump_cp1_parked,
                    pumps.concentrate_pump_cp1_park_fault,
                    pumps.concentrate_pump_cp2_parked,
                    pumps.concentrate_pump_cp2_park_fault))
    return info


def get_blood_leak_info():
    """
    Builds the blood leak portion of a log row (state, status, intensity, blood detect and intensity moving
    average).

    @return: (str) comma separated "label, value" pairs
    """
    info = ('Blood_leak_state, {}, Blood_leak_status, {}, Blood_leak_intensity, {}, Blood_leak_detect, {}, '
            'Blood_leak_int_mv_avg, {}, '
            .format(hd.blood_leak.get_blood_leak_state(),
                    hd.blood_leak.get_blood_leak_status(),
                    hd.blood_leak.blood_leak_intensity,
                    hd.blood_leak.blood_leak_blood_detect,
                    hd.blood_leak.blood_leak_intensity_moving_average))
    return info


def get_hd_pumps_info():
    """
    Builds the HD dialysate pumps portion of a log row (inlet target/measured flow and the inlet/outlet pump
    motor controller currents).

    @return: (str) comma separated "label, value" pairs
    """
    info = ('DialIn_tgt_flow, {}, DialIn_meas_flow, {:5.3f}, DialIn_current, {:5.3f}, DialOut_current, {:5.3f}, '
            .format(hd.dialysate_inlet_flow.target_dialysate_inlet_flow_rate,
                    hd.dialysate_inlet_flow.measured_dialysate_inlet_flow_rate,
                    hd.dialysate_inlet_flow.measured_dialysate_inlet_pump_mc_current,
                    hd.dialysate_outlet_flow.measured_dialysate_outlet_pump_mc_current))
    return info


def get_dg_fill_info():
    """
    Builds the DG dialysate fill portion of a log row (average conductivities, first fill flag, conductivity
    difference, used acid/bicarb and total volume).

    @return: (str) comma separated "label, value" pairs
    """
    # Used_bicarb previously logged used_acid a second time; it now logs used_bicarb.
    info = ('Avg_acid_cond, {:5.3f}, Avg_bicarb_cond, {:5.3f}, Is_this_first_fill, {}, Diff_cond_pct, {:5.3f}, '
            'Used_acid, {:5.3f}, Used_bicarb, {:5.3f}, Total_vol, {:5.3f}, '
            .format(dg.dialysate_fill.avg_acid,
                    dg.dialysate_fill.avg_bicarb,
                    dg.dialysate_fill.first_fill,
                    dg.dialysate_fill.pctDiffConduct,
                    dg.dialysate_fill.used_acid,
                    dg.dialysate_fill.used_bicarb,
                    dg.dialysate_fill.total_volume))
    return info


def get_hd_air_trap_info():
    """
    Builds the HD air trap portion of a log row (upper/lower level, air trap valve status and air bubbles status).

    @return: (str) comma separated "label, value" pairs
    """
    info = ('Air_trap_high_level, {}, Air_trap_low_level, {}, Air_trap_valve, {}, Air_bubble_status, {}, '
            .format(hd.air_trap.upper_level,
                    hd.air_trap.lower_level,
                    hd.valves.hd_air_trap_status,
                    hd.air_bubbles.air_bubbles_status))
    return info


def get_dg_idle_bad_fill_info():
    """
    Builds the DG generation idle bad fill sub-state portion of a log row.

    @return: (str) comma separated "label, value" pair
    """
    info = ('Bad_fill_state, {}, '
            .format(DGGenIdleModeBadFillSubStates(dg.gen_idle.bad_fill_state).name))
    return info


def get_voltages_info():
    """
    Builds the HD monitored voltages portion of a log row (24V and 24V regen lines).

    @return: (str) comma separated "label, value" pairs
    """
    info = ('HD_24, {:5.3f}, HD_24_regen, {:5.3f}, '
            .format(hd.voltages.monitored_voltages[HDMonitoredVoltages.MONITORED_LINE_24V.value],
                    hd.voltages.monitored_voltages[HDMonitoredVoltages.MONITORED_LINE_24V_REGEN.value]))
    return info


def get_dg_voltages_info():
    """
    Builds the DG monitored voltages portion of a log row (24V main and non-isolated 24V main lines).

    @return: (str) comma separated "label, value" pairs
    """
    info = ('DG_24, {:5.3f}, DG_24_non-iso, {:5.3f}, '
            .format(dg.voltages.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_24V_MAIN.value],
                    dg.voltages.monitored_voltages[DGMonitoredVoltages.MONITORED_LINE_NON_ISOLATED_24_V_MAIN.value]))
    return info


def collect_treatment_data():
    """
    Sets HD operation mode 4, starts the DG and then logs one row of HD and DG data per second to
    treatment_run.log in the current working directory (and to the console) until Ctrl+C is pressed. On Ctrl+C
    the DG is requested to stop.

    @return: none
    """
    address = os.path.join(os.getcwd(), "treatment_run.log")
    sleep_time = 1

    # The context manager closes the log on every exit path, not only on Ctrl+C.
    with open(address, "w") as f:
        #dg.conductivity_sensors.cmd_conductivity_sensor_override(ConductivitySensorsEnum.CPI.value, 1500.00)
        #sleep(1)

        #dg.conductivity_sensors.cmd_conductivity_sensor_override(ConductivitySensorsEnum.CD1.value, 12100.00)
        #sleep(1)SW_CONFIG_DISABLE_WATER_QUALITY_CHECK
        #dg.dialysate_fill.cmd_used_bicarb_volume_override(3700.0)
        #sleep(1)

        #dg.heaters.cmd_heaters_broadcast_interval_override(50)
        #sleep(1)
        #dg.uv_reactors.cmd_uv_reactors_data_broadcast_interval_override(50)
        #dg.voltages.cmd_monitored_voltages_broadcast_interval_override(50)
        #dg.concentrate_pumps.cmd_concentrate_pump_broadcast_interval_override(50)
        #sleep(1)

        #dg.test_configs.cmd_set_test_config(DGTestConfigOptions.TEST_CONFIG_MIX_WITH_WATER.value)
        #sleep(0.5)
        #hd.test_configs.cmd_set_test_config(HDTestConfigOptions.TEST_CONFIG_USE_WET_CARTRIDGE.value)
        #sleep(0.5)
        #hd.test_configs.cmd_set_test_config(HDTestConfigOptions.TEST_CONFIG_SKIP_DISINFECT_AND_SERVICE_TX_BLOCKERS.value)
        #sleep(0.5)
        #hd.test_configs.cmd_set_test_config(HDTestConfigOptions.TEST_CONFIG_DISABLE_WET_SELFTEST_DISPLACEMENT_CHECK.value)
        #sleep(0.5)
        #hd.test_configs.cmd_set_test_config(HDTestConfigOptions.TEST_CONFIG_USE_WORN_CARTRIDGE.value)
        #sleep(0.5)
        #dg.test_configs.cmd_set_test_config(DGTestConfigOptions.TEST_CONFIG_DISABLE_INLET_WATER_TEMP_CHECK.value)
        #sleep(0.5)
        #hd.test_configs.cmd_set_test_config(HDTestConfigOptions.TEST_CONFIG_SKIP_DISINFECT_AND_SERVICE_TX_BLOCKERS.value)

        #dg.hd_proxy.cmd_start_stop_dg()
        #sleep(4)
        #dg.hd_proxy.cmd_fill()

        hd.cmd_hd_set_operation_mode(4)
        sleep(1)
        dg.hd_proxy.cmd_start_stop_dg()
        sleep(1)
        dg.dialysate_fill.cmd_set_mode_fill_cal_check_state(2)

        try:
            while True:
                hd_run = get_hd_run_info()
                hd_rsrvrs = get_hd_reservoirs_info()
                dg_rsrvrs = get_dg_reservoirs_info()
                dg_run = get_dg_run_info()
                drain = get_drain_states_info()
                load_cell = get_load_cells_info()
                valves = get_dg_valves_states()
                ro = get_ro_info()
                temp = get_temperature_sensors_info()
                heaters = get_heaters_info()
                dg_fans = get_dg_fans_info()
                conc_pumps = get_concentrate_pumps_info()
                blood_leak = get_blood_leak_info()
                hd_pumps = get_hd_pumps_info()
                fill_info = get_dg_fill_info()
                #idle_bad_fill = get_dg_idle_bad_fill_info()
                uv = get_uv_reactors_info()
                hd_fans = get_hd_fans_info()
                # Placeholder for get_hd_occlusion_pressures_info(). The trailing ', ' keeps it from running
                # into the first air trap label in the row.
                hd_pressures = '0, '
                air_trap = get_hd_air_trap_info()

                var = str(datetime.now()) + ', ' + hd_run + dg_run + hd_rsrvrs + dg_rsrvrs + load_cell + drain + ro + \
                    temp + heaters + conc_pumps + dg_fans + valves + blood_leak + hd_pumps + fill_info + uv + hd_fans + \
                    hd_pressures + air_trap + '\r'

                print(var)
                f.write(var)
                sleep(sleep_time)

        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg(start=False)


def run_ro_permeate_sample():
    """
    Logs one row of RO permeate sample data per second to ro_permeate_sample.log in the current working directory
    (and to the console) until Ctrl+C is pressed. On Ctrl+C the RO permeate sample mode is requested to stop.

    @return: none
    """
    address = os.path.join(os.getcwd(), "ro_permeate_sample.log")

    with open(address, "w") as f:
        #dg.hd_proxy.cmd_start_stop_dg_ro_permeate_sample()
        #dg.temperatures.cmd_temperatures_value_override(0, 49)

        try:
            while True:
                ro_perm = get_ro_permeate_sample_mode_info()
                drain = get_drain_states_info()
                load_cell = get_load_cells_info()
                valves = get_dg_valves_states()
                ro = get_ro_info()
                temp = get_temperature_sensors_info()
                heaters = get_heaters_info()
                uv = get_uv_reactors_info()
                dg_fans = get_dg_fans_info()
                hd_fans = get_hd_fans_info()
                conc = get_concentrate_pumps_info()

                var = ro_perm + load_cell + drain + ro + temp + heaters + uv + dg_fans + hd_fans + valves + conc + '\r'

                print(var)
                f.write(var)
                sleep(1)

        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg_ro_permeate_sample(start=False)


def run_heat_disinfect():
    """
    Requests the passive cool heat disinfect and then logs one row of data per second to heat_disinfect.log in the
    current working directory (and to the console) until Ctrl+C is pressed. On Ctrl+C heat disinfect and heat
    disinfect active cool are requested to stop.

    @return: none
    """
    address = os.path.join(os.getcwd(), "heat_disinfect.log")

    with open(address, "w") as f:
        # dg.heaters.cmd_heaters_broadcast_interval_override(50)
        # sleep(1)
        #dg.hd_proxy.cmd_start_stop_dg_heat_disinfect()
        #dg.hd_proxy.cmd_start_stop_dg_heat_disinfect_active_cool()
        dg.hd_proxy.cmd_start_stop_dg_passive_cool_heat_disinfect()

        try:
            while True:
                disinfect = get_heat_disinfect_mode_info()
                drain = get_drain_states_info()
                load_cell = get_load_cells_info()
                valves = get_dg_valves_states()
                ro = get_ro_info()
                temp = get_temperature_sensors_info()
                heaters = get_heaters_info()
                uv = get_uv_reactors_info()
                dg_fans = get_dg_fans_info()
                hd_fans = get_hd_fans_info()
                conc = get_concentrate_pumps_info()

                var = disinfect + load_cell + drain + ro + temp + heaters + uv + dg_fans + hd_fans + valves + conc + '\r'

                print(var)
                f.write(var)
                sleep(1)

                # If the mode came back to standby or standby solo
                # if dg.dg_operation_mode == 3 or dg.dg_operation_mode == 4:
                #     If it is the first call, stop heat disinfect
                #     if complete_counter == 1:
                #         dg.hd_proxy.cmd_start_stop_dg_heat_disinfect(start=False)
                #     Write a few more complete states to make sure the complete state items are recorded
                #     elif complete_counter == 3:
                #         pass
                #         f.close()
                #         break
                #     complete_counter += 1

        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg_heat_disinfect(start=False)
            sleep(0.2)
            dg.hd_proxy.cmd_start_stop_dg_heat_disinfect_active_cool(start=False)


def run_chemical_disinfect():
    """
    Requests chemical disinfect, sets the mix with water test config and then logs one row of data per second to
    chemical_disinfect.log in the current working directory (and to the console). Logging ends a few rows after
    the DG operation mode reads 3 or 4 (standby or standby solo), or on Ctrl+C, in which case chemical disinfect
    and chemical disinfect flush are requested to stop.

    @return: none
    """
    complete_counter = 1
    base_dir = os.getcwd()

    with open(os.path.join(base_dir, "chemical_disinfect.log"), "w") as f:
        dg.hd_proxy.cmd_start_stop_dg_chemical_disinfect()
        #dg.hd_proxy.cmd_start_stop_dg_chemical_disinfect_flush()
        dg.test_configs.cmd_set_test_config(DGTestConfigOptions.TEST_CONFIG_MIX_WITH_WATER.value)

        try:
            while True:
                disinfect = get_chemical_disinfect_mode_info()
                drain = get_drain_states_info()
                load_cell = get_load_cells_info()
                conc = get_concentrate_pumps_info()
                valves = get_dg_valves_states()
                ro = get_ro_info()
                temp = get_temperature_sensors_info()
                heaters = get_heaters_info()
                #fans = get_dg_fans_info()

                #var = disinfect + load_cell + conc + drain + ro + temp + heaters + fans + valves + '\r'
                var = disinfect + load_cell + conc + drain + ro + temp + heaters + valves + '\r'

                print(var)
                f.write(var)
                sleep(1)

                # If the mode came back to standby or standby solo
                if dg.dg_operation_mode == 3 or dg.dg_operation_mode == 4:
                    # Write a few more complete states to make sure the complete state items are recorded
                    if complete_counter == 3:
                        break

                    complete_counter += 1

        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg_chemical_disinfect(start=False)
            dg.hd_proxy.cmd_start_stop_dg_chemical_disinfect_flush(start=False)


def cmd_set_disinfect_ui_screen():
    """
    Creates its own HD object (separate from the module-level one), requests the standby sub-mode to be set to
    disinfect and then prints the disinfect HD sub-mode and DG mode once per second forever.

    @return: none (does not return)
    """
    hd = HD()
    sleep(0.5)

    hd.ui.cmd_ui_set_standby_submode_to_disinfect()

    while True:
        print(hd.ui.disinfects_hd_submode, hd.ui.disinfects_dg_mode)
        sleep(1)


def run_flush_mode():
    """
    Requests the DG flush and then logs one row of data per second to flush_mode.log (and to the console).
    The first time the DG operation mode reads 3 or 4 (standby or standby solo) a flush stop is requested, and
    logging ends a few rows later. On Ctrl+C the flush is requested to stop as well.

    @return: none
    """
    complete_counter = 1

    # NOTE(review): this is the only log written to a hard-coded absolute path; the other run functions write to
    # the current working directory. Kept as is so existing setups keep finding the file.
    with open("/home/fw/projects/dialin/tests/flush_mode.log", "w") as f:
        dg.hd_proxy.cmd_start_stop_dg_flush()
        #dg.cmd_dg_software_reset_request()

        try:
            while True:
                flush = get_flush_mode_info()
                load_cell = get_load_cells_info()
                drain = get_drain_states_info()
                ro = get_ro_info()
                conc = get_concentrate_pumps_info()
                uv_reactors = get_uv_reactors_info()
                valves = get_dg_valves_states()

                var = flush + uv_reactors + load_cell + drain + ro + conc + valves + '\r'

                print(var)
                f.write(var)
                sleep(1)

                # If the mode came back to standby or standby solo
                if dg.dg_operation_mode == 3 or dg.dg_operation_mode == 4:
                    # If it is the first call, stop flush. This used to call the command without start=False,
                    # unlike every other stop request in this file.
                    if complete_counter == 1:
                        dg.hd_proxy.cmd_start_stop_dg_flush(start=False)

                    # Write a few more complete states to make sure the complete state items are recorded
                    if complete_counter == 3:
                        #events = dg.events.get_dg_events(2, 60)
                        #for event in events:
                        #    print(event)
                        break

                    complete_counter += 1

        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg_flush(start=False)
            #events = dg.events.get_dg_events(2, 50)
            #for event in events:
            #    print(event)


def run_test_configs():
    """
    Endlessly toggles the DG mix with water test config and prints what is read back.

    Each cycle sends the set command with the current reset value when the counter is 1, counts up to 10 in
    0.1 s steps, requests the test config status from firmware, prints the received value with the reset value
    until the counter passes 20, then flips the reset value and starts over.

    @return: none (does not return)
    """
    counter = 1
    reset_val = 0

    print(hex(dg.test_configs.cmd_get_test_config_status(DGTestConfigOptions.TEST_CONFIG_MIX_WITH_WATER.value)))

    while True:
        if counter == 1:
            dg.test_configs.cmd_set_test_config(DGTestConfigOptions.TEST_CONFIG_MIX_WITH_WATER.value,
                                                reset=reset_val)
        counter += 1

        if counter > 10:
            #hd.test_configs.cmd_request_test_config_status_from_fw()
            dg.test_configs.cmd_request_test_config_status_from_fw()

            while True:
                #print(hex(hd.test_configs.hd_test_configs[HDTestConfigOptions.TEST_CONFIG_USE_WORN_CARTRIDGE.name]))
                print(hex(dg.test_configs.dg_test_configs[DGTestConfigOptions.TEST_CONFIG_MIX_WITH_WATER.name]),
                      reset_val)
                #print((dg.test_configs.dg_test_configs), reset_val)

                if counter > 20:
                    # Flip the reset value for the next cycle and restart the counter
                    if reset_val == 1:
                        reset_val = 0
                    else:
                        reset_val = 1

                    counter = 1
                    break

                counter += 1
                sleep(0.1)

        sleep(0.1)


if __name__ == "__main__":

    # These two objects are the module-level dg and hd that every function above reads.
    dg = DG(log_level='DEBUG')
    dg.cmd_log_in_to_dg()
    sleep(1)
    hd = HD(log_level='DEBUG')
    hd.cmd_log_in_to_hd()
    sleep(1)

    hd.test_configs.cmd_set_test_config(HDTestConfigOptions.TEST_CONFIG_USE_WET_CARTRIDGE.value)
    #sleep(1)
    #hd.test_configs.cmd_set_test_config(HDTestConfigOptions.TEST_CONFIG_DISABLE_WET_SELFTEST_DISPLACEMENT_CHECK.value)

    #dg.load_cells.cmd_get_load_cells_tare_values()
    #dg.test_configs.cmd_set_recover_from_mode_fault_signal()

    #while True:
    #    print(dg.load_cells.load_cells_tare_values)
    #    sleep(0.5)
    #    if dg.load_cells.load_cells_tare_values[DGLoadCellNames.LOAD_CELL_RESERVOIR_1_PRIMARY.name] != 0.0:
    #        dg.load_cells.cmd_set_load_cells_tare_values()
    #        break

    #run_test_configs()

    #run_heat_disinfect()

    #run_flush_mode()

    #run_chemical_disinfect()

    #hd.alarms.cmd_alarm_state_override(319, 1)
    #dg.ro_permeate_sample.cmd_send_hd_dg_is_ready_to_dispense()

    #run_dg()

    # cmd_set_disinfect_ui_screen()

    #collect_treatment_data()

    #run_ro_permeate_sample()

    #collect_hd_treatment()

    #run_nelson_support_modes()

    #while True:
    #    print(get_hd_fans_info(), get_dg_fans_info(), get_temperature_sensors_info())
    #    print(get_dg_valves_states())
    #    sleep(1)

    #while True:
    #    print(dg.rtc.get_rtc_epoch(), hd.rtc.get_rtc_epoch())
    #    sleep(1)

    #ui = HDSimulator()
    #ui.cmd_send_hd_operation_mode(3, 1)

    #hd.ui.cmd_ui_set_dg_service_time()

    #hd.cmd_hd_software_reset_request()

    #hd.ui.cmd_set_ro_only_mode_status(1)

    #hd.blood_leak.cmd_blood_leak_emb_mode_info_cmds_override(EmbModeCommands.I.value, 1000, reset=1)
    hd.blood_leak.cmd_blood_leak_intensity_moving_average_override(1100.3, reset=1)

    # Runs until the script is interrupted; nothing below this loop is reached.
    while True:
        # print(dg.switches.dg_switches_status)
        print(time.time(), get_blood_leak_info())
        sleep(1)

    #dg.hd_proxy.cmd_start_stop_dg_heat_disinfect()

    #while True:
    #    print(hd.alarms.get_alarm_state(286), hd.alarms.get_alarm_state(289), dg.alarms.get_alarm_state(286),
    #          dg.alarms.get_alarm_state(289))
    #    sleep(0.5)

    #hd.test_configs.cmd_set_test_config(HDTestConfigOptions.TEST_CONFIG_SKIP_DISINFECT_AND_SERVICE_TX_BLOCKERS.value)

    #dg.cmd_dg_set_operation_mode(3)
    #hd.cmd_hd_set_operation_mode(1)