########################################################################### # # Copyright (c) 2021-2023 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file test_dg_valves_observer.py # # @author (last) Peter Lucia # @date (last) 21-May-2021 # @author (original) Peter Lucia # @date (original) 29-Apr-2021 # ############################################################################ import sys sys.path.append("../../") from dialin.dg.dialysate_generator import DG from datetime import datetime, timedelta from dialin.utils.base import AbstractObserver import time class Observer(AbstractObserver): def __init__(self, max_observations=20): self.ready = False self.timedelta_in_ms = 0 self.timestamps = [] self.max_observations = max_observations self.observations = 0 self.run = True self.deltas = [] def update(self, result): self.observations += 1 if result.get('datetime', False): self.timestamps.append(datetime.strptime(result["datetime"], "%m.%d.%Y_%I.%M.%S.%f")) max_publish_rate = 500 tolerance = 0.1 max_publish_rate_after_tolerance = max_publish_rate * (1+tolerance) if len(self.timestamps) > 1: delta = self.timestamps[-1] - self.timestamps[-2] self.deltas.append(delta.microseconds*10**-3) max_delta = timedelta(seconds = max_publish_rate_after_tolerance*10**(-3)) print(delta) assert(delta <= max_delta) if self.observations >= self.max_observations: self.run = False def test_monitor_dg_valve_state_rate_1(): """ Test if DG monitors valve states within every 500 ms """ dg = DG() valves_observer = Observer() dg.valves.attach(valves_observer) while valves_observer.run: time.sleep(0.010) # print("DG Valve State Updates (ms):") # for ms in valves_observer.deltas: # print(ms) if __name__ == "__main__": test_monitor_dg_valve_state_rate_1()