"""Background reachability monitoring for a configurable probe URL."""

# NOTE: the import order below is kept exactly as in the original file. The
# star import may rebind any of the names imported before it, and
# ``import requests`` intentionally comes after it.
from logging import Logger
from threading import Thread
from time import sleep
from cloudsync.utils.globals import *
import requests

# REACHABILITY PARAMETERS
# Seconds slept between two consecutive probes in the monitoring loop.
REACHABILITY_CHECK_PAUSE = 5
# The two values below are not referenced anywhere in this section of the
# file; presumably they are consumed by other code -- verify before removing.
REACHABILITY_CYCLES = 3
REACHABILITY_CYCLE_PAUSE = 10


class ReachabilityProvider:
    """Track whether ``url_reachability`` currently answers with ``OK``.

    Constructing an instance immediately starts a daemon thread running
    :meth:`reachability_test`; the latest probe outcome is published through
    the boolean ``reachability`` attribute.
    """

    def __init__(self, logger: Logger, url_reachability, initial_delay_s: int = 0):
        """Set up state and launch the monitoring thread.

        :param Logger logger: Receives the 'Internet UP' / 'Internet DOWN'
            transition messages.
        :param url_reachability: URL requested by every probe.
        :param int initial_delay_s: Fleet-stagger delay applied once before
            the first probe. Zero preserves 0.5.4 behavior; a non-zero value
            keeps the PROD fleet from probing the reachability URL in
            lock-step at boot. Negative values are clamped to zero.
        """
        self.logger = logger
        self.url_reachability = url_reachability
        # Start pessimistic: nothing has been probed yet.
        self.reachability = False
        self._initial_delay_s = max(0, int(initial_delay_s))
        # All attributes the worker reads are assigned before it is started.
        self.thread = Thread(target=self.reachability_test, daemon=True)
        self.thread.start()

    def reachability_test(self):
        """Probe ``self.url_reachability`` forever, updating ``self.reachability``.

        Each iteration performs one GET request, treats any
        ``requests`` exception as ``INTERNAL_SERVER_ERROR``, logs only when
        the up/down state flips, and then sleeps for
        ``REACHABILITY_CHECK_PAUSE`` seconds. The method never returns on
        its own.
        """
        # The stagger sleep is skipped entirely when no delay is configured,
        # so tests that patch ``sleep`` to raise StopIteration still reach
        # the loop body (unchanged 0.5.4 behavior without a jitter).
        startup_delay = self._initial_delay_s
        if startup_delay > 0:
            sleep(startup_delay)

        while True:
            try:
                status_code = requests.get(
                    url=self.url_reachability,
                    headers={'User-Agent': USER_AGENT},
                    timeout=10,
                ).status_code
            except requests.exceptions.RequestException:
                # A failed request counts the same as a server-side error.
                status_code = INTERNAL_SERVER_ERROR

            online = bool(status_code == OK)

            # Log on state transitions only, never on every probe.
            if online and not self.reachability:
                self.logger.info('Internet UP')
            elif not online and self.reachability:
                self.logger.warning('Internet DOWN')
            self.reachability = online

            sleep(REACHABILITY_CHECK_PAUSE)