Index: cloudsync/busses/file_input_bus.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -rb2f7afce365e3044714e0e18505fed5ddefe7764 --- cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) @@ -2,7 +2,7 @@ from logging import Logger from threading import Event, Thread -from time import time, sleep +from time import monotonic, sleep, time import inotify.adapters from cloudsync.handlers.ui_cs_request_handler import UICSMessageHandler @@ -53,7 +53,8 @@ for event in self.i.event_gen(yield_nones=False): # F1 — every inotify event (including filtered-out ones) counts as # progress; it proves the thread is reading events off the kernel. - self.last_progress_ts = time() + # F13 — monotonic (clock-jump robust). + self.last_progress_ts = monotonic() (_, type_names, path, filename) = event # self.logger.debug("PATH=[{}] FILENAME=[{}] EVENT_TYPES={}".format(path, filename, type_names)) if (('IN_MODIFY' in type_names) or ('IN_CLOSE_WRITE' in type_names)) and ( Index: cloudsync/busses/file_output_bus.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -rb2f7afce365e3044714e0e18505fed5ddefe7764 --- cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) @@ -5,7 +5,7 @@ from logging import Logger from collections import deque from threading import Event, Thread -from time import time, sleep +from time import monotonic, sleep, time from cloudsync.utils import helpers from cloudsync.utils.filesystem import check_writable, check_disk_space_mb @@ -21,6 +21,11 @@ PREWARM_WINDOW_START_HOUR = 23 
PREWARM_WINDOW_START_MINUTE = 55 + # F13 — idle-tick cadence for the scheduler. An idle output bus must + # still register as alive to the watchdog, so wake up at least this + # often and refresh last_progress_ts. Well below max_idle_s=120. + IDLE_TICK_S = 60 + def __init__(self, logger: Logger, max_size, @@ -65,7 +70,14 @@ :return: None """ while True: - flag = self.event.wait() + # F13 — tick progress at the top of every loop iteration. This + # makes an idle output bus visible to the watchdog (previously, + # ``last_progress_ts`` only advanced after a successful write, + # so a quiet period longer than max_idle_s=120s falsely tripped + # staleness). Idle-tick uses monotonic, immune to wall-clock + # jumps. + self.last_progress_ts = monotonic() + flag = self.event.wait(timeout=self.IDLE_TICK_S) if flag: while len(self.queue) > 0: message_body = self.queue.popleft() @@ -111,7 +123,8 @@ self.last_output_message_id += 1 # F1 - mark progress only after a successful write (failed # writes don't constitute forward progress). - self.last_progress_ts = time() + # F13 - monotonic (clock-jump robust). + self.last_progress_ts = monotonic() except IOError as er: self.logger.error('Opening and/or writing to output file error: {0}'.format(str(er))) Index: cloudsync/common/enums.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -rb2f7afce365e3044714e0e18505fed5ddefe7764 --- cloudsync/common/enums.py (.../enums.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/common/enums.py (.../enums.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) @@ -139,6 +139,12 @@ CS_LOG_ERROR = 931 CS_FACTORY_RESET_ERROR = 932 CS_LOG_RETENTION_ERROR = 933 + # F14c (ST-14, 0.5.5 build 2): persistent auth-failure escalation. + # Emitted once per cool-down window when repeated + # CS_GET_NEW_TOKEN_WITH_CERT_ERROR events cross the threshold. 
+    # Operator-actionable signal: Keycloak auth is broken, investigate
+    # identity server / cert / Keycloak client config.
+    CS_AUTH_PERSISTENT_FAIL = 934
 
 
 @unique
Index: cloudsync/handlers/auth_failure_tracker.py
===================================================================
diff -u
--- cloudsync/handlers/auth_failure_tracker.py (revision 0)
+++ cloudsync/handlers/auth_failure_tracker.py (revision b2f7afce365e3044714e0e18505fed5ddefe7764)
@@ -0,0 +1,104 @@
+"""F14c — Persistent auth-failure escalation tracker (ST-14).
+
+Aggregates repeated ``CS_GET_NEW_TOKEN_WITH_CERT_ERROR`` events into a
+single operator-visible ``CS_AUTH_PERSISTENT_FAIL`` escalation. Uses a
+sliding window over the monotonic clock (consistent with F13) so
+wall-clock jumps neither spuriously escalate nor suppress escalation.
+
+Thresholds are in-memory only — on process restart, the counter resets.
+If the underlying failure persists, escalation re-fires within ~N seconds
+after boot, which is the desired operator signal (the issue survives
+reboot).
+
+Wiring: ``NetworkRequestHandler.get_valid_token`` calls
+:meth:`PersistentAuthFailureTracker.record_failure` after every
+``cmd_outgoing_get_new_token_with_cert`` call that returns ``None``, then
+checks :meth:`PersistentAuthFailureTracker.should_escalate` to decide
+whether to enqueue the 934 error.
+"""
+from collections import deque
+from threading import Lock
+from time import monotonic
+
+
+# Default thresholds chosen to match DVT-9 observed failure rate:
+# ~1 Hz of silent-fails during the "3 reboots" capture, so 20 failures
+# in 5 min is easily crossed when the issue is active and not crossed
+# under normal intermittent-network conditions.
+DEFAULT_WINDOW_S = 300  # 5 minutes sliding window
+DEFAULT_THRESHOLD = 20  # N failures within the window
+DEFAULT_COOLDOWN_S = 600  # 10 minutes between escalations
+
+
+class PersistentAuthFailureTracker:
+    """Sliding-window counter for Keycloak token-fetch failures.
+
+    Thread-safe via a ``Lock`` — ``record_failure`` may be called from any
+    handler thread, ``should_escalate`` is called from the same network
+    request thread(s).
+
+    :param int window_s: Sliding window size in seconds (default 300).
+    :param int threshold: Failure count within the window that triggers
+        escalation (default 20).
+    :param int cooldown_s: Minimum seconds between two escalations
+        (default 600). Prevents spamming the operator when a persistent
+        failure keeps accumulating.
+    """
+
+    def __init__(self, window_s=DEFAULT_WINDOW_S,
+                 threshold=DEFAULT_THRESHOLD,
+                 cooldown_s=DEFAULT_COOLDOWN_S):
+        self._window_s = window_s
+        self._threshold = threshold
+        self._cooldown_s = cooldown_s
+        self._failures = deque()
+        # ``None`` means "never escalated". A numeric 0.0 sentinel is wrong
+        # here: time.monotonic() has an undefined reference point (on Linux
+        # it is typically time since boot), so ``now - 0.0`` can be smaller
+        # than ``cooldown_s`` shortly after boot and would suppress the very
+        # first escalation.
+        self._last_escalation_ts = None
+        self._lock = Lock()
+
+    def record_failure(self):
+        """Record one token-fetch failure at the current monotonic time."""
+        with self._lock:
+            now = monotonic()
+            self._failures.append(now)
+            self._trim_locked(now)
+
+    def should_escalate(self):
+        """Return True iff threshold reached AND cool-down elapsed.
+
+        Returns True at most once per ``cooldown_s`` window regardless of
+        how many calls are made. When True is returned, the internal
+        counter is cleared so the next escalation requires a fresh batch
+        of ``threshold`` failures.
+        """
+        with self._lock:
+            now = monotonic()
+            self._trim_locked(now)
+            if len(self._failures) < self._threshold:
+                return False
+            last = self._last_escalation_ts
+            if last is not None and now - last < self._cooldown_s:
+                return False
+            self._last_escalation_ts = now
+            self._failures.clear()
+            return True
+
+    def failure_count(self):
+        """Current failure count within the sliding window (diagnostic)."""
+        with self._lock:
+            now = monotonic()
+            self._trim_locked(now)
+            return len(self._failures)
+
+    def _trim_locked(self, now):
+        """Drop entries older than ``window_s`` before ``now``.
+
+        Caller must hold ``self._lock``.
+        """
+        cutoff = now - self._window_s
+        while self._failures and self._failures[0] < cutoff:
+            self._failures.popleft()
Index: cloudsync/handlers/cs_mft_dcs_request_handler.py
===================================================================
diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -rb2f7afce365e3044714e0e18505fed5ddefe7764
--- cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f)
+++ cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764)
@@ -1,6 +1,6 @@
 """Handler of network requests between CloudSync, Device Registration Tool and Diality Cloud System"""
 
-from time import time
+from time import monotonic, time
 from logging import Logger
 from threading import Event, Thread
 
@@ -23,6 +23,11 @@
     # remain 1 so ordering of SET_DEVICE_STATE transitions is preserved.
     DEFAULT_MAX_SIZE = 1
 
+    # F13 — idle-tick cadence. A lane with no arriving work must still
+    # register as alive to the watchdog. 60s is well below both
+    # state-lane max_idle_s=300 and idempotent-lane max_idle_s=1800.
+    IDLE_TICK_S = 60
+
     def __init__(self, logger: Logger, max_size, output_channel, reachability_provider, error_handler):
         self.logger = logger
         self.logconf = LoggingConfig()
@@ -35,6 +40,11 @@
         # Default 0 signals "no work yet"; the watchdog tolerates this until
         # the first tick.
         self.last_progress_ts = 0
+        # F14c — persistent auth-failure tracker, used by get_valid_token to
+        # aggregate repeated Keycloak fetch failures into a single
+        # operator-visible CS_AUTH_PERSISTENT_FAIL escalation.
+ from cloudsync.handlers.auth_failure_tracker import PersistentAuthFailureTracker + self.auth_failure_tracker = PersistentAuthFailureTracker() self.thread = Thread(target=self.scheduler, daemon=True) self.event = Event() self.thread.start() @@ -45,7 +55,11 @@ :return: None """ while True: - flag = self.event.wait() + # F13 — top-of-loop tick in monotonic units. An idle lane + # (especially the idempotent one after a restart) still counts + # as alive; wall-clock jumps don't trip the watchdog. + self.last_progress_ts = monotonic() + flag = self.event.wait(timeout=self.IDLE_TICK_S) if flag: while len(self.queue) > 0: req = self.queue.popleft() @@ -54,7 +68,8 @@ finally: # F1 — mark progress after each request returns, # success or failure. - self.last_progress_ts = time() + # F13 — monotonic (clock-jump robust). + self.last_progress_ts = monotonic() clear_correlation_id() self.event.clear() @@ -589,6 +604,7 @@ correlation_id="", device_sn=""): access_token = helpers_get_stored_token() + fetched_fresh = False if access_token is None: self.logger.info("No stored token found, requesting new token") access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509, @@ -599,6 +615,7 @@ error_handler=self.error_handler, correlation_id=correlation_id, device_sn=device_sn) + fetched_fresh = True else: response = cmd_outgoing_verify_token(url=token_verification_url, access_token=access_token, @@ -615,9 +632,33 @@ error_handler=self.error_handler, correlation_id=correlation_id, device_sn=device_sn) + fetched_fresh = True else: self.logger.info("Token verification succeeded") + # F14c — if the fresh cert-auth fetch returned None, track the + # failure; emit an aggregate CS_AUTH_PERSISTENT_FAIL escalation + # when the threshold trips. Does NOT replace the per-failure 922 + # already enqueued by cmd_outgoing_get_new_token_with_cert; this + # is an ADDITIONAL operator-actionable signal when the pattern + # is persistent. 
+ if fetched_fresh and access_token is None: + self.auth_failure_tracker.record_failure() + if self.auth_failure_tracker.should_escalate(): + escalation = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_AUTH_PERSISTENT_FAIL.value, + "Cloud authentication failing persistently " + "(>= 20 Keycloak fetch failures in 5 min). " + "Check identity server reachability, device certificate " + "validity, and Keycloak client/role configuration. " + "Recent CS_GET_NEW_TOKEN_WITH_CERT_ERROR entries carry " + "the specific Keycloak response body.") + self.error_handler.enqueue_error(error=escalation) + self.logger.error( + "F14c: persistent auth failure escalated to UI-Brain " + "(CS_AUTH_PERSISTENT_FAIL)") + return access_token def wait_for_network(self, wait_time): Index: cloudsync/handlers/outgoing/handler_cs_to_dcs.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -rb2f7afce365e3044714e0e18505fed5ddefe7764 --- cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) @@ -98,23 +98,62 @@ g_utils.logger.debug("Making request: {0}, {1}, {2}, {3}".format(url, headers, payload, cert_paths)) + # F14a — split connect/read timeout. 5s was too tight for the read + # phase (13× timeout observed on DVT-9); keep connect tight so we + # fail fast on unreachable identity server. response = requests.post(url=url, headers=headers, data=payload, cert=cert_paths, - timeout=5) + timeout=(5, 30)) - data = response.json() + # F14a — surface silent Keycloak failures. 
Prior to this, a non-2xx + # response or a 2xx response that lacked "access_token" caused this + # function to return None without enqueuing an error, producing + # thousands of downstream "Missing access token" errors with no + # diagnostic trail. See st14_phase0_findings.md H2 for field data. + if response.status_code != 200: + body_preview = (response.text or "")[:200] + error = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, + "Keycloak non-2xx; status={0}, body={1!r}".format( + response.status_code, body_preview)) + error_handler.enqueue_error(error=error) + return None + try: + data = response.json() + except ValueError: + body_preview = (response.text or "")[:200] + error = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, + "Keycloak 200 but non-JSON response; body={0!r}".format( + body_preview)) + error_handler.enqueue_error(error=error) + return None + g_utils.logger.debug("Keycloak response: {0}".format(data)) + token = data.get("access_token", None) + if token is None: + body_preview = (response.text or "")[:200] + error = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, + "Keycloak 200 but no access_token key; body={0!r}".format( + body_preview)) + error_handler.enqueue_error(error=error) + return None + if save: if not os.path.exists(TOKEN_CACHING_PATH): os.makedirs(TOKEN_CACHING_PATH) with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'w') as f: f.write(json.dumps(data, indent=4)) - return data.get("access_token", None) + return token except requests.exceptions.Timeout: error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, Index: cloudsync/handlers/ui_cs_request_handler.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -rb2f7afce365e3044714e0e18505fed5ddefe7764 
--- cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) @@ -39,21 +39,30 @@ self.logger.info('Created UI_CS Handler') + # F13 — idle-tick cadence for the scheduler. An idle message handler + # must still register as alive to the watchdog. Well below max_idle_s=120. + IDLE_TICK_S = 60 + def scheduler(self) -> None: """ Continuously monitors the event flag to check for new messages :return: None """ while True: - flag = self.event.wait() + # F13 — top-of-loop tick in monotonic units so wall-clock + # jumps do not trip the watchdog, and an idle queue still + # counts as healthy. + self.last_progress_ts = time.monotonic() + flag = self.event.wait(timeout=self.IDLE_TICK_S) if flag: while len(self.queue) > 0: message = self.queue.popleft() try: self.handle_message(message) finally: # F1 — mark progress after each message is fully processed. - self.last_progress_ts = time.time() + # F13 — monotonic (clock-jump robust). 
+ self.last_progress_ts = time.monotonic() clear_correlation_id() self.event.clear() Index: cloudsync/utils/alive.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -rb2f7afce365e3044714e0e18505fed5ddefe7764 --- cloudsync/utils/alive.py (.../alive.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/utils/alive.py (.../alive.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) @@ -14,7 +14,7 @@ """ from threading import Thread -from time import sleep, time +from time import monotonic, sleep, time from cloudsync.utils.globals import CS_ALIVE_FILE @@ -49,7 +49,9 @@ try: with open(self._path, "w") as f: f.write("{}\n".format(int(time()))) - self.last_progress_ts = time() + # F13 — monotonic for watchdog marker; wall-clock stays in the + # file payload (UI-Brain reads it as a human-readable time). + self.last_progress_ts = monotonic() except (OSError, IOError) as exc: if self._logger is not None: self._logger.warning( Index: cloudsync/utils/heartbeat.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -rb2f7afce365e3044714e0e18505fed5ddefe7764 --- cloudsync/utils/heartbeat.py (.../heartbeat.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/utils/heartbeat.py (.../heartbeat.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) @@ -4,7 +4,7 @@ from logging import Logger from threading import Thread -from time import sleep, time +from time import monotonic, sleep, time from cloudsync.utils.helpers import * from cloudsync.common.enums import * @@ -57,7 +57,8 @@ # F1 — tick the progress marker regardless of send_heartbeat: # a running-but-suppressed heartbeat thread is still "making # progress" for watchdog purposes. - self.last_progress_ts = time() + # F13 — monotonic (clock-jump robust). 
+ self.last_progress_ts = monotonic() sleep(self.HEARTBEAT_FREQ) def _maybe_upload_pending_cs_log(self): Index: cloudsync/utils/helpers.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -rb2f7afce365e3044714e0e18505fed5ddefe7764 --- cloudsync/utils/helpers.py (.../helpers.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/utils/helpers.py (.../helpers.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) @@ -226,8 +226,18 @@ payload = json.loads(base64.urlsafe_b64decode(payload_b64)) exp = payload.get('exp', 0) now = int(time()) - if now >= (exp - 60): # expired or within 60s of expiry - g_utils.logger.info(f"Stored token expired or near expiry (exp={exp}, now={now}), will refresh") + # F14b — clock-skew tolerance. A forward wall-clock step (operator + # `date` command, ntpd step) can make a freshly-issued token look + # "expired" per the device clock even though Keycloak considers it + # valid — causing a refresh storm where every request hits Keycloak + # anew. Give the stored token a 2h grace past its `exp` claim. If + # the token is genuinely expired, DCS will reject it on the next + # call with 401/403 and the refresh path kicks in; a slightly-late + # retry is cheap, a Keycloak refresh storm is not. See + # st14_phase2_fix_spec.md §F14b for the DVT-9 rationale. 
+ TOKEN_EXPIRY_GRACE_S = 7200 + if now >= (exp + TOKEN_EXPIRY_GRACE_S): + g_utils.logger.info(f"Stored token expired beyond grace window (exp={exp}, now={now}, grace={TOKEN_EXPIRY_GRACE_S}s), will refresh") return None except Exception: # If we can't decode, let the caller proceed and DCS will reject if invalid Index: cloudsync/utils/watchdog.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -rb2f7afce365e3044714e0e18505fed5ddefe7764 --- cloudsync/utils/watchdog.py (.../watchdog.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/utils/watchdog.py (.../watchdog.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) @@ -27,7 +27,7 @@ import os from logging import Logger from threading import Event, Thread -from time import sleep, time +from time import monotonic, sleep # Default sentinel file path — cs.py monitors this to trigger process restart @@ -169,7 +169,10 @@ if last_ts == 0: is_stale = False else: - age = time() - last_ts + # F13 — compare in monotonic time. A wall-clock step + # (operator `date` command or ntpd correction) must not + # trip the watchdog; only real elapsed idle time matters. + age = monotonic() - last_ts if age > entry["max_idle_s"]: is_stale = True stale_reason = ("progress stale %.1fs > max_idle_s %s"