###########################################################################
#
#    Copyright (c) 2023-2023 Diality Inc. - All Rights Reserved.
#
#    THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
#    WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
#    @file test_tx_config_tests.py
#
#    @author (last)      Dara Navaei
#    @date   (last)      26-May-2023
#    @author (original)  Dara Navaei
#    @date   (original)  14-May-2023
#
############################################################################
import os
from dialin.dg.dialysate_generator import DG
from dialin.hd.hemodialysis_device import HD
from dialin.dg.load_cells import DGLoadCellNames
from dialin.hd.switches import HDSwitchesNames, HDSwitchStatus
from dialin.hd.constants import RESET, NO_RESET
from dialin.ui.hd_simulator import HDSimulator
from dialin.common.dg_defs import DGHeatDisinfectStates, DGHeatDisinfectUIStates
from dialin.dg.heat_disinfect import HeatCancellationModes, NelsonSupportModes
from dialin.common.dg_defs import DGChemicalDisinfectStates, DGChemDisinfectUIStates
from dialin.common.dg_defs import DGChemDisinfectFlushStates, DGChemDisinfectFlushUIStates
from dialin.dg.chemical_disinfect import ChemCancellationModes
from dialin.dg.drain_pump import DrainPumpStates, DrainPumpRPMFeedBackSensors
from dialin.dg.thermistors import ThermistorsNames
from dialin.dg.temperatures import DGTemperaturesNames
from dialin.dg.dialysate_generator import DGOperationModes
from dialin.hd.temperatures import HDTemperaturesNames
from dialin.dg.concentrate_pumps import DGConcentratePumpsStates
from dialin.dg.uv_reactors import ReactorsNames
from dialin.common.hd_defs import HDOpModes, HDStandbyStates, PreTreatmentWetSelfTestStates, PostTreatmentStates, \
    PreTreatmentDrySelfTestsStates
from dialin.hd.post_treatment import HDPostTreatmentDrainStates
from dialin.common.dg_defs import DGEventList
from dialin.common.hd_defs import HDEventList
from dialin.hd.reservoirs import HDReservoirStates
from dialin.dg.reservoirs import DGReservoirsNames
from dialin.common.dg_defs import DGGenIdleModeBadFillSubStates
from dialin.common.alarm_defs import AlarmList
from dialin.dg.fans import DGFansNames
from dialin.dg.pressures import DGPressures
from dialin.dg.conductivity_sensors import ConductivitySensorsEnum
from dialin.dg.voltages import DGMonitoredVoltages
from dialin.dg.valves import DGValveNames
from dialin.hd.valves import HDValves
from dialin.hd.voltages import HDMonitoredVoltages
from dialin.hd.pretreatment import PreTreatmentRsrvrState
from dialin.common.dg_defs import DGFlushStates, DGHeatDisinfectActiveCoolStates
from dialin.common.test_config_defs import DGTestConfigOptions, HDTestConfigOptions
from dialin.common.dg_defs import DGOpModes
from dg_tests import *
from time import sleep
from datetime import datetime
import sys
sys.path.append("..")

# NOTE: every function below reads the module-level ``dg`` and ``hd`` objects that are created in the
# ``__main__`` block at the bottom of this file. They must exist before any of these functions is called.


def get_dg_run_info():
    """
    Formats the DG operation mode fields of a log line.

    @return: (str) 'DG_op_mode, <mode name>, DG_op_mode_num, <mode>, DG_sub_mode, <sub mode>, '
    """
    return 'DG_op_mode, {}, DG_op_mode_num, {}, DG_sub_mode, {}, '.format(
        DGOperationModes(dg.dg_operation_mode).name,
        dg.dg_operation_mode,
        dg.dg_operation_sub_mode)


def get_dg_valves_states():
    """
    Formats the DG valve states fields of a log line.

    Each valve contributes '<label>, <state>, ' where the state is read from dg.valves.valve_states_enum
    using the valve's enum value as the index.

    @return: (str) the valve fields in the order VPi, VSP, VPd, VBf, VPo, VDr, VRc, VRo, VRi, VRf, VRD1, VRD2
    """
    # Log label -> valve, in the order the columns appear in the log
    columns = (
        ('VPi', DGValveNames.VALVE_PRESSURE_INLET),
        ('VSP', DGValveNames.VALVE_SAMPLING_PORT),
        ('VPd', DGValveNames.VALVE_PRODUCTION_DRAIN),
        ('VBf', DGValveNames.VALVE_BYPASS_FILTER),
        ('VPo', DGValveNames.VALVE_PRESSURE_OUTLET),
        ('VDr', DGValveNames.VALVE_DRAIN),
        ('VRc', DGValveNames.VALVE_RECIRCULATE),
        ('VRo', DGValveNames.VALVE_RESERVOIR_OUTLET),
        ('VRi', DGValveNames.VALVE_RESERVOIR_INLET),
        ('VRf', DGValveNames.VALVE_RESERVOIR_FILL),
        ('VRD1', DGValveNames.VALVE_RESERVOIR_DRAIN_1),
        ('VRD2', DGValveNames.VALVE_RESERVOIR_DRAIN_2),
    )
    states = dg.valves.valve_states_enum
    return ''.join('{}, {}, '.format(label, states[valve.value]) for label, valve in columns)


def get_drain_states_info():
    """
    Formats the drain pump and drain pressure fields of a log line.

    @return: (str) drain pump state name, DAC, target and measured RPM (hall and Maxon feedback), the drain
             pump inlet/outlet and barometric pressures, pump current, direction and target outlet flow
    """
    pump = dg.drain_pump
    pressures = dg.pressures
    return ('Drain, {}, DAC, {}, Tgt_RPM, {}, Curr_RPM, {}, PRd, {:5.3f}, PDr, {:5.3f}, Baro, {:5.3f}, '
            'Drain_curr_A, {:5.3f}, Drain_dir, {}, Target_flow_lpm, {:5.3f}, Maxon_rpm, {}, '
            .format(DrainPumpStates(pump.drain_pump_state).name,
                    pump.dac_value,
                    pump.target_drain_pump_rpm,
                    pump.current_drain_pump_rpm[DrainPumpRPMFeedBackSensors.DRAIN_PUMP_HALL_SNSR_FB.name],
                    pressures.drain_pump_inlet_pressure,
                    pressures.drain_pump_outlet_pressure,
                    pressures.barometric_pressure,
                    pump.drain_pump_current_A,
                    pump.drain_pump_direction,
                    pump.target_drain_pump_outlet_flow_lpm,
                    pump.current_drain_pump_rpm[DrainPumpRPMFeedBackSensors.DRAIN_PUMP_MAXON_SNSR_FB.name]))


def get_load_cells_info():
    """
    Formats the four DG load cell readings (A1, A2, B1, B2) of a log line.

    @return: (str) 'A1, <value>, A2, <value>, B1, <value>, B2, <value>, ' with 3 decimal places
    """
    cells = dg.load_cells
    return 'A1, {:5.3f}, A2, {:5.3f}, B1, {:5.3f}, B2, {:5.3f}, '.format(
        cells.load_cell_A1,
        cells.load_cell_A2,
        cells.load_cell_B1,
        cells.load_cell_B2)


def get_ro_info():
    """
    Formats the RO pump, RO pressures and flow sensor fields of a log line.

    @return: (str) RO pump state, RO pump inlet/outlet pressures, PWM duty cycle, measured RO flow, target
             flow, feedback duty cycle, RO flow with concentrate pumps and measured dialysate flow
    """
    ro_pump = dg.ro_pump
    pressures = dg.pressures
    flows = dg.flow_sensors
    return ('RO, {}, PPi, {:5.3f}, PPo, {:5.3f}, PWM, {:5.3f}, Flow, {:5.3f}, Tgt_flow, {:5.3f}, '
            'Feedback_PWM, {:5.3f}, Flow_with_conc_pumps, {:5.3f}, Dialysate_flow, {:5.3f}, '
            .format(ro_pump.ro_pump_state,
                    pressures.ro_pump_inlet_pressure,
                    pressures.ro_pump_outlet_pressure,
                    ro_pump.pwm_duty_cycle_pct,
                    flows.measured_ro_flow_LPM,
                    ro_pump.target_flow_lpm,
                    ro_pump.feedback_duty_cycle_pct,
                    flows.measured_ro_flow_with_cp_LPM,
                    flows.measured_dialysate_flow_LPM))


def check_status(delay_counter, sleep_time, target_delay, state_counter):
    """
    Advances the step timer of a scripted sequence by one tick.

    Once delay_counter has reached target_delay / sleep_time ticks, the state counter is incremented and
    the delay counter is reset to 0. Otherwise only the delay counter is incremented. A target_delay of 0
    therefore advances the state on the first call.

    @param delay_counter: (int) number of ticks already spent in the current state
    @param sleep_time: (float) duration of one tick. Must not be 0 since it is used as a divisor
    @param target_delay: (float) time to hold the current state, in the same unit as sleep_time
    @param state_counter: (int) current state (step) number
    @return: (tuple) the updated (state_counter, delay_counter)
    """
    if delay_counter >= (target_delay / sleep_time):
        state_counter += 1
        delay_counter = 0
    else:
        delay_counter += 1
    return state_counter, delay_counter


def _load_test_tare_values():
    """
    Writes fixed load cell tare values into dg.load_cells.load_cells_tare_values.

    For testing only. Use dg.load_cells.cmd_get_load_cells_tare_values() to get the actual tare values
    during the development of this script and the procedure for this.

    @return: None
    """
    # dg.load_cells.cmd_get_load_cells_tare_values()
    tare_values = dg.load_cells.load_cells_tare_values
    tare_values[DGLoadCellNames.LOAD_CELL_RESERVOIR_1_PRIMARY.name] = 1726.822998046875
    tare_values[DGLoadCellNames.LOAD_CELL_RESERVOIR_1_BACKUP.name] = 1716.2781982421875
    tare_values[DGLoadCellNames.LOAD_CELL_RESERVOIR_2_PRIMARY.name] = 1716.46484375
    tare_values[DGLoadCellNames.LOAD_CELL_RESERVOIR_2_BACKUP.name] = 1712.1719970703125


def _tare_values_received():
    """
    Checks whether the reservoir 1 primary load cell tare value is available (non-zero).

    @return: (bool) True if the cached reservoir 1 primary tare value is not 0.0
    """
    return dg.load_cells.load_cells_tare_values[DGLoadCellNames.LOAD_CELL_RESERVOIR_1_PRIMARY.name] != 0.0


def _reset_both_stacks():
    """
    Requests a software reset of DG and then of HD.

    @return: None
    """
    dg.cmd_dg_software_reset_request()
    hd.cmd_hd_software_reset_request()


def _dg_mode_is(mode, sub_mode=None):
    """
    Checks the operation mode (and optionally the sub-mode) currently reported by DG.

    @param mode: (int) expected DG operation mode
    @param sub_mode: (int) expected DG operation sub-mode, or None to ignore the sub-mode
    @return: (bool) True if DG reports the expected mode (and sub-mode when provided)
    """
    if not dg.dg_operation_mode == mode:
        return False
    return sub_mode is None or dg.dg_operation_sub_mode == sub_mode


def _hd_mode_is(mode):
    """
    Checks the operation mode currently reported by HD.

    @param mode: (int) expected HD operation mode
    @return: (bool) True if HD reports the expected mode
    """
    return hd.hd_operation_mode == mode


def _run_step(steps, counter, delay_counter, sleep_time):
    """
    Runs one tick of a scripted step sequence.

    The step registered for the current counter is looked up. If the step has a readiness check and it is
    not satisfied, nothing happens and the sequence waits. Otherwise the step's action is issued once (on
    the tick where delay_counter is 0) and check_status decides whether to hold or to advance.

    @param steps: (dict) step number -> (ready, action, hold_time) where ready is a zero-argument callable
                  returning a bool or None for an unconditional step, action is a zero-argument callable or
                  None when the step issues no command, and hold_time is the target delay of the step
    @param counter: (int) current step number
    @param delay_counter: (int) number of ticks already spent in the current step
    @param sleep_time: (float) duration of one tick
    @return: (tuple) the updated (counter, delay_counter)
    """
    step = steps.get(counter)
    if step is None:
        # Past the last step (or an unknown step): nothing left to do on this tick
        return counter, delay_counter

    ready, action, hold_time = step
    if ready is not None and not ready():
        # Still waiting for the devices to reach the state this step requires
        return counter, delay_counter

    if delay_counter == 0 and action is not None:
        action()
    return check_status(delay_counter, sleep_time, hold_time, counter)


def recover_treatment():
    """
    Runs the treatment recovery sequence while logging the DG status on every tick.

    Every 0.1 s one line with the step number, the target delay, the DG run info, load cells, drain, RO
    and valves fields is printed and written to treatment_recovery.log in the current working directory.
    The sequence itself seeds the tare values, resets and logs in to both stacks, starts DG, fills, drains,
    forces the barometric pressure alarm and finally sets the recover treatment test configuration and the
    recover from mode fault signal. After the last step the function keeps logging until it is interrupted
    with Ctrl+C, at which point DG is commanded to stop.

    @return: None
    """
    sleep_time = 0.1
    counter = 1
    delay_counter = 0
    target_delay = 1

    def clear_alarm_and_enable_recovery():
        dg.alarms.cmd_alarm_state_override(AlarmList.ALARM_ID_DG_BARO_PRESSURE_OUT_OF_RANGE.value, 1, reset=1)
        dg.test_configs.cmd_set_test_config(DGTestConfigOptions.TEST_CONFIG_RECOVER_TREATMENT.value)

    # The callables are deliberately lazy (lambdas) so that nothing on dg/hd is touched before its step runs
    steps = {
        1: (None, _load_test_tare_values, target_delay),
        # Received the load cells tare values from DG. Reset both stacks
        2: (_tare_values_received, _reset_both_stacks, target_delay),
        # After reset log in again
        3: (lambda: _dg_mode_is(DGOpModes.DG_MODE_STAN.value),
            lambda: dg.cmd_log_in_to_dg(), target_delay),
        4: (lambda: _dg_mode_is(DGOpModes.DG_MODE_STAN.value),
            lambda: dg.load_cells.cmd_set_load_cells_tare_values(), target_delay),
        5: (lambda: _hd_mode_is(HDOpModes.MODE_STAN.value),
            lambda: hd.cmd_log_in_to_hd(), target_delay),
        # Set the HD operation mode before commanding DG to start
        6: (None,
            lambda: hd.cmd_hd_set_operation_mode(HDOpModes.MODE_TPAR.value), 0),
        # DG cannot be started while HD is in standby
        7: (lambda: _hd_mode_is(HDOpModes.MODE_TPAR.value),
            lambda: dg.hd_proxy.cmd_start_stop_dg(), target_delay),
        # This section is for testing only to make sure we always have fluid in the reservoir.
        # This part is not needed in the actual test procedure
        8: (lambda: _dg_mode_is(DGOpModes.DG_MODE_GENE.value, 1),
            lambda: dg.hd_proxy.cmd_fill(500), target_delay),
        9: (lambda: _dg_mode_is(DGOpModes.DG_MODE_GENE.value, 1),
            lambda: dg.hd_proxy.cmd_drain(), 0),
        # No command in this step. It only waits for the drain mode.
        # dg.test_configs.cmd_set_test_config(DGTestConfigOptions.TEST_CONFIG_RECOVER_TREATMENT.value)
        10: (lambda: _dg_mode_is(DGOpModes.DG_MODE_DRAI.value, 1),
             None, 0),
        11: (lambda: _dg_mode_is(DGOpModes.DG_MODE_DRAI.value, 1),
             lambda: dg.alarms.cmd_alarm_state_override(AlarmList.ALARM_ID_DG_BARO_PRESSURE_OUT_OF_RANGE.value, 1),
             0),
        12: (lambda: _dg_mode_is(DGOpModes.DG_MODE_FAUL.value, 2),
             clear_alarm_and_enable_recovery, target_delay),
        13: (lambda: _dg_mode_is(DGOpModes.DG_MODE_FAUL.value, 2),
             lambda: dg.test_configs.cmd_set_recover_from_mode_fault_signal(), target_delay),
    }

    address = os.path.join(os.getcwd(), "treatment_recovery.log")
    # The with statement closes the log on every exit path, not only on Ctrl+C
    with open(address, "w") as log_file:
        try:
            while True:
                dg_run = get_dg_run_info()
                drain = get_drain_states_info()
                load_cell = get_load_cells_info()
                valves = get_dg_valves_states()
                ro = get_ro_info()
                var = '{},{},{}{}{}{}{}\r'.format(counter, target_delay, dg_run, load_cell, drain, ro, valves)
                print(var)
                log_file.write(var)

                counter, delay_counter = _run_step(steps, counter, delay_counter, sleep_time)
                sleep(sleep_time)
        except KeyboardInterrupt:
            dg.hd_proxy.cmd_start_stop_dg(start=False)


def expedite_pretreatment():
    """
    Runs the expedited pre-treatment sequence.

    The sequence seeds the tare values, resets and logs in to both stacks, starts DG, fills, sets the HD
    expedite pre-treatment test configuration and then issues the UI commands (treatment parameters,
    confirmation, UF volume, front door switch override and its reset, patient connection confirmation and
    start treatment request). One step is processed per 0.1 s tick.

    The numeric operation modes used here are kept as they were written.
    NOTE(review): they appear to mirror the DGOpModes/HDOpModes members used by the same steps in
    recover_treatment (3 -> standby, 4 -> HDOpModes.MODE_TPAR, 5 -> DGOpModes.DG_MODE_GENE) - confirm before
    replacing them with the enums.

    The function does not return. After the last step it keeps idling until the script is stopped.

    @return: None
    """
    sleep_time = 0.1
    counter = 1
    delay_counter = 0
    target_delay = 1

    # The callables are deliberately lazy (lambdas) so that nothing on dg/hd is touched before its step runs
    steps = {
        1: (None, _load_test_tare_values, target_delay),
        # Received the load cells tare values from DG. Reset both stacks
        2: (_tare_values_received, _reset_both_stacks, target_delay),
        # After reset log in again
        3: (lambda: _dg_mode_is(3),
            lambda: dg.cmd_log_in_to_dg(), target_delay),
        4: (None,
            lambda: dg.load_cells.cmd_set_load_cells_tare_values(), 0),
        5: (lambda: _hd_mode_is(3),
            lambda: hd.cmd_log_in_to_hd(), target_delay),
        # Set the HD operation mode before commanding DG to start
        6: (None,
            lambda: hd.cmd_hd_set_operation_mode(4), 0),
        # DG cannot be started while HD is in standby
        7: (lambda: _hd_mode_is(4),
            lambda: dg.hd_proxy.cmd_start_stop_dg(), 0),
        # This section is for testing only to make sure we always have fluid in the reservoir. This part is
        # not needed in the actual test procedure
        8: (lambda: _dg_mode_is(5, 1),
            lambda: dg.hd_proxy.cmd_fill(1600), target_delay),
        9: (lambda: _dg_mode_is(5, 1),
            lambda: hd.test_configs.cmd_set_test_config(HDTestConfigOptions.TEST_CONFIG_EXPEDITE_PRE_TREATMENT.value),
            target_delay),
        10: (None,
             lambda: hd.ui.cmd_set_treatment_parameters(), target_delay),
        11: (None,
             lambda: hd.ui.cmd_ui_confirm_treatment_parameters(1), target_delay),
        12: (None,
             lambda: hd.ui.cmd_ui_uf_volume_set(0.0), target_delay),
        13: (None,
             lambda: hd.switches.cmd_hd_switch_status_override(HDSwitchesNames.FRONT_DOOR.value,
                                                               HDSwitchStatus.OPEN.value),
             2),
        14: (None,
             lambda: hd.switches.cmd_hd_switch_status_override(HDSwitchesNames.FRONT_DOOR.value,
                                                               HDSwitchStatus.OPEN.value, RESET),
             target_delay),
        # Steps 15 and 16 used to be plain "if" statements and ran in the same tick as the step before
        # them. They now take one tick per step like every other step of the sequence.
        15: (None,
             lambda: hd.ui.cmd_ui_patient_connection_confirm(), 2),
        16: (None,
             lambda: hd.ui.cmd_ui_start_treatment_request(), target_delay),
    }

    while True:
        counter, delay_counter = _run_step(steps, counter, delay_counter, sleep_time)
        sleep(sleep_time)


if __name__ == "__main__":

    # dg and hd are module-level names on purpose: all the functions above read them as globals
    dg = DG(log_level='DEBUG')
    dg.cmd_log_in_to_dg()
    sleep(1)
    hd = HD(log_level='DEBUG')
    hd.cmd_log_in_to_hd()
    sleep(1)

    # Select the sequence to run
    #expedite_pretreatment()
    recover_treatment()