###########################################################################
#
# Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved.
#
# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN
# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER.
#
# @file hd_valves_test.py
#
# @author (last) Dara Navaei
# @date (last) 22-Feb-2022
# @author (original) Dara Navaei
# @date (original) 19-Aug-2020
#
############################################################################
import sys
sys.path.append("..")
from dialin.hd.constants import RESET, NO_RESET
from dialin.hd.hemodialysis_device import HD
from dialin.hd.valves import ValvesEnum
from dialin.hd.valves import AirTrapState
from dialin.hd.valves import ValvesPositions
from dialin.common.alarm_defs import AlarmList
from dialin.common.hd_defs import HDOpModes
from time import sleep


def _format_valve_status(valve):
    """Build one status record for ``valve`` from ``valves.valves_status``.

    Reads the module-level ``valves`` object that the ``__main__`` section
    assigns, so it can only be called after that assignment.

    :param valve: ValvesEnum member whose ``name`` keys ``valves.valves_status``
    :return: the formatted status string; it ends with a carriage return
             (``\\r``), exactly as the records were formatted before this
             helper was extracted
    :raises KeyError: if the valve (or one of the fields) is not present in
                      ``valves.valves_status``
    """
    # Look the per-valve dictionary up once so all fields come from one lookup
    data = valves.valves_status[valve.name]
    return ('ID, {}, Pos, {}, PosCnt, {}, Cmd, {}, State, {}, Current, {}, PosA, {}, PosB, {}, PosC, {} \r'
            .format(data['Valve'], data['PosID'], data['PosCnt'], data['Cmd'], data['State'],
                    data['Current'], data['PosA'], data['PosB'], data['PosC']))


def cycle_valves():
    """Cycle the VDI valve between position B (open) and position C (close).

    Homes the valve, then for a fixed number of loops commands position B and
    samples the valve status every ``sleep_time`` seconds for
    ``overall_time_seconds`` (each sample is printed and written to
    "Valves_Test_200.log"), then commands position C and waits the same
    amount of time without logging.

    Uses the module-level ``valves`` object assigned in the ``__main__``
    section.
    """
    # valves.cmd_open_hd_air_trap_valve(AirTrapState.STATE_OPEN)
    # sleep(2)
    sleep_time = 0.05
    overall_time_seconds = 4
    loops = 5
    valve = ValvesEnum.VDI
    # Number of sleep_time steps that make up one dwell period (80 here)
    samples_per_phase = round(overall_time_seconds / sleep_time)

    # NOTE(review): 100 is presumably the broadcast interval in ms - confirm
    valves.cmd_hd_valves_broadcast_interval_override(100)
    sleep(3)
    # valves.cmd_set_hd_valve_position(0, 2)
    # sleep(3)
    valves.cmd_home_hd_valve(valve.value)
    sleep(3)
    # valves.cmd_set_hd_valve_current_override(valve.value, 0.85, reset=1)
    # sleep(3)
    # valves.cmd_set_hd_valve_position_count_override(valve.value, 5000, reset=1)
    # sleep(2)

    # The context manager guarantees the log is flushed and closed, even if a
    # command or a status lookup raises part way through the test
    with open("Valves_Test_200.log", "w") as log_file:
        for loop_index in range(loops):
            print('Loop: {}'.format(loop_index))

            # Open phase: command position B and record the status while dwelling
            valves.cmd_set_hd_valve_position(valve.value, ValvesPositions.VALVE_POSITION_B_OPEN.value)
            for _ in range(samples_per_phase):
                sleep(sleep_time)
                status = _format_valve_status(valve)
                print(status)
                log_file.write(status)

            # Close phase: command position C and dwell for the same duration.
            # The previous while loop here never advanced its counter, so the
            # test could never leave this phase or reach the next loop.
            valves.cmd_set_hd_valve_position(valve.value, ValvesPositions.VALVE_POSITION_C_CLOSE.value)
            for _ in range(samples_per_phase):
                sleep(sleep_time)

    print('done')


def change_valves_position():
    """Home the VDI valve, command position B then position A, and monitor it.

    After the commands are sent, the valve status is printed once per second
    forever; stop the script with Ctrl-C.

    Uses the module-level ``valves`` object assigned in the ``__main__``
    section.
    """
    valve = ValvesEnum.VDI

    valves.cmd_home_hd_valve(valve.value)
    sleep(0.5)
    valves.cmd_set_hd_valve_position(valve.value, ValvesPositions.VALVE_POSITION_B_OPEN.value)
    sleep(0.05)
    valves.cmd_set_hd_valve_position(valve.value, ValvesPositions.VALVE_POSITION_A_INSERT_EJECT.value)

    while True:
        try:
            print(_format_valve_status(valve))
        except Exception as err:
            # Best effort: keep monitoring if the status cannot be read, but
            # report why. Catching Exception (not a bare except) lets
            # KeyboardInterrupt through so the loop can be stopped.
            print('Valve status unavailable: {}'.format(err))
        sleep(1)


def fail_homing():
    """Repeatedly provoke the HD valve homing failed alarm (5 times).

    Each attempt requests an HD software reset and clears the Dialin alarms.
    Once the HD reports operation mode ``HDOpModes.MODE_STAN``, it logs in,
    homes valve 0 and overrides its position count to 8000, then polls until
    ``ALARM_ID_HD_VALVE_HOMING_FAILED`` is reported active.

    Uses the module-level ``hd`` object created in the ``__main__`` section.
    """
    homing_failed_alarm = AlarmList.ALARM_ID_HD_VALVE_HOMING_FAILED.value

    # An attempt only ends when the alarm has been seen, so this loop runs
    # exactly five alarm occurrences
    for attempt in range(5):
        homing_commanded = False
        hd.cmd_hd_software_reset_request()
        sleep(1)
        hd.alarms.clear_dialin_alarms()

        while True:
            # Send the homing command and the override only once per attempt,
            # as soon as the HD reports MODE_STAN after the reset
            if hd.hd_operation_mode == HDOpModes.MODE_STAN.value and not homing_commanded:
                hd.cmd_log_in_to_hd()
                sleep(1)
                hd.valves.cmd_home_hd_valve(0)
                #sleep(2)
                # NOTE(review): presumably 8000 is a count that makes homing fail - confirm
                hd.valves.cmd_set_hd_valve_position_count_override(0, 8000)
                homing_commanded = True

            print(attempt, hd.alarms.get_alarm_state(homing_failed_alarm),
                  hd.alarms.alarm_top)  #, hd.valves.valves_status[ValvesEnum.VDI.name])
            sleep(0.1)

            if hd.alarms.get_alarm_state(homing_failed_alarm):
                sleep(1)
                break


if __name__ == "__main__":
    # Create an instance of the HD class
    hd = HD(log_level='DEBUG')

    # Nothing below can work without a successful login
    if hd.cmd_log_in_to_hd() == 0:
        sys.exit(1)

    valves = hd.valves

    # Enable exactly one of the test routines below
    #cycle_valves()
    #change_valves_position()
    fail_homing()