Index: cloudsync/utils/watchdog.py
===================================================================
diff -u -r4b734f4da2eb0e74fd3e3c996d84d0f676349294 -rbc93cd656618402440375e4fdf986a1b42212e19
--- cloudsync/utils/watchdog.py	(.../watchdog.py)	(revision 4b734f4da2eb0e74fd3e3c996d84d0f676349294)
+++ cloudsync/utils/watchdog.py	(.../watchdog.py)	(revision bc93cd656618402440375e4fdf986a1b42212e19)
@@ -3,12 +3,31 @@
 Monitors registered threads via periodic is_alive() checks, attempts
 restart on failure (up to a configurable limit), and creates a sentinel
 file to request full process restart when recovery is exhausted.
+
+F1 — 0.5.5: extends the watchdog with liveness-progress monitoring.
+``threading.Thread.is_alive()`` returns True for threads blocked in kernel
+syscalls, ``event.wait()``, or busy-waits. The HD000118-style stuck-but-alive
+patterns observed in DIAS-43 go completely undetected by is_alive() alone.
+
+The extended API accepts an optional ``progress_fn`` and ``max_idle_s`` per
+registration. On each sweep the watchdog treats stale progress the same way
+it treats a dead thread: it increments the failure counter, attempts restart,
+and escalates to sentinel after ``max_restarts``. Several consecutive stale
+reads are therefore required before escalation (the counter resets when the
+thread becomes live and fresh again) — this prevents transient-slowness
+false positives from creating unnecessary restart storms.
+
+``register_sentinel_condition`` provides a second escalation path: an
+arbitrary zero-arg predicate checked every sweep. If it returns True, the
+watchdog immediately creates the sentinel (no restart attempt). Used for
+chronic queue-exhaustion (F4 counter) where individual threads stay alive
+but the pipeline is starved.
""" import os from logging import Logger from threading import Event, Thread -from time import sleep +from time import sleep, time # Default sentinel file path — cs.py monitors this to trigger process restart @@ -31,29 +50,56 @@ self._max_restarts = max_restarts self._sentinel_path = sentinel_path self._entries = {} + # F1 — sentinel-condition registry (name → predicate) + self._sentinel_conditions = {} self._stop_event = Event() self._thread = Thread(target=self._monitor, daemon=True) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ - def register(self, name, get_thread, restart_fn): + def register(self, name, get_thread, restart_fn, + progress_fn=None, max_idle_s=None): """Register a thread for monitoring. :param str name: Human-readable name (e.g. "heartbeat"). :param callable get_thread: Zero-arg callable returning the current Thread object (e.g. ``lambda: obj.thread``). :param callable restart_fn: Zero-arg callable that creates and starts - a replacement thread. Use :func:`make_restart_fn` for the common + a replacement thread. Use :func:`make_restart_fn` for the common case of daemon threads backed by a bound method. + :param callable progress_fn: (F1) optional zero-arg callable returning + a unix-epoch timestamp of the thread's last semantic progress. + If omitted, only dead-thread detection is performed. + :param float max_idle_s: (F1) maximum seconds of staleness tolerated + before the thread counts as failed. Required when + ``progress_fn`` is provided. 
""" + if progress_fn is not None and max_idle_s is None: + raise ValueError( + "progress_fn requires max_idle_s for '{0}'".format(name)) self._entries[name] = { "get_thread": get_thread, "restart_fn": restart_fn, + "progress_fn": progress_fn, + "max_idle_s": max_idle_s, "failures": 0, } + def register_sentinel_condition(self, name, condition_fn): + """Register a zero-arg predicate that, when True, triggers sentinel. + + Used for conditions where individual threads are alive but the + pipeline as a whole is starved — e.g. chronic queue saturation + (F4 ``helpers_queue_full_event_count`` > threshold). + + :param str name: Diagnostic label (appears in sentinel file + logs). + :param callable condition_fn: Zero-arg callable returning a bool. + True → immediate sentinel (no restart attempt). + """ + self._sentinel_conditions[name] = condition_fn + def start(self): """Start the watchdog monitoring loop.""" self._thread.start() @@ -85,20 +131,71 @@ self._stop_event.wait(self._check_interval) def _check_threads(self): - """Single sweep: check every registered thread.""" + """Single sweep: check every registered thread + sentinel condition.""" + # F1: first evaluate sentinel conditions — they short-circuit thread + # checks. A True predicate means the pipeline is chronically + # starved and restarting individual threads will not help. 
+        for name, condition_fn in self._sentinel_conditions.items():
+            try:
+                if condition_fn():
+                    self._logger.error(
+                        "Watchdog: sentinel condition '%s' fired — "
+                        "escalating directly to sentinel", name)
+                    self._create_sentinel("sentinel_condition:" + name)
+                    return
+            except Exception as exc:  # condition_fn misbehaved; log + skip
+                self._logger.error(
+                    "Watchdog: sentinel condition '%s' raised: %s",
+                    name, exc)
+
         all_healthy = True
         for name, entry in self._entries.items():
             thread = entry["get_thread"]()
-            if thread is not None and thread.is_alive():
+            is_dead = thread is None or not thread.is_alive()
+
+            # F1: check progress staleness even when the thread is alive.
+            is_stale = False
+            stale_reason = ""
+            if not is_dead and entry["progress_fn"] is not None:
+                try:
+                    last_ts = entry["progress_fn"]()
+                except Exception as exc:
+                    self._logger.error(
+                        "Watchdog: progress_fn for '%s' raised: %s", name, exc)
+                    last_ts = 0
+                # last_ts==0 means "no progress recorded yet" (also the
+                # fallback when progress_fn raises) — treated as healthy
+                # until the thread records its first tick.
+                if last_ts == 0:
+                    is_stale = False
+                else:
+                    age = time() - last_ts
+                    if age > entry["max_idle_s"]:
+                        is_stale = True
+                        stale_reason = ("progress stale %.1fs > max_idle_s %s"
+                                        % (age, entry["max_idle_s"]))
+
+            if not is_dead and not is_stale:
+                # F1: reset failure counter on a fully-healthy read. This is
+                # the "3 consecutive stale reads" rule — any healthy read
+                # wipes the slate clean and prevents restart-storm escalation
+                # on transient slowness.
+ if entry["failures"] > 0: + self._logger.info( + "Watchdog: thread '%s' recovered; " + "resetting failure counter (was %d)", + name, entry["failures"]) + entry["failures"] = 0 continue all_healthy = False entry["failures"] += 1 failures = entry["failures"] + reason = "dead" if is_dead else stale_reason self._logger.error( - "Watchdog: thread '%s' is dead (failure %d/%d)", - name, failures, self._max_restarts, + "Watchdog: thread '%s' is unhealthy (%s) (failure %d/%d)", + name, reason, failures, self._max_restarts, ) if failures <= self._max_restarts: