"""Internal watchdog thread for monitoring critical daemon threads. Monitors registered threads via periodic is_alive() checks, attempts restart on failure (up to a configurable limit), and creates a sentinel file to request full process restart when recovery is exhausted. """ import os from logging import Logger from threading import Event, Thread from time import sleep # Default sentinel file path — cs.py monitors this to trigger process restart SENTINEL_PATH = "/tmp/cloudsync_restart_sentinel" class Watchdog: """Monitors registered daemon threads and attempts recovery on failure. :param Logger logger: Logger instance for health/status messages. :param int check_interval: Seconds between is_alive() sweeps (default 30). :param int max_restarts: Max restart attempts per thread before sentinel (default 3). :param str sentinel_path: Path to sentinel file for full process restart. """ def __init__(self, logger, check_interval=30, max_restarts=3, sentinel_path=SENTINEL_PATH): self._logger = logger self._check_interval = check_interval self._max_restarts = max_restarts self._sentinel_path = sentinel_path self._entries = {} self._stop_event = Event() self._thread = Thread(target=self._monitor, daemon=True) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def register(self, name, get_thread, restart_fn): """Register a thread for monitoring. :param str name: Human-readable name (e.g. "heartbeat"). :param callable get_thread: Zero-arg callable returning the current Thread object (e.g. ``lambda: obj.thread``). :param callable restart_fn: Zero-arg callable that creates and starts a replacement thread. Use :func:`make_restart_fn` for the common case of daemon threads backed by a bound method. """ self._entries[name] = { "get_thread": get_thread, "restart_fn": restart_fn, "failures": 0, } def start(self): """Start the watchdog monitoring loop.""" self._thread.start() def stop(self): """Signal the watchdog to stop.""" self._stop_event.set() def is_running(self): """Return True if the watchdog thread is alive.""" return self._thread.is_alive() def get_health(self): """Return a dict mapping thread name → alive (bool).""" result = {} for name, entry in self._entries.items(): thread = entry["get_thread"]() result[name] = thread is not None and thread.is_alive() return result # ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ def _monitor(self): """Main monitoring loop — runs until stop_event is set.""" while not self._stop_event.is_set(): self._check_threads() self._stop_event.wait(self._check_interval) def _check_threads(self): """Single sweep: check every registered thread.""" all_healthy = True for name, entry in self._entries.items(): thread = entry["get_thread"]() if thread is not None and thread.is_alive(): continue all_healthy = False entry["failures"] += 1 failures = entry["failures"] self._logger.error( "Watchdog: thread '%s' is dead (failure %d/%d)", name, failures, self._max_restarts, ) if failures <= self._max_restarts: self._try_restart(name, entry) else: self._logger.error( "Watchdog: thread '%s' exceeded max restarts (%d), " "creating sentinel for full process restart", name, self._max_restarts, ) self._create_sentinel(name) if all_healthy: self._logger.info("Watchdog: all %d threads healthy", len(self._entries)) def _try_restart(self, name, entry): """Attempt to restart a dead thread.""" try: entry["restart_fn"]() self._logger.info("Watchdog: thread '%s' restarted", name) except Exception as exc: self._logger.error( "Watchdog: failed to restart thread '%s': %s", name, exc, ) def _create_sentinel(self, failed_thread_name): """Write a sentinel file so cs.py triggers a full process restart.""" try: with open(self._sentinel_path, "w") as fh: fh.write(failed_thread_name) self._logger.info( "Watchdog: sentinel file created at %s", self._sentinel_path, ) except OSError as exc: self._logger.error( "Watchdog: failed to create sentinel file: %s", exc, ) def make_restart_fn(obj, thread_attr, target_fn): """Build a restart callable for the common daemon-thread pattern. Creates a new ``Thread(target=target_fn, daemon=True)``, assigns it to ``obj.``, and starts it. :param obj: Object that owns the thread (e.g. a HeartBeatProvider instance). :param str thread_attr: Name of the Thread attribute on *obj* (e.g. "thread"). :param callable target_fn: Bound method to use as the thread target. :return: Zero-arg callable suitable for :meth:`Watchdog.register`. """ def _restart(): new_thread = Thread(target=target_fn, daemon=True) setattr(obj, thread_attr, new_thread) new_thread.start() return _restart