Index: tests/dg_valves_test_observer.py =================================================================== diff -u --- tests/dg_valves_test_observer.py (revision 0) +++ tests/dg_valves_test_observer.py (revision 0ef481abcfe4a92ef0e35aa3397f931953ce7a0e) @@ -0,0 +1,74 @@ +########################################################################### +# +# Copyright (c) 2019-2020 Diality Inc. - All Rights Reserved. +# +# THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN +# WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. +# +# @file dg_valves_test1.py +# +# @date 01-June-2020 +# @author S. Cheung +# +# @brief This development test script exercises the DG valves states in \n +# single and batch modes +# +############################################################################ + +import sys +sys.path.append("..") +from dialin.dg.dialysate_generator import DG +from datetime import datetime, timedelta +from dialin.utils.base import AbstractObserver +import time + + +class Observer(AbstractObserver): + def __init__(self, max_observations=20): + self.ready = False + self.timedelta_in_ms = 0 + self.timestamps = [] + self.max_observations = max_observations + self.observations = 0 + self.run = True + self.deltas = [] + + def update(self, result): + self.observations += 1 + + if result.get('datetime', False): + self.timestamps.append(datetime.strptime(result["datetime"], "%m.%d.%Y_%I.%M.%S.%f")) + + max_publish_rate = 500 + tolerance = 0.1 + max_publish_rate_after_tolerance = max_publish_rate * (1+tolerance) + if len(self.timestamps) > 1: + delta = self.timestamps[-1] - self.timestamps[-2] + self.deltas.append(delta.microseconds*10**-3) + max_delta = timedelta(seconds = max_publish_rate_after_tolerance*10**(-3)) + print(delta) + assert(delta <= max_delta) + + if self.observations >= self.max_observations: + self.run = False + + +def test_monitor_dg_valve_state_rate_1(): + """ + Test if DG monitors valve states within every 500 ms + """ + dg = DG() + + valves_observer = Observer() + dg.valves.attach(valves_observer) + + while valves_observer.run: + time.sleep(0.010) + + # print("DG Valve State Updates (ms):") + # for ms in valves_observer.deltas: + # print(ms) + + +if __name__ == "__main__": + test_monitor_dg_valve_state_rate_1()