Index: cloud_sync.py =================================================================== diff -u -ra19403bb936dd50d786e22fd52105f586a93f252 -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloud_sync.py (.../cloud_sync.py) (revision a19403bb936dd50d786e22fd52105f586a93f252) +++ cloud_sync.py (.../cloud_sync.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -8,14 +8,15 @@ from cloudsync.utils.heartbeat import HeartBeatProvider from cloudsync.busses.file_input_bus import FileInputBus from cloudsync.handlers.ui_cs_request_handler import UICSMessageHandler -from cloudsync.handlers.cs_mft_dcs_request_handler import NetworkRequestHandler +from cloudsync.handlers.cs_mft_dcs_request_handler import NetworkRequestHandler, IdempotentRequestHandler from cloudsync.handlers.error_handler import ErrorHandler from cloudsync.busses.file_output_bus import FileOutputBus from cloudsync.utils.reachability import ReachabilityProvider from cloudsync.utils.globals import * from cloudsync.utils.helpers import * from cloudsync.utils.logging import LoggingConfig from cloudsync.utils.watchdog import Watchdog, make_restart_fn +from cloudsync.utils.alive import AliveProvider import hmac import os @@ -24,7 +25,7 @@ import threading -VERSION = "0.5.3" +VERSION = "0.5.6" arguments = sys.argv @@ -90,36 +91,73 @@ g_utils.logger.info(SETUP_CONSOLE_LINE) sys.exit(0) +# Fleet-stagger jitter computed once at boot from the HD serial. +# Reused by ReachabilityProvider (first-probe delay) and FileOutputBus +# (pre-warm window offset). A pre-registration device with no serial +# gets jitter=0, which is acceptable: registration happens once per +# device and is not a fleet-wide concern. 
+_device_serial = g_config[CONFIG_DEVICE].get(CONFIG_DEVICE_HD_SERIAL) +_device_jitter_s = get_device_jitter_seconds(_device_serial) +g_utils.logger.info( + "Fleet-stagger jitter: %ds (serial=%s)", _device_jitter_s, _device_serial) + try: - reachability_provider = ReachabilityProvider(logger=app.logger, url_reachability=g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_REACHABILITY_URL]) + reachability_provider = ReachabilityProvider( + logger=app.logger, + url_reachability=g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_REACHABILITY_URL], + initial_delay_s=_device_jitter_s) except Exception as e: g_utils.logger.error( "Reachability URL missing from config file. Using Default URL - {0}".format(DEFAULT_REACHABILITY_URL)) reachability_provider = ReachabilityProvider( - logger=app.logger, url_reachability=DEFAULT_REACHABILITY_URL) + logger=app.logger, url_reachability=DEFAULT_REACHABILITY_URL, + initial_delay_s=_device_jitter_s) try: g_utils.add_reachability_provider(reachability_provider=reachability_provider) - output_channel = FileOutputBus(logger=app.logger, max_size=100, file_channels_path=UI2CS_FILE_CHANNELS_PATH) + # Pre-warm thread runs inside FileOutputBus; jitter staggers + # the fleet across a 2-minute window starting at 23:55 UTC. + output_channel = FileOutputBus(logger=app.logger, max_size=100, + file_channels_path=UI2CS_FILE_CHANNELS_PATH, + jitter_seconds=_device_jitter_s) error_handler = ErrorHandler(logger=app.logger, max_size=100, output_channel=output_channel) + # Split-lane architecture. + # State lane (max_size=1): strict-ordering handler for registration, + # SET_DEVICE_STATE and SEND_TREATMENT_REPORT. The single-slot queue is + # the explicit ordering invariant — do NOT change max_size on this one. 
network_request_handler = NetworkRequestHandler(logger=app.logger, max_size=1, output_channel=output_channel, reachability_provider=reachability_provider, error_handler=error_handler) + # Idempotent lane (max_size=16): absorbs long-running, filename-keyed + # uploads (SEND_DEVICE_LOG up to ~350 MB, SEND_CS_LOG up to ~300 MB). + # Single worker per lane; max_size is queue depth, not parallelism. + # Keeps the state lane responsive during 10–30 minute transfers. + idempotent_network_request_handler = IdempotentRequestHandler( + logger=app.logger, max_size=16, output_channel=output_channel, + reachability_provider=reachability_provider, error_handler=error_handler) message_handler = UICSMessageHandler(logger=app.logger, max_size=20, network_request_handler=network_request_handler, output_channel=output_channel, reachability_provider=reachability_provider, - error_handler=error_handler) + error_handler=error_handler, + idempotent_network_request_handler=idempotent_network_request_handler) ui_cs_bus = FileInputBus(logger=app.logger, file_channels_path=UI2CS_FILE_CHANNELS_PATH, input_channel_name="inp.buf", g_config=g_config, message_handler=message_handler) + # Heartbeat gets the idempotent lane + g_config so it can drain + # rotated CS-log files one-per-tick when that lane is idle. Registration + # mode sets send_heartbeat=False below, so opportunistic CS-log upload + # stays inert during registration. heartbeat_provider = HeartBeatProvider(logger=app.logger, network_request_handler=network_request_handler, - output_channel=output_channel) + output_channel=output_channel, + idempotent_network_request_handler=idempotent_network_request_handler, + g_config=g_config) - logconf.set_network_provider(network_request_handler=network_request_handler) + # CS-log uploads (SEND_CS_LOG) go through the idempotent lane. 
+ logconf.set_network_provider(network_request_handler=idempotent_network_request_handler) logconf.set_error_provider(error_handler=error_handler) logconf.set_configuration(g_config=g_config) logconf.set_log_level(g_config[CONFIG_LOGS][CONFIG_LOGS_DEFAULT_LOG_LEVEL]) @@ -139,20 +177,56 @@ # the registration-to-operation transition. watchdog = Watchdog(logger=app.logger) if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation': + # Scheduler-independent liveness file. Auto-starts on construction. + # The ``last_progress_ts`` attribute is consumed by the watchdog (below). + alive_provider = AliveProvider(logger=app.logger) + # Per-thread progress-signal contracts. max_idle_s values are + # conservative; require 3 consecutive stale reads (default + # max_restarts) before escalating to sentinel, which gives each + # thread an effective 3 × check_interval grace on top of max_idle_s. + watchdog.register("alive", lambda: alive_provider.thread, + make_restart_fn(alive_provider, "thread", alive_provider._loop), + progress_fn=lambda: alive_provider.last_progress_ts, + max_idle_s=30) watchdog.register("reachability", lambda: reachability_provider.thread, make_restart_fn(reachability_provider, "thread", reachability_provider.reachability_test)) watchdog.register("output_bus", lambda: output_channel.thread, - make_restart_fn(output_channel, "thread", output_channel.scheduler)) + make_restart_fn(output_channel, "thread", output_channel.scheduler), + progress_fn=lambda: output_channel.last_progress_ts, + max_idle_s=120) watchdog.register("error_handler", lambda: error_handler.thread, make_restart_fn(error_handler, "thread", error_handler.scheduler)) watchdog.register("network_request_handler", lambda: network_request_handler.thread, - make_restart_fn(network_request_handler, "thread", network_request_handler.scheduler)) + make_restart_fn(network_request_handler, "thread", network_request_handler.scheduler), + progress_fn=lambda: network_request_handler.last_progress_ts, + 
max_idle_s=300) + # Idempotent lane worker: large-file uploads legitimately take + # 10–30 min, so max_idle_s is 1800 s (30 min). + watchdog.register("idempotent_request_handler", + lambda: idempotent_network_request_handler.thread, + make_restart_fn(idempotent_network_request_handler, "thread", + idempotent_network_request_handler.scheduler), + progress_fn=lambda: idempotent_network_request_handler.last_progress_ts, + max_idle_s=1800) watchdog.register("message_handler", lambda: message_handler.thread, - make_restart_fn(message_handler, "thread", message_handler.scheduler)) + make_restart_fn(message_handler, "thread", message_handler.scheduler), + progress_fn=lambda: message_handler.last_progress_ts, + max_idle_s=120) watchdog.register("file_input_bus", lambda: ui_cs_bus.thread, - make_restart_fn(ui_cs_bus, "thread", ui_cs_bus.input_channel_handler)) + make_restart_fn(ui_cs_bus, "thread", ui_cs_bus.input_channel_handler), + progress_fn=lambda: ui_cs_bus.last_progress_ts, + max_idle_s=600) watchdog.register("heartbeat", lambda: heartbeat_provider.thread, - make_restart_fn(heartbeat_provider, "thread", heartbeat_provider.heartbeat)) + make_restart_fn(heartbeat_provider, "thread", heartbeat_provider.heartbeat), + progress_fn=lambda: heartbeat_provider.last_progress_ts, + max_idle_s=90) + # Chronic queue-saturation escalation. If the queue-full counter + # shows more than 3 "queue full" events in the last 5 min, individual + # threads are alive but the pipeline is starved — skip restarts and + # go straight to sentinel for a clean-slate process restart. 
+ watchdog.register_sentinel_condition( + "queue_exhaustion", + lambda: helpers_queue_full_event_count() > 3) watchdog.start() g_utils.logger.info("Watchdog started (operation mode)") else: @@ -231,14 +305,24 @@ return {"invalidAttributes": invalid_params}, BAD_REQUEST try: - helpers_add_to_network_queue(network_request_handler=network_request_handler, - request_type=NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS, - url=request.url, - payload=payload, - method=request.method, - g_config=g_config, - success_message='CS2MFT_REQ_SET_CREDENTIALS request added to network ' - 'queue') + # Manufacturing-tool credentials flow. Surface queue + # failures to the existing CS2UI_ERROR channel so the MFT + # can observe and retry. + outcome, detail = helpers_add_to_network_queue( + network_request_handler=network_request_handler, + request_type=NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS, + url=request.url, + payload=payload, + method=request.method, + g_config=g_config, + success_message='CS2MFT_REQ_SET_CREDENTIALS request added to network ' + 'queue') + if outcome != "queued": + error = Error("{0},2,{1},Queue {2}: {3}".format( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value, + outcome, detail)) + error_handler.enqueue_error(error=error) except Exception as e: error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value, Index: cloudsync/busses/file_input_bus.py =================================================================== diff -u -ra19403bb936dd50d786e22fd52105f586a93f252 -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision a19403bb936dd50d786e22fd52105f586a93f252) +++ cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -2,7 +2,7 @@ from logging import Logger from threading import Event, Thread -from time import time, sleep +from time import monotonic, 
sleep, time import inotify.adapters from cloudsync.handlers.ui_cs_request_handler import UICSMessageHandler @@ -30,6 +30,8 @@ """ self.last_input_message_id = 0 + # Progress marker; updated after each inotify event is processed. + self.last_progress_ts = 0 self.logger = logger self.file_channels_path = file_channels_path @@ -49,6 +51,10 @@ The Input Channel Handler - it parses and sends upstream all messages arriving on the File Input Bus """ for event in self.i.event_gen(yield_nones=False): + # Every inotify event (including filtered-out ones) counts as + # progress; it proves the thread is reading events off the kernel. + # Monotonic clock so a wall-clock step does not poison the marker. + self.last_progress_ts = monotonic() (_, type_names, path, filename) = event # self.logger.debug("PATH=[{}] FILENAME=[{}] EVENT_TYPES={}".format(path, filename, type_names)) if (('IN_MODIFY' in type_names) or ('IN_CLOSE_WRITE' in type_names)) and ( Index: cloudsync/busses/file_output_bus.py =================================================================== diff -u -ra19403bb936dd50d786e22fd52105f586a93f252 -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision a19403bb936dd50d786e22fd52105f586a93f252) +++ cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -1,45 +1,82 @@ """Implementation of an Output Bus based on the Linux file system""" +import datetime +import os from logging import Logger from collections import deque from threading import Event, Thread -import datetime +from time import monotonic, sleep, time from cloudsync.utils import helpers from cloudsync.utils.filesystem import check_writable, check_disk_space_mb class FileOutputBus: """File Output Bus class that receives, parses and sends downstream all outbound messages""" + + # Pre-warm window and cadence. 
The window is 5 minutes wide so the + # 0-120 s fleet jitter fits inside with a 3-minute buffer; the check + # runs once per minute which is cheap and well under the window size. + PREWARM_CHECK_INTERVAL_S = 60 + PREWARM_WINDOW_START_HOUR = 23 + PREWARM_WINDOW_START_MINUTE = 55 + + # Idle-tick cadence for the scheduler. An idle output bus must still + # register as alive to the watchdog, so wake up at least this often + # and refresh last_progress_ts. Well below max_idle_s=120. + IDLE_TICK_S = 60 + def __init__(self, logger: Logger, max_size, - file_channels_path: str): + file_channels_path: str, + jitter_seconds: int = 0): """ Initialize the File Input Bus. :param Logger logger: Logger object :param max_size: the maximum size of the message queue for the Output Bus :type max_size: int :param str file_channels_path: the path where the bus files are located + :param int jitter_seconds: Per-device jitter in [0, 120) used by + the pre-warm loop to stagger the fleet across a 2-minute + window inside the 5-minute pre-warm window. 0 means "fire at + the window start" (default; registration mode has no serial). """ self.last_output_message_id = 1 self.logger = logger self.file_channels_path = file_channels_path self.queue = deque(maxlen=max_size) + # Progress marker; updated after every successful out.buf write. + self.last_progress_ts = 0 self.thread = Thread(target=self.scheduler, daemon=True) self.event = Event() self.thread.start() + # Pre-warm state. `_last_prewarm_date` is the UTC date on which + # we successfully touched tomorrow's file; prevents double-firing + # inside the 5-minute window. Failures intentionally do NOT update + # it so the loop retries next tick. 
def _prewarm_loop(self):
    """Run the pre-warm check forever, once per ``PREWARM_CHECK_INTERVAL_S``.

    Each pass delegates to ``_maybe_prewarm_tomorrow``. Any ``Exception``
    raised by a pass is logged at error level and swallowed so that one bad
    iteration cannot terminate the loop; the sleep then runs regardless of
    whether the pass succeeded.
    """
    while True:
        try:
            self._maybe_prewarm_tomorrow()
        except Exception as err:
            # Best-effort by design: log and keep ticking.
            self.logger.error("Pre-warm: loop iteration failed: %s", err)
        sleep(self.PREWARM_CHECK_INTERVAL_S)


def _maybe_prewarm_tomorrow(self):
    """Create tomorrow's ``out.buf`` once the device's jittered slot is due.

    The file is touched (opened in append mode and closed) only when all of
    the following hold for the current UTC time:

    * the hour equals ``PREWARM_WINDOW_START_HOUR`` and the minute is at or
      past ``PREWARM_WINDOW_START_MINUTE``;
    * no successful pre-warm has been recorded for today's UTC date;
    * at least ``_jitter_seconds`` have elapsed since the window start.

    On success ``_last_prewarm_date`` is set to today's UTC date. On an
    I/O failure a warning is logged and the date is left untouched, so a
    later call retries.

    :return: None
    """
    utc_now = datetime.datetime.now(datetime.timezone.utc)

    # Guard: outside the pre-warm window (short-circuits on the hour).
    outside_window = (utc_now.hour != self.PREWARM_WINDOW_START_HOUR
                      or utc_now.minute < self.PREWARM_WINDOW_START_MINUTE)
    if outside_window:
        return

    # Guard: already done for this UTC date.
    fire_date = utc_now.date()
    if self._last_prewarm_date == fire_date:
        return

    # Guard: this device's jitter offset into the window has not elapsed.
    minutes_in = utc_now.minute - self.PREWARM_WINDOW_START_MINUTE
    elapsed_s = 60 * minutes_in + utc_now.second
    if elapsed_s < self._jitter_seconds:
        return

    next_day = fire_date + datetime.timedelta(days=1)
    target = os.path.join(self.file_channels_path,
                          next_day.strftime("%Y_%m_%d_out.buf"))
    try:
        # Append mode creates the file if missing and never truncates it.
        open(target, "a").close()
        self._last_prewarm_date = fire_date
        self.logger.debug("Pre-warmed %s (jitter=%ds)", target, self._jitter_seconds)
    except (IOError, OSError) as err:
        self.logger.warning("Pre-warm failed for %s: %s", target, err)
Operator-actionable signal: Keycloak auth is broken + # — investigate identity server / cert / Keycloak client config. + CS_AUTH_PERSISTENT_FAIL = 934 @unique class LogUploadReasonCode(RootEnum): DISCONNECTED = 1 # Backup & retry CREDENTIAL = 2 # Backup & NO upload DUPLICATE = 3 # Backup & rename + # Persistent upload validation failure. Emitted once per + # (path, ui_sha) triple when repeated SHA-mismatch or chunked-upload + # failures cross the give-up threshold. CS-side suppress is the + # load-bearing defense: subsequent 1010s for the same triple are + # silent-dropped on CS regardless of UI behavior. UI MAY honor this + # code (suggested: backup & stop trying for that triple); current UI + # ignores unknown codes and keeps retrying — CS-side suppress still + # closes the storm. + VALIDATION_GIVEUP = 4 + + +# Lane classification for network requests. +# +# The state lane (the existing single-slot ``NetworkRequestHandler``, +# ``max_size=1``) serves registration-mode traffic, ``SET_DEVICE_STATE`` +# transitions, and ``SEND_TREATMENT_REPORT`` uploads. Strict ordering is +# preserved for every request type in that lane. +# +# The idempotent lane (``IdempotentRequestHandler``, ``max_size=16``) +# absorbs long-running, filename-keyed uploads that would otherwise +# starve the state lane during 10–30 minute transfers: +# +# - ``CS2DCS_REQ_SEND_DEVICE_LOG`` (up to ~350 MB) +# - ``CS2DCS_REQ_SEND_CS_LOG`` (up to ~300 MB on debug days) +# +# Both lanes run a single worker each: ``max_size`` is queue depth, not +# parallelism. Parallel upload of two filename-keyed logs serialises +# through the idempotent worker while the state lane continues serving +# state transitions without delay. +# +# Absence from this map → state lane (fail-safe). Any new request type +# added in the future therefore defaults to ordered delivery until an +# explicit decision to move it has been reviewed and tested. 
# Request types routed to the idempotent lane. Every type that is NOT a key
# here is treated as belonging to the "state" lane by lane_for_request_type.
REQUEST_TYPE_LANE = dict.fromkeys(
    (
        NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG,
        NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG,
    ),
    "idempotent",
)


def lane_for_request_type(request_type) -> str:
    """Look up the lane name for *request_type*.

    :param request_type: key to look up in ``REQUEST_TYPE_LANE``
    :return: the mapped lane name, or ``'state'`` when the type has no entry
    """
    try:
        return REQUEST_TYPE_LANE[request_type]
    except KeyError:
        # Unlisted types fall back to the state lane.
        return "state"
Uses a +sliding window over the monotonic clock so wall-clock jumps neither +spuriously escalate nor suppress escalation. + +Thresholds are in-memory only — on process restart, the counter resets. +If the underlying failure persists, escalation re-fires within ~N +seconds after boot, which is the desired operator signal (the issue +survives reboot). + +Wiring: ``NetworkRequestHandler._refresh_via_cert`` calls +:meth:`PersistentAuthFailureTracker.record_failure` after every +``cmd_outgoing_get_new_token_with_cert`` call that returns ``None``, then +checks :meth:`PersistentAuthFailureTracker.should_escalate` to decide +whether to enqueue the 934 error. +""" +from collections import deque +from threading import Lock +from time import monotonic + + +# Default thresholds: ~1 Hz of silent failures (observed under a +# Keycloak brute-force lockout) crosses 20-in-5-min easily, while normal +# intermittent-network conditions stay well below. +DEFAULT_WINDOW_S = 300 # 5 minutes sliding window +DEFAULT_THRESHOLD = 20 # N failures within the window +DEFAULT_COOLDOWN_S = 600 # 10 minutes between escalations + + +class PersistentAuthFailureTracker: + """Sliding-window counter for Keycloak token-fetch failures. + + Thread-safe via a ``Lock`` — ``record_failure`` may be called from any + handler thread, ``should_escalate`` is called from the same network + request thread(s). + + :param int window_s: Sliding window size in seconds (default 300). + :param int threshold: Failure count within the window that triggers + escalation (default 20). + :param int cooldown_s: Minimum seconds between two escalations + (default 600). Prevents spamming the operator when a persistent + failure keeps accumulating. 
+ """ + + def __init__(self, window_s=DEFAULT_WINDOW_S, + threshold=DEFAULT_THRESHOLD, + cooldown_s=DEFAULT_COOLDOWN_S): + self._window_s = window_s + self._threshold = threshold + self._cooldown_s = cooldown_s + self._failures = deque() + self._last_escalation_ts = 0.0 + self._lock = Lock() + + def record_failure(self): + """Record one token-fetch failure at the current monotonic time.""" + with self._lock: + now = monotonic() + self._failures.append(now) + self._trim_locked(now) + + def should_escalate(self): + """Return True iff threshold reached AND cool-down elapsed. + + Returns True at most once per ``cooldown_s`` window regardless of + how many calls are made. When True is returned, the internal + counter is cleared so the next escalation requires a fresh batch + of ``threshold`` failures. + """ + with self._lock: + now = monotonic() + self._trim_locked(now) + if len(self._failures) < self._threshold: + return False + if now - self._last_escalation_ts < self._cooldown_s: + return False + self._last_escalation_ts = now + self._failures.clear() + return True + + def failure_count(self): + """Current failure count within the sliding window (diagnostic).""" + with self._lock: + now = monotonic() + self._trim_locked(now) + return len(self._failures) + + def _trim_locked(self, now): + """Drop entries older than ``window_s`` before ``now``. + + Caller must hold ``self._lock``. 
+ """ + cutoff = now - self._window_s + while self._failures and self._failures[0] < cutoff: + self._failures.popleft() Index: cloudsync/handlers/cs_mft_dcs_request_handler.py =================================================================== diff -u -re08d90177eb58725bc0288b7e5efc3fd56589df7 -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision e08d90177eb58725bc0288b7e5efc3fd56589df7) +++ cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -1,9 +1,9 @@ """Handler of network requests between CloudSync, Device Registration Tool and Diality Cloud System""" -from time import time +from time import monotonic, time from logging import Logger -from threading import Event, Thread +from threading import Event, Lock, Thread from collections import deque from random import seed, randint @@ -18,7 +18,79 @@ from cloudsync.handlers.outgoing.handler_cs_to_dcs import * from cloudsync.handlers.incoming.handler_mft_to_cs import * + +# Persistent chunked-upload give-up tracker. When repeated chunked- +# upload failures (any ``accepted=False`` outcome) for the same device- +# log path cross the threshold, CS-side suppress drops subsequent +# upload attempts for that path silently for the suppress duration -- +# closes the storm regardless of whether the UI honors the new wire +# reason code. A one-shot ``2010,3,,0,`` is +# emitted at threshold cross, replacing the per-attempt rejection wire +# for that one attempt; subsequent attempts in the suppress window emit +# nothing on the wire. +# +# Sliding window uses ``monotonic()`` because the device test protocols +# can change wall-clock mid-session. +# +# Per-path granularity (no UI SHA on the wire at this layer): a chunk- +# upload failure is identified solely by the device-log path it tried +# to upload. 
# Give-up policy for repeated chunked-upload failures, keyed by path:
# THRESHOLD failures inside WINDOW_S seconds put the path into a suppress
# period lasting SUPPRESS_S seconds. All timestamps come from monotonic().
_F3_GIVEUP_THRESHOLD = 5
_F3_GIVEUP_WINDOW_S = 60.0
_F3_GIVEUP_SUPPRESS_S = 3600.0
_f3_giveup_failures: dict = {}  # path -> list[monotonic timestamps]
_f3_giveup_until: dict = {}     # path -> monotonic deadline
_f3_giveup_lock = Lock()        # guards both dicts above


def _f3_giveup_is_suppressed(path) -> bool:
    """Tell whether ``path`` is inside its give-up suppress period.

    An entry whose deadline has passed is removed as a side effect of this
    call, after which the path is reported as not suppressed.

    :param path: key identifying the upload (the device-log path)
    :return: True while a recorded deadline for ``path`` is still in the future
    """
    with _f3_giveup_lock:
        expires_at = _f3_giveup_until.get(path)
        if expires_at is None:
            return False
        if monotonic() < expires_at:
            return True
        # Deadline reached: forget the entry so the path can fail afresh.
        _f3_giveup_until.pop(path, None)
        return False


def _f3_giveup_record_failure(path) -> bool:
    """Register one chunked-upload failure for ``path``.

    Failures older than ``_F3_GIVEUP_WINDOW_S`` are discarded before the new
    one is counted. When the count reaches ``_F3_GIVEUP_THRESHOLD`` the path
    enters the suppress period and its failure history is dropped. While a
    suppress period is active, the failure is not recorded at all.

    :param path: key identifying the upload (the device-log path)
    :return: True only for the failure that crosses the threshold
    """
    stamp = monotonic()
    oldest_allowed = stamp - _F3_GIVEUP_WINDOW_S
    with _f3_giveup_lock:
        expires_at = _f3_giveup_until.get(path)
        still_suppressed = expires_at is not None and stamp < expires_at
        if still_suppressed:
            return False

        history = _f3_giveup_failures.setdefault(path, [])
        # Prune in place so the list stored in the dict is the one updated.
        history[:] = [seen for seen in history if seen >= oldest_allowed]
        history.append(stamp)
        if len(history) < _F3_GIVEUP_THRESHOLD:
            return False

        _f3_giveup_until[path] = stamp + _F3_GIVEUP_SUPPRESS_S
        _f3_giveup_failures.pop(path, None)
        return True
+ IDLE_TICK_S = 60 + def __init__(self, logger: Logger, max_size, output_channel, reachability_provider, error_handler): self.logger = logger self.logconf = LoggingConfig() @@ -27,6 +99,15 @@ self.output_channel = output_channel self.error_handler = error_handler self.queue = deque(maxlen=max_size) # Thread safe + # Semantic-progress marker updated after each request completes. + # Default 0 signals "no work yet"; the watchdog tolerates this until + # the first tick. + self.last_progress_ts = 0 + # Persistent auth-failure tracker, invoked from _refresh_via_cert + # to aggregate repeated Keycloak fetch failures into a single + # operator-visible CS_AUTH_PERSISTENT_FAIL escalation. + from cloudsync.handlers.auth_failure_tracker import PersistentAuthFailureTracker + self.auth_failure_tracker = PersistentAuthFailureTracker() self.thread = Thread(target=self.scheduler, daemon=True) self.event = Event() self.thread.start() @@ -37,13 +118,21 @@ :return: None """ while True: - flag = self.event.wait() + # Top-of-loop tick in monotonic units. An idle lane (especially + # the idempotent one after a restart) still counts as alive; + # wall-clock jumps don't trip the watchdog. + self.last_progress_ts = monotonic() + flag = self.event.wait(timeout=self.IDLE_TICK_S) if flag: while len(self.queue) > 0: req = self.queue.popleft() try: self.handle_request(req) finally: + # Mark progress after each request returns, success + # or failure. Monotonic clock so a wall-clock step + # does not poison the marker. 
+ self.last_progress_ts = monotonic() clear_correlation_id() self.event.clear() @@ -146,13 +235,10 @@ base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] - token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION) device_state_url = urllib.parse.urljoin(base_url, "/api/device") - access_token = self.get_valid_token(identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn) + access_token = self._acquire_token(identity_url, client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is not None: @@ -165,10 +251,13 @@ device_state_json = json.dumps(device_state_json) + refresher = lambda: self._refresh_via_cert( + identity_url, client_secret, _cid, _dsn) response = cmd_outgoing_set_device_state(url=device_state_url, access_token=access_token, device_state_json=device_state_json, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: @@ -219,14 +308,11 @@ base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] - token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION) validate_url = urllib.parse.urljoin(base_url, "api/device/validate") data_submission_url = urllib.parse.urljoin(base_url, "/api/device/data") - access_token = self.get_valid_token(identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn) + access_token = self._acquire_token(identity_url, client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is None: 
error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, @@ -235,6 +321,9 @@ self.error_handler.enqueue_error(error=error) return + refresher = lambda: self._refresh_via_cert( + identity_url, client_secret, _cid, _dsn) + # Step #1 - get organization id for current device response = cmd_outgoing_validate_device(access_token=access_token, @@ -245,6 +334,7 @@ sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], url=validate_url, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: @@ -279,6 +369,7 @@ response = cmd_outgoing_check_if_patient_with_emr_id_exists(access_token=access_token, url=patient_with_emr_id_exists_url, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) # Step #2b - If patient with emr_id doesn't exist, create temporary patient @@ -292,6 +383,7 @@ response = cmd_outgoing_create_temporary_patient(access_token=access_token, url=create_temporary_patient_url, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: return @@ -309,6 +401,7 @@ access_token=access_token, associate=True, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) # Step #4 - Send treatment report @@ -337,6 +430,7 @@ access_token=access_token, treatment_log=treatment_log_json, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: @@ -365,6 +459,7 @@ access_token=access_token, associate=False, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) except Exception as e: @@ -378,20 +473,19 @@ base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] - 
token_verification_url = urllib.parse.urljoin( - base_url, DEVICE_TOKEN_VALIDATION) validate_url = urllib.parse.urljoin( base_url, "api/device/validate") # Step #1 - get access token - access_token = self.get_valid_token(identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn) + access_token = self._acquire_token(identity_url, client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is not None: + refresher = lambda: self._refresh_via_cert( + identity_url, client_secret, _cid, _dsn) + # Step #2 - get organization id for current device response = cmd_outgoing_validate_device(access_token=access_token, @@ -402,6 +496,7 @@ sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], url=validate_url, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: @@ -427,33 +522,76 @@ if not os.path.exists(device_log_data['path']): self.logger.warning(f"Device log file no longer available: {device_log_data['path']}") + _fn = os.path.basename(device_log_data['path']) error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_DEVICE_LOG_ERROR.value, - f"Device log file no longer available: {device_log_data['path']}") + format_upload_error_body( + _fn, + f"[Errno 2] No such file or directory: " + f"'{device_log_data['path']}'")) self.error_handler.enqueue_error(error=error) return + # Layer-1 give-up suppress. Once a path has crossed + # the persistent-chunked-upload-failure threshold, + # subsequent upload attempts for that path are + # silently dropped for the suppress window: no + # chunked upload is attempted, no wire signal is + # emitted, no Error 930 is raised. Closes the storm + # even if the UI keeps queuing 1010s. 
+ if _f3_giveup_is_suppressed(device_log_data['path']): + self.logger.info( + "Silent-drop chunked upload for %s " + "(persistent-failure suppress active)", + device_log_data['path']) + return + device_log_json = helpers_construct_device_log_json(device_log_data) upload_result = cmd_outgoing_upload_file_in_chunks(base_url=base_url, access_token=access_token, file_json=device_log_json, error_handler=self.error_handler, log_file_origin='device', - token_refresher=lambda: self.get_valid_token( - identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn), + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if isinstance(upload_result, dict) and not upload_result.get("accepted"): - # Rejection (e.g. 409 duplicate) — send 2010 with 3 params: filename, 0, reason_code - message_body = "{0},3,{1},0,{2}".format( - OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, - upload_result["filename"], - upload_result["reason_code"]) - self.output_channel.enqueue_message(message_body) + # Persistent-failure tracker: record the chunked- + # upload failure and, on the threshold-cross + # attempt, replace the per-attempt rejection wire + # with a one-shot give-up wire and emit a + # diagnostic Error 930. Subsequent attempts for + # the same path are silent-dropped at the top of + # this branch for the suppress duration. 
+ if _f3_giveup_record_failure(device_log_data['path']): + wire_msg = "{0},3,{1},0,{2}".format( + OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, + upload_result["filename"], + LogUploadReasonCode.VALIDATION_GIVEUP.value) + self.output_channel.enqueue_message(wire_msg) + giveup_err = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + format_upload_error_body( + upload_result["filename"], + "Persistent chunked-upload give-up: " + "{0} failures within {1}s; " + "suppressing further uploads for {2}s".format( + _F3_GIVEUP_THRESHOLD, + int(_F3_GIVEUP_WINDOW_S), + int(_F3_GIVEUP_SUPPRESS_S)))) + self.error_handler.enqueue_error(error=giveup_err) + else: + # Protocol-shaped rejection covering every terminal + # failure (409 Duplicate reason=3, connection/timeout/5xx + # reason=1 Disconnected, 401/403 reason=2 Credential). + # UI-Brain's existing rejection handlers engage on each. + message_body = "{0},3,{1},0,{2}".format( + OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, + upload_result["filename"], + upload_result["reason_code"]) + self.output_channel.enqueue_message(message_body) elif isinstance(upload_result, str): # Success — send 2010 with 3 params: filename, 1 (success), 0 (no error) self.logger.debug(f"Device log file uploaded: {upload_result}") @@ -462,36 +600,42 @@ upload_result) self.output_channel.enqueue_message(message_body) else: + _fn = os.path.basename(device_log_data.get('path', '')) if isinstance(device_log_data, dict) else '' error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_DEVICE_LOG_ERROR.value, - "Missing access token") + format_upload_error_body(_fn, "Missing access token")) self.error_handler.enqueue_error(error=error) except Exception as e: + # device_log_data may not have been bound yet if exception + # fired before payload extraction; best-effort filename. 
+ try: + _fn = os.path.basename(req.payload.get('path', '')) if isinstance(req.payload, dict) else '' + except Exception: + _fn = '' error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_DEVICE_LOG_ERROR.value, - str(e)) + format_upload_error_body(_fn, str(e))) self.error_handler.enqueue_error(error=error) elif req.request_type == NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG: try: cs_log_data = req.payload base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] - token_verification_url = urllib.parse.urljoin( - base_url, DEVICE_TOKEN_VALIDATION) validate_url = urllib.parse.urljoin( base_url, "api/device/validate") # Step #1 - get access token - access_token = self.get_valid_token(identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn) + access_token = self._acquire_token(identity_url, client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is not None: + refresher = lambda: self._refresh_via_cert( + identity_url, client_secret, _cid, _dsn) + # Step #2 - get organization id for current device response = cmd_outgoing_validate_device(access_token=access_token, @@ -502,6 +646,7 @@ sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], url=validate_url, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: @@ -535,11 +680,7 @@ file_json=cs_log_json, error_handler=self.error_handler, log_file_origin='cs', - token_refresher=lambda: self.get_valid_token( - identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn), + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if isinstance(cs_log_filename, dict) and not 
cs_log_filename.get("accepted"): @@ -558,54 +699,87 @@ pass else: + _fn = os.path.basename(cs_log_data.get('path', '')) if isinstance(cs_log_data, dict) else '' error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_LOG_ERROR.value, - "Missing access token") + format_upload_error_body(_fn, "Missing access token")) self.error_handler.enqueue_error(error=error) except Exception as e: + try: + _fn = os.path.basename(req.payload.get('path', '')) if isinstance(req.payload, dict) else '' + except Exception: + _fn = '' error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_LOG_ERROR.value, - str(e)) + format_upload_error_body(_fn, str(e))) self.error_handler.enqueue_error(error=error) else: g_utils.logger.warning("Request type {0} not supported".format(req.request_type)) - def get_valid_token(self, identity_url, token_verification_url, client_secret, - correlation_id="", device_sn=""): - access_token = helpers_get_stored_token() + def _refresh_via_cert(self, identity_url, client_secret, + correlation_id="", device_sn=""): + """Shared cert-auth refresh path with persistent-failure tracking. + Fetches a fresh access token from Keycloak via device cert auth. + On failure (None return), increments the persistent-auth-failure + tracker and emits ``CS_AUTH_PERSISTENT_FAIL`` (934) when the + threshold trips (20 failures in 5 min). + + Used by ``_acquire_token`` (first-time token fetch) and by the + per-site retry loops in ``cmd_outgoing_*`` (reactive refresh on + 401/403). + + :return: Fresh access token on success, or None on cert-auth failure. 
+ """ + access_token = cmd_outgoing_get_new_token_with_cert( + path_certificate=CREDENTIALS_CERTIFICATE_X509, + path_private_key=CREDENTIALS_PRIVATE_KEY, + save=True, + url=identity_url, + client_secret=client_secret, + error_handler=self.error_handler, + correlation_id=correlation_id, + device_sn=device_sn, + ) + if access_token is None: - self.logger.info("No stored token found, requesting new token") - access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509, - path_private_key=CREDENTIALS_PRIVATE_KEY, - save=True, - url=identity_url, - client_secret=client_secret, - error_handler=self.error_handler, - correlation_id=correlation_id, - device_sn=device_sn) - else: - response = cmd_outgoing_verify_token(url=token_verification_url, - access_token=access_token, - error_handler=self.error_handler, - correlation_id=correlation_id, - device_sn=device_sn) - if response is None or response.status_code != OK: - self.logger.warning(f"Token verification failed (status={response.status_code if response else 'None'}), refreshing token") - access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509, - path_private_key=CREDENTIALS_PRIVATE_KEY, - save=True, - url=identity_url, - client_secret=client_secret, - error_handler=self.error_handler, - correlation_id=correlation_id, - device_sn=device_sn) - else: - self.logger.info("Token verification succeeded") + self.auth_failure_tracker.record_failure() + if self.auth_failure_tracker.should_escalate(): + escalation = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_AUTH_PERSISTENT_FAIL.value, + "Cloud authentication failing persistently " + "(>= 20 Keycloak fetch failures in 5 min). " + "Check identity server reachability, device certificate " + "validity, and Keycloak client/role configuration. 
" + "Recent CS_GET_NEW_TOKEN_WITH_CERT_ERROR entries carry " + "the specific Keycloak response body.") + self.error_handler.enqueue_error(error=escalation) + self.logger.error( + "Persistent auth failure escalated to UI-Brain " + "(CS_AUTH_PERSISTENT_FAIL)") return access_token + def _acquire_token(self, identity_url, client_secret, + correlation_id="", device_sn=""): + """Get an access token for a DCS request. + + Returns the stored token if present (server decides validity via + JwtBearer). If the stored token is missing/corrupt, fetches a + fresh one via cert auth through ``_refresh_via_cert`` (which + records persistent failures). May return None on persistent + cert-auth failure; callers must treat None as 'cannot proceed + with this request'. + """ + tok = helpers_get_stored_token() + if tok is not None: + return tok + self.logger.info("No stored token found, requesting new token") + return self._refresh_via_cert(identity_url, client_secret, + correlation_id, device_sn) + def wait_for_network(self, wait_time): counter = 0 while not self.reachability_provider.reachability and counter < wait_time: @@ -615,3 +789,33 @@ return False else: return True + + +class IdempotentRequestHandler(NetworkRequestHandler): + """Idempotent lane for long-running, filename-keyed uploads. + + Structurally identical to :class:`NetworkRequestHandler` — inherits + ``scheduler`` / ``enqueue_request`` / ``handle_request``. The only + operational difference is the default queue depth (16 vs 1), which + lets large log uploads absorb backlog without starving the state + lane. + + Scope (see ``REQUEST_TYPE_LANE`` in ``cloudsync.common.enums``): + - ``CS2DCS_REQ_SEND_DEVICE_LOG`` (up to ~350 MB) + - ``CS2DCS_REQ_SEND_CS_LOG`` (up to ~300 MB) + + The class exists for type discrimination (watchdog naming, tests, + log clarity). Do not instantiate for state-ordering-critical work. 
+ """ + + DEFAULT_MAX_SIZE = 16 + + def __init__(self, logger: Logger, max_size=None, output_channel=None, + reachability_provider=None, error_handler=None): + if max_size is None: + max_size = self.DEFAULT_MAX_SIZE + super().__init__(logger=logger, max_size=max_size, + output_channel=output_channel, + reachability_provider=reachability_provider, + error_handler=error_handler) + self.logger.info('Created Idempotent Request Handler (max_size=%d)', max_size) Index: cloudsync/handlers/logs_handler.py =================================================================== diff -u -re08d90177eb58725bc0288b7e5efc3fd56589df7 -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloudsync/handlers/logs_handler.py (.../logs_handler.py) (revision e08d90177eb58725bc0288b7e5efc3fd56589df7) +++ cloudsync/handlers/logs_handler.py (.../logs_handler.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -1,110 +1,142 @@ -from cloudsync.handlers.error import Error +"""Custom rotating log handler for CloudSync. + +Upload is decoupled from rotation: the actual CS-log upload is driven +by ``HeartBeatProvider`` on its 20-second tick when the idempotent +lane is idle. This class's sole responsibility is rotation. + +Clock-jump tolerant rotation. ``logging.handlers.TimedRotatingFileHandler`` +computes ``rolloverAt`` from the REAL wall-clock at boot and never +revisits that decision unless a rollover actually fires. On devices +where operators simulate day transitions by setting the system clock +to 23:59 and waiting, the stdlib ``rolloverAt`` stays anchored to the +real boot time and never matches the simulated time, so rotations are +silently skipped (confirmed against CPython 3.11). + +This handler replaces the stdlib trigger with a UTC-date-equality +check: rotation fires whenever the current UTC date differs from the +date on which the active log file was opened. Any clock jump — +forward, backward, or past multiple midnight boundaries — is honored +at the very next log emit. 
Collision safety: if the rotated-file +suffix already exists on disk (operator set clock backward then +forward again across the same date), a numeric counter is appended +so no prior rotated file is silently overwritten. +""" + +from logging.handlers import TimedRotatingFileHandler + from cloudsync.utils.helpers import * from cloudsync.utils.globals import * -from logging.handlers import TimedRotatingFileHandler -from time import time, sleep -from threading import Thread +# Re-import the datetime and time MODULES last, aliased. cloudsync.utils.helpers +# does ``from datetime import *`` (rebinding "datetime" to the +# ``datetime.datetime`` class) and ``from time import time, sleep`` +# (rebinding "time" to the ``time.time`` function). Star-imports above +# would therefore shadow our module references; aliasing keeps the +# module accessors unambiguous. +import datetime as _dt_module +import time as _time_module + class CustomTimedRotatingFileHandlerHandler(TimedRotatingFileHandler): """ - Handler for logging to a file, rotating the log file and uploading it to cloud at certain timed intervals. - It extends the official TimedRotatingFileHandler to add the upload functionality. + Handler for logging to a file, rotating the log file at timed intervals. + + The ``set_network_provider`` / ``set_error_provider`` / ``set_configuration`` + setters are retained as pass-through no-ops because ``LoggingConfig`` + still invokes them from ``cloud_sync.py``. """ - _upload_in_progress = False - def __init__(self, filename, prelogger: Logger, *args, **kwargs): super().__init__(filename, *args, **kwargs) self.network_request_handler = None self.error_handler = None self.g_config = None + # Track the UTC date of the currently-active file. Rotation + # decisions compare this against datetime.now(utc).date() on every + # emit, so any clock jump is caught immediately regardless of the + # stdlib's boot-locked rolloverAt. 
+ self._active_log_date = self._current_utc_date() prelogger.info("CUSTOM LOG HANDLER INITIATED") - def doRollover(self): - super().doRollover() - self.__select_files_to_upload() + @staticmethod + def _current_utc_date(): + return _dt_module.datetime.now(_dt_module.timezone.utc).date() - def set_network_provider(self, network_request_handler): - self.network_request_handler = network_request_handler + def shouldRollover(self, record): + """Rotate whenever the UTC date changes, ignoring self.rolloverAt + (which is a boot-time fixed value that does not adapt to + simulated clock jumps).""" + try: + return self._current_utc_date() != self._active_log_date + except Exception: + # Defensive: any failure here must not break logging itself. + # Fall back to the stdlib check so at least real midnights + # still rotate. + try: + return super().shouldRollover(record) + except Exception: + return False - def set_error_provider(self, error_handler): - self.error_handler = error_handler + def doRollover(self): + """Rotate using the current simulated time, not self.rolloverAt. - def set_configuration(self, g_config): - self.g_config = g_config - - def __select_files_to_upload(cls): + Computes the rotated-file suffix from the UTC date of the log + being rotated out (the data's original date). Guarantees no + silent overwrite of a pre-existing file with the same suffix + (collisions happen when operators set the clock backward then + forward across the same date boundary). """ - Checks for existing rotated files in the directory (excluding the newly created one) and uploads them. - Spawns a background thread to upload files sequentially, waiting for each to complete - before starting the next. This prevents queue saturation when multiple backlogged - files exist (e.g. after extended offline periods). 
- """ - if cls._upload_in_progress: - return # previous batch still draining + if self.stream: + self.stream.close() + self.stream = None - # We use the cls.baseFilename because according to code documentation: - # the filename passed in, could be a path object (see Issue #27493), - # but cls.baseFilename will be a string + current_time = int(_time_module.time()) + # The active log was accumulating since self._active_log_date. + # Use that as the suffix so the rotated filename reflects the + # data's date, not the wall-clock moment of rotation. + suffix_date = self._active_log_date + suffix_str = suffix_date.strftime(self.suffix) + dfn = self.rotation_filename(self.baseFilename + "." + suffix_str) - existing_files = [] - base_name = os.path.basename(cls.baseFilename) - for filename in os.listdir(os.path.dirname(cls.baseFilename)): - if filename.startswith(base_name) and filename != base_name: - existing_files.append(os.path.join(os.path.dirname(cls.baseFilename), filename)) + # Collision safety: preserve any pre-existing rotated file with + # the same suffix (operator clock-jumped across a date that was + # already rotated earlier). + base_dfn = dfn + counter = 1 + while os.path.exists(dfn): + dfn = "{0}.{1}".format(base_dfn, counter) + counter += 1 - if len(existing_files) > 0: - existing_files.sort(key=lambda x: os.path.getmtime(x)) # oldest first - cls._upload_in_progress = True - Thread(target=cls.__upload_files_sequentially, - args=(existing_files,), daemon=True).start() - - def __upload_files_sequentially(cls, files): - """ - Uploads CS log files one at a time, waiting for the network queue to drain - between each upload. This ensures all backlogged files are uploaded without - saturating the single-slot network queue. 
- """ try: - for f in files: - if not os.path.exists(f): - continue # deleted by log retention or previous upload - cls.__upload_cs_log_file(f) - # Wait for network queue to drain before next file - while len(cls.network_request_handler.queue) > 0: - sleep(1) - sleep(2) # yield to bus-initiated requests - finally: - cls._upload_in_progress = False + self.rotate(self.baseFilename, dfn) + except Exception: + # If rotation fails (e.g., source missing), don't leave the + # handler in a half-closed state - reopen and let the next + # emit retry. + if not self.delay: + self.stream = self._open() + raise - def __upload_cs_log_file(cls, log_file_path): + if not self.delay: + self.stream = self._open() - cs_log_data = { - "path": log_file_path, - "serialNumber": cls.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL], - "checksum": helpers_sha256_checksum(log_file_path) - } + # Update the active-date marker to the NEW current UTC date. + # The very next emit compares against this and knows not to + # re-rotate until the UTC date changes again. + self._active_log_date = self._current_utc_date() - g_utils.logger.debug(f"CS log data: {cs_log_data}") - + # Keep stdlib's internal bookkeeping consistent, though we do + # not rely on rolloverAt for rotation decisions. try: - # Generate synthetic correlation ID for CS log uploads. - # CS logs are triggered by timer (log rotation), not bus messages, - # so there is no originating message to derive a correlation from. 
- serial = cls.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] - correlation_id = f"cs-{serial}-cslog-0-{str(round(time() * 1000))}" + self.rolloverAt = self.computeRollover(current_time) + except Exception: + pass - helpers_add_to_network_queue(network_request_handler=cls.network_request_handler, - request_type=NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG, - url='', - payload=cs_log_data, - method='', - g_config=cls.g_config, - correlation_id=correlation_id, - success_message='CS2DCS_REQ_SEND_CS_LOG request added to network queue') - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_LOG_ERROR.value, - str(e)) - cls.error_handler.enqueue_error(error=error) \ No newline at end of file + def set_network_provider(self, network_request_handler): + self.network_request_handler = network_request_handler + + def set_error_provider(self, error_handler): + self.error_handler = error_handler + + def set_configuration(self, g_config): + self.g_config = g_config Index: cloudsync/handlers/outgoing/handler_cs_to_dcs.py =================================================================== diff -u -ra19403bb936dd50d786e22fd52105f586a93f252 -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision a19403bb936dd50d786e22fd52105f586a93f252) +++ cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -1,6 +1,7 @@ """Handler of commands sent by CloudSync app to the Diality Cloud System""" from typing import List, Tuple +from time import monotonic import json import requests import urllib.parse @@ -12,6 +13,59 @@ from cloudsync.handlers.error import Error +def _upload_rejection(filename: str, reason_code: int) -> dict: + """Build a terminal-failure rejection dict for an upload call. + + Returned instead of ``None`` whenever the upload cannot be completed. 
+ The caller (cs_mft_dcs_request_handler upload dispatch) translates this + into a protocol-shaped ``2010,3,<filename>,0,<reason_code>`` message + on the UI output bus so UI-Brain's existing rejection handlers + (code 1 Disconnected → backup & retry, 2 Credential → backup & no + upload, 3 Duplicate → backup & rename) engage automatically. + """ + return {"accepted": False, "filename": filename, "reason_code": reason_code} + + +def format_upload_error_body(filename: str, body: str) -> str: + """Prefix the device-log filename onto an Error 930 message body so + a single ``grep`` on a CS log can correlate every upload-pipeline + error with its file. Empty/missing filename collapses to + ``<unknown>`` so the prefix shape is stable across all callers. + + Every upload-pipeline emit site (CS_DEVICE_LOG_ERROR / + CS_LOG_ERROR) MUST go through this helper; an AST-walk audit test + in the test suite enforces this contract. + """ + fn = filename if filename else "unknown" + return f"<{fn}> {body}" + + +def _classify_http_reason(status_code: int) -> int: + """Map an HTTP status to a LogUploadReasonCode. + + 401 / 403 → CREDENTIAL (reason 2). Genuine auth failures require + operator attention; UI-Brain backs up without retry. + + Everything else (timeouts, 5xx, connection resets, DNS, TLS) → + DISCONNECTED (reason 1). Safest default; UI-Brain backs up and + retries later. + """ + if status_code in (401, 403): + return LogUploadReasonCode.CREDENTIAL.value + return LogUploadReasonCode.DISCONNECTED.value + + +def _should_retry_with_refresh(resp) -> bool: + """Retry predicate for reactive cert-auth refresh. + + Returns True iff the response is an HTTP 401 or 403 — the only + status codes that indicate a stale/invalid token. 409 (duplicate), + 5xx (server side), and network errors (resp is None) all propagate + unchanged; no token refresh helps those. 
+ """ + return resp is not None and resp.status_code in (401, 403) + + def _inject_tracing_headers(headers: dict, correlation_id: str = "", device_sn: str = "") -> dict: """Add cross-system tracing headers. Non-functional: unknown headers are ignored by DCS.""" if correlation_id: @@ -21,6 +75,69 @@ return headers +def _build_dcs_headers(access_token: str, + correlation_id: str = "", + device_sn: str = "") -> dict: + """Standard header dict for any DCS request carrying a Bearer token. + + Returns a fresh dict (safe to mutate per-call) with tracing headers + already injected. Used by every ``cmd_outgoing_*`` that hits DCS. + """ + headers = { + 'Authorization': BEARER_HOLDER.format(access_token), + 'Content-Type': CONTENT_TYPE, + 'User-Agent': USER_AGENT, + "X-Api-Version": API_VERSION, + } + _inject_tracing_headers(headers, correlation_id, device_sn) + return headers + + +def _call_with_retry(build_request: callable, + access_token: str, + token_refresher: callable, + error_handler: ErrorHandler, + error_id: int): + """Single-retry-on-401/403 wrapper shared by every DCS call. + + ``build_request(access_token)`` is a caller-supplied closure that + performs the HTTP call and returns a ``requests.Response``. The + wrapper invokes it up to twice: on the first 401/403 it calls + ``token_refresher()`` and retries with the refreshed token. 409, + 5xx, and network errors propagate unchanged. Timeouts, redirect + loops, and unexpected exceptions are translated into the + caller-provided ``error_id`` and enqueue a standard Error before + returning None. 
+ """ + resp = None + for attempt in (1, 2): + try: + resp = build_request(access_token) + except requests.exceptions.Timeout: + error_handler.enqueue_error( + error=Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + error_id, "DCS request timeout")) + return None + except requests.exceptions.TooManyRedirects: + error_handler.enqueue_error( + error=Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, error_id)) + return None + except Exception as e: + error_handler.enqueue_error( + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, error_id, str(e))) + return None + + if _should_retry_with_refresh(resp) and attempt == 1 and token_refresher is not None: + refreshed = token_refresher() + if refreshed is None: + break + access_token = refreshed + continue + break + + return resp + + def _log_dcs_trace_id(response, correlation_id: str = ""): """Log DCS Jaeger traceId from response headers on non-2xx responses.""" if response is None: @@ -70,23 +187,61 @@ g_utils.logger.debug("Making request: {0}, {1}, {2}, {3}".format(url, headers, payload, cert_paths)) + # Split connect/read timeout. The 5s default was too tight for + # the read phase under field conditions; keep connect tight so + # we fail fast on an unreachable identity server. response = requests.post(url=url, headers=headers, data=payload, cert=cert_paths, - timeout=5) + timeout=(5, 30)) - data = response.json() + # Surface Keycloak failures. A non-2xx response (or a 2xx response + # that lacks "access_token") would otherwise return None without + # enqueuing an error, producing thousands of downstream "Missing + # access token" errors with no diagnostic trail. 
+ if response.status_code != 200: + body_preview = (response.text or "")[:200] + error = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, + "Keycloak non-2xx; status={0}, body={1!r}".format( + response.status_code, body_preview)) + error_handler.enqueue_error(error=error) + return None + try: + data = response.json() + except ValueError: + body_preview = (response.text or "")[:200] + error = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, + "Keycloak 200 but non-JSON response; body={0!r}".format( + body_preview)) + error_handler.enqueue_error(error=error) + return None + g_utils.logger.debug("Keycloak response: {0}".format(data)) + token = data.get("access_token", None) + if token is None: + body_preview = (response.text or "")[:200] + error = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, + "Keycloak 200 but no access_token key; body={0!r}".format( + body_preview)) + error_handler.enqueue_error(error=error) + return None + if save: if not os.path.exists(TOKEN_CACHING_PATH): os.makedirs(TOKEN_CACHING_PATH) with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'w') as f: f.write(json.dumps(data, indent=4)) - return data.get("access_token", None) + return token except requests.exceptions.Timeout: error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, @@ -107,114 +262,50 @@ @log_func -def cmd_outgoing_verify_token(url: str, - access_token: str, - error_handler: ErrorHandler, - correlation_id: str = "", - device_sn: str = "") -> requests.Response: - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - "X-OrganizationId": '1', - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) - - data = { - "hdSerialNumber": "token-validation", 
- "dgSerialNumber": "token-validation", - "softwareVersion": "token-validation" - } - - resp = requests.post(url=url, - data=json.dumps(data), - headers=headers, - timeout=(30, 60)) - g_utils.logger.info(f"Token verification response: {resp.status_code}") - return resp - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VERIFY_TOKEN_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VERIFY_TOKEN_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VERIFY_TOKEN_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None - - -@log_func def cmd_outgoing_validate_device(access_token: str, hd_serial_number: str, dg_serial_number: str, sw_version: str, url: str, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", device_sn: str = "") -> dict: """ Step 8. Validate device Step 9. 
Validate device response (list of invalid fields) :return: The json response """ - try: - payload = json.dumps({ - "hdSerialNumber": hd_serial_number, - "dgSerialNumber": dg_serial_number, - "softwareVersion": sw_version - }) + payload = json.dumps({ + "hdSerialNumber": hd_serial_number, + "dgSerialNumber": dg_serial_number, + "softwareVersion": sw_version + }) - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) + def _do_call(token): + return requests.post(url=url, + headers=_build_dcs_headers(token, correlation_id, device_sn), + data=payload, + timeout=(30, 60)) - response = requests.post(url=url, - headers=headers, - data=payload, - timeout=(30, 60)) - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value) - error_handler.enqueue_error(error=error) + response = _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value) + if response is None: return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None try: return response.json() except json.decoder.JSONDecodeError as e: - error = Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, - response.status_code, response.reason, str(e)) - error_handler.enqueue_error(error=error) + error_handler.enqueue_error( + error=Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, + 
ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, + response.status_code, response.reason, str(e))) return None except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) + error_handler.enqueue_error( + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, str(e))) return None @@ -225,6 +316,7 @@ access_token: str, device_state_json: dict, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", device_sn: str = "") -> requests.Response: """ @@ -233,142 +325,70 @@ :param device_state_json: device state payload :param access_token: access token :param error_handler: global error handler + :param token_refresher: Optional callable returning a fresh access + token (or None on failure) when a 401/403 is received. Set to + None by legacy callers; bounded to a single retry per call. :return: The response """ - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) - payload = device_state_json - resp = requests.put(url=url, - headers=headers, - data=payload, + def _do_call(token): + return requests.put(url=url, + headers=_build_dcs_headers(token, correlation_id, device_sn), + data=device_state_json, timeout=(30, 60)) - return resp - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = 
Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None + return _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value) + @log_func def cmd_outgoing_check_if_patient_with_emr_id_exists(access_token: str, url: str, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", device_sn: str = "") -> requests.Response: - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) + def _do_call(token): + return requests.get(url=url, + headers=_build_dcs_headers(token, correlation_id, device_sn), + timeout=(30, 60)) - response = requests.get(url=url, - headers=headers, - timeout=(30, 60)) - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None + return _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value) - try: - return response - except json.decoder.JSONDecodeError as e: - error = Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, - 
response.status_code, response.reason, str(e)) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None - @log_func def cmd_outgoing_create_temporary_patient(access_token: str, url: str, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", device_sn: str = ""): - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) + def _do_call(token): + return requests.post(url=url, + headers=_build_dcs_headers(token, correlation_id, device_sn), + data={}, + timeout=(30, 60)) - payload = {} - - response = requests.post(url=url, - headers=headers, - data=payload, - timeout=(30, 60)) - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value) - error_handler.enqueue_error(error=error) + response = _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value) + if response is None: return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None try: return response.json() except json.decoder.JSONDecodeError as e: - error = Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, - 
response.status_code, response.reason, str(e)) - error_handler.enqueue_error(error=error) + error_handler.enqueue_error( + error=Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, + response.status_code, response.reason, str(e))) return None except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) + error_handler.enqueue_error( + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, str(e))) return None @@ -377,6 +397,7 @@ access_token: str, associate: bool, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", device_sn: str = "") -> requests.Response: """ @@ -385,58 +406,34 @@ :param access_token: access token :param associate: status of the device & patient association :param error_handler: global error handler + :param token_refresher: Optional callable returning a fresh access + token (or None) on 401/403; wraps the whole HEAD+PUT or DELETE + branch so a single refresh re-runs the full sub-request chain. 
:return: The response """ - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) - payload = {} - + def _do_call(token): + headers = _build_dcs_headers(token, correlation_id, device_sn) if associate: - resp = requests.head(url=urllib.parse.urljoin(url, "/exists"), - headers=headers, - data=payload, + r = requests.head(url=urllib.parse.urljoin(url, "/exists"), + headers=headers, data={}, timeout=(30, 60)) + if r.status_code == NOT_FOUND: + r = requests.put(url=url, headers=headers, data={}, timeout=(30, 60)) + return r + return requests.delete(url=url, headers=headers, data={}, + timeout=(30, 60)) - if resp.status_code == NOT_FOUND: - resp = requests.put(url=url, - headers=headers, - data=payload, - timeout=(30, 60)) - else: - resp = requests.delete(url=url, - headers=headers, - data=payload, - timeout=(30, 60)) - return resp - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None + return _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value) @log_func def cmd_outgoing_send_treatment_report(url: str, access_token: str, treatment_log: str, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", 
device_sn: str = "") -> requests.Response: """ @@ -445,41 +442,20 @@ :param access_token: access token :param treatment_log: treatment report sent to DCS :param error_handler: global error handler + :param token_refresher: Optional callable returning a fresh access + token (or None) on 401/403; bounded to a single retry. :return: The response """ - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) - - payload = treatment_log - - resp = requests.post(url=url, - headers=headers, - data=payload, + def _do_call(token): + return requests.post(url=url, + headers=_build_dcs_headers(token, correlation_id, device_sn), + data=treatment_log, timeout=(30, 60)) - return resp - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None + return _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value) + @log_func def cmd_outgoing_check_data_log_exists(base_url: str, access_token: str, @@ -545,57 +521,117 @@ origins = ("cs", "device") if log_file_origin not in origins: g_utils.logger.error(f"Wrong log file origin provided.") + # Programmer-error path; no filename available — safe to return None. 
return None ERROR_ID = ErrorIDs.CS_DEVICE_LOG_ERROR.value if log_file_origin == 'device' else ErrorIDs.CS_LOG_ERROR.value + # Extract filename early so every rejection path can surface it to + # UI-Brain via the protocol-shaped 2010 message. + try: + _filename = str(file_json['start_session']['metadata']['deviceFileName']) + except (KeyError, TypeError): + _filename = "" + + # Single structured upload-outcome line at every function exit so an + # operator can grep one CS-log line per upload (success or failure) + # instead of correlating across many INFO/ERROR lines. State dict is + # populated incrementally as the upload progresses; the closures + # below capture it by reference and emit on every return path. + _summary = { + "file_size": None, + "num_chunks": 0, + "chunks_succeeded": 0, + "start_time": monotonic(), + "session_id": None, + "sha256": "", + } + + def _emit_failed(reason: str) -> None: + duration_s = int(monotonic() - _summary["start_time"]) + g_utils.logger.error( + f"upload_failed file={_filename} size={_summary['file_size']} " + f"chunks={_summary['chunks_succeeded']}/{_summary['num_chunks']} " + f"duration_s={duration_s} session={_summary['session_id'] or 'none'} " + f"reason='{reason}'" + ) + + def _emit_complete() -> None: + duration_s = int(monotonic() - _summary["start_time"]) + g_utils.logger.info( + f"upload_complete file={_filename} size={_summary['file_size']} " + f"chunks={_summary['num_chunks']}/{_summary['num_chunks']} " + f"duration_s={duration_s} session={_summary['session_id']} " + f"sha256={_summary['sha256']}" + ) + # # Start upload session # + # Reactive retry on 401/403 (server authority): DCS's JwtBearer + # middleware decides when a refresh is needed; the client only + # refreshes in response to an explicit auth-class status code. 
start_session_url = base_url.rstrip("/") + "/api/device/data/start-session" - start_session_payload = file_json['start_session'] - start_session_payload = json.dumps(start_session_payload) - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) + start_session_payload = json.dumps(file_json['start_session']) g_utils.logger.info(f"Starting upload session for {log_file_origin} log") g_utils.logger.debug(f"File upload payload (start-session): {start_session_payload}") - try: - response = requests.post( - url=start_session_url, - headers=headers, - data=start_session_payload, - timeout=(30, 60)) + # Inline retry (not _call_with_retry): the chunk loop and end-session + # below both use the post-retry access_token, so the wrapper needs to + # rebind the local. Helper returns only the response. + response = None + for attempt in (1, 2): + try: + response = requests.post( + url=start_session_url, + headers=_build_dcs_headers(access_token, correlation_id, device_sn), + data=start_session_payload, + timeout=(30, 60)) + g_utils.logger.info(f"Start-session response: {response.status_code}") + except Exception as e: + error_handler.enqueue_error( + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body(_filename, f"Start-session failed: {e}"))) + _emit_failed(f"start-session connection error: {e}") + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) - g_utils.logger.info(f"Start-session response: {response.status_code}") + if _should_retry_with_refresh(response) and attempt == 1 and token_refresher is not None: + refreshed = token_refresher() + if refreshed is None: + break + access_token = refreshed + continue + break - if response.status_code == CONFLICT: - device_file_name = str(file_json['start_session']['metadata']['deviceFileName']) - 
g_utils.logger.info(f"File {device_file_name} rejected as duplicate at start-session (409 Conflict).") - return {"accepted": False, "filename": device_file_name, "reason_code": LogUploadReasonCode.DUPLICATE.value} + if response.status_code == CONFLICT: + g_utils.logger.info(f"File {_filename} rejected as duplicate at start-session (409 Conflict).") + _emit_failed("start-session 409 duplicate") + return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value) - if response.status_code != 200: - _log_dcs_trace_id(response, correlation_id) - g_utils.logger.error(f"Start-session failed: {response.status_code} - {response.text[:500]}") - raise Exception(f"Error while starting upload session: {response.status_code} - {response.text}") - - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) + if response.status_code != 200: + _log_dcs_trace_id(response, correlation_id) + g_utils.logger.error(f"Start-session failed: {response.status_code} - {response.text[:500]}") + # Map auth-class status codes to CREDENTIAL; everything + # else terminal → DISCONNECTED. 
+ reason = _classify_http_reason(response.status_code) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body( + _filename, + f"Start-session failed: {response.status_code} - {response.text[:500]}")) error_handler.enqueue_error(error=error) - return None + _emit_failed(f"start-session {response.status_code}") + return _upload_rejection(_filename, reason) session_id = response.json().get("sessionId") if not session_id: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, "Missing session ID in response.") + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body(_filename, "Start-session response missing sessionId")) error_handler.enqueue_error(error=error) - return None + _emit_failed("missing sessionId") + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) + _summary["session_id"] = session_id # # Send file in chunks @@ -605,138 +641,226 @@ target_file = file_json['general']["file_path"] file_size = file_json['general']["file_size"] upload_chunk_url = base_url.rstrip("/") + "/api/device/data/chunk" - upload_chunk_payload = file_json['upload_chunk'] - upload_chunk_payload['sessionId'] = session_id - upload_chunk_payload['chunkType'] = "device-data" - chunk_number = 1 - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) + upload_chunk_payload_template = file_json['upload_chunk'] + upload_chunk_payload_template['sessionId'] = session_id + upload_chunk_payload_template['chunkType'] = "device-data" + # chunkNo is derived from a 1-indexed loop counter (chunk_idx), + # NOT from a state variable that tracks success. 
Defensive + # against any future refactor that routes "advance to next + # chunk after failure" through this code, which would silently + # send the same chunkNo with different bytes. + chunk_idx = 0 + headers = _build_dcs_headers(access_token, correlation_id, device_sn) - with open(target_file, "rb") as f: - file_content = f.read() + # Stream the file in raw chunks aligned to a 3-byte boundary and + # base64-encode each chunk independently. Peak RAM per upload: + # one chunk (~2 MB) instead of 7/3 × file_size (a non-streaming + # path keeps both the full file bytes and the full base64 string + # resident, which on ~300 MB CS logs pushed 2 GB devices into + # OOM-kill). Raw-chunk size MUST be a multiple of 3 so each + # chunk's base64 output concatenates to the same string as + # base64(whole file) — preserving server-side reassembly + # semantics unchanged. + raw_chunk_size = (chunk_size // 4) * 3 + if raw_chunk_size < 3: + raw_chunk_size = 3 # degenerate safeguard + # total_base64_size and num_chunks are derived arithmetically + # from file_size so we can report progress ("chunk K of N") + # without requiring a pre-scan.
+ if file_size is None: + file_size = os.path.getsize(target_file) + total_base64_size = 4 * ((file_size + 2) // 3) + num_chunks = total_base64_size // chunk_size + (total_base64_size % chunk_size > 0) + if num_chunks == 0: + num_chunks = 1 # empty/tiny file still goes through once + _summary["file_size"] = file_size + _summary["num_chunks"] = num_chunks - # Encode the bytes using base64 - base64_string = base64.b64encode(file_content).decode("utf8") - - # Get the total size of the base64 string (in bytes) - total_size = len(base64_string) - - # Calculate the number of chunks - num_chunks = total_size // chunk_size + (total_size % chunk_size > 0) - all_chunks_ok = True - for i in range(num_chunks): - start_index = i * chunk_size - end_index = min(start_index + chunk_size, total_size) - chunk = base64_string[start_index:end_index] + with open(target_file, "rb") as f: + while True: + raw_block = f.read(raw_chunk_size) + if not raw_block: + break + chunk_idx += 1 + chunk = base64.b64encode(raw_block).decode("utf8") - # Retry logic with counter and backoff time - chunk_uploaded = False - retry_count = 0 - while retry_count < retries: - try: - if type(upload_chunk_payload) is str: - upload_chunk_payload = json.loads(upload_chunk_payload) + # Retry logic with counter and backoff time. + # auth_refresh_attempted is per-chunk: token refresh fires at + # most once per chunk across all retries. A still-stale token + # after refresh falls through to regular retry/abort instead + # of looping refresh forever. + chunk_uploaded = False + retry_count = 0 + auth_refresh_attempted = False + while retry_count < retries: + try: + # Fresh dict per attempt so we don't carry the + # prior chunk's state forward across retries. 
+ upload_chunk_payload = dict(upload_chunk_payload_template) + upload_chunk_payload['chunkNo'] = chunk_idx + upload_chunk_payload['data'] = chunk + upload_chunk_body = json.dumps(upload_chunk_payload) - upload_chunk_payload['chunkNo'] = chunk_number - upload_chunk_payload['data'] = chunk - upload_chunk_payload = json.dumps(upload_chunk_payload) + g_utils.logger.debug( + f"File upload payload (upload-chunk) - chunk No {chunk_idx}: " + f"bytes={len(upload_chunk_body)}") - g_utils.logger.debug(f"File upload payload (upload-chunk) - chunk No {chunk_number}: {upload_chunk_payload}") + response = requests.post(upload_chunk_url, + headers=headers, + data=upload_chunk_body, + timeout=(30, 60)) - response = requests.post(upload_chunk_url, - headers=headers, - data=upload_chunk_payload, - timeout=(30, 60)) + if response.status_code == 200: + g_utils.logger.info(f"Uploaded chunk {chunk_idx} of {num_chunks}") + chunk_uploaded = True + _summary["chunks_succeeded"] += 1 + break # Successful upload, break retry loop - if response.status_code == 200: - g_utils.logger.info(f"Uploaded chunk {chunk_number} of {num_chunks}") - chunk_number += 1 - chunk_uploaded = True - break # Successful upload, break retry loop + # Reactive auth-class refresh. On 401/403 ask + # token_refresher for a fresh token, rebind the + # Authorization header, and retry this chunk + # WITHOUT consuming a backoff slot. Token validity + # is server-authoritative; never decode + # access_token.exp or compare TTL to a wall-clock + # value -- device test protocols change the system + # clock to simulate midnight cycles, which corrupts + # any wall-clock-based expiry check. 
+ if (_should_retry_with_refresh(response) + and not auth_refresh_attempted + and token_refresher is not None): + auth_refresh_attempted = True + refreshed = token_refresher() + if refreshed is not None: + access_token = refreshed + headers = _build_dcs_headers( + access_token, correlation_id, device_sn) + g_utils.logger.info( + f"Chunk {chunk_idx}/{num_chunks} got " + f"{response.status_code}; refreshed token " + f"and retrying immediately") + continue # auth correction, not a backoff retry - g_utils.logger.warning(f"Chunk {chunk_number}/{num_chunks} upload failed: {response.status_code} - {response.text[:500]}") - retry_count += 1 - if retry_count < retries: - g_utils.logger.info(f"Retrying chunk upload in 5 seconds (attempt {retry_count}/{retries})...") - sleep(5) + g_utils.logger.warning( + f"Chunk {chunk_idx}/{num_chunks} got " + f"{response.status_code} but token refresh " + f"failed; falling through to retry/abort") + error_handler.enqueue_error( + error=Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body( + _filename, + f"Token refresh failed during chunk " + f"{chunk_idx}/{num_chunks} upload"))) - except Exception as e: - retry_count += 1 - g_utils.logger.error(f"Chunk {chunk_number}/{num_chunks} exception (attempt {retry_count}/{retries}): {e}") - if retry_count < retries: - sleep(5) + g_utils.logger.warning( + f"Chunk {chunk_idx}/{num_chunks} upload failed: " + f"{response.status_code} - {response.text[:500]}") + retry_count += 1 + if retry_count < retries: + g_utils.logger.info( + f"Retrying chunk upload in 5 seconds " + f"(attempt {retry_count}/{retries})...") + sleep(5) - if not chunk_uploaded: - g_utils.logger.error(f"Chunk {chunk_number} failed after {retries} retries, aborting upload") - all_chunks_ok = False - break + except Exception as e: + retry_count += 1 + g_utils.logger.error( + f"Chunk {chunk_idx}/{num_chunks} exception " + f"(attempt {retry_count}/{retries}): {e}") + if retry_count < retries: + 
sleep(5) + if not chunk_uploaded: + g_utils.logger.error( + f"Chunk {chunk_idx} failed after {retries} retries, aborting upload") + all_chunks_ok = False + break + if not all_chunks_ok: + chunks_done = chunk_idx - 1 if chunk_idx > 0 else 0 error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, - f"Upload aborted: chunk {chunk_number} failed after {retries} retries") + format_upload_error_body( + _filename, + f"Chunked upload incomplete: {chunks_done}/{num_chunks} " + f"chunks succeeded; failed at chunk {chunk_idx} " + f"after {retries} retries")) error_handler.enqueue_error(error=error) - return None + _emit_failed(f"chunk {chunk_idx} retries exhausted") + # Chunk retries exhausted → Disconnected + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body( + _filename, f"Unhandled exception in chunk loop: {e}")) error_handler.enqueue_error(error=error) - return None + _emit_failed(f"chunk-loop exception: {e}") + # Unhandled exception in chunk loop → Disconnected (safest default) + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) # # End upload session # + # Reactive retry on 401/403 (server authority): refresh only when + # the server explicitly rejects, not on every end-session call. 
- # Refresh token before end-session to prevent expiry after long chunk uploads - if token_refresher is not None: - g_utils.logger.info("Refreshing token before end-session phase") - refreshed_token = token_refresher() - if refreshed_token is not None: - access_token = refreshed_token - g_utils.logger.info("Token refreshed successfully before end-session") - else: - g_utils.logger.warning("Token refresh returned None, using existing token for end-session") - end_session_url = base_url.rstrip("/") + "/api/device/data/end-session" end_session_payload = file_json['end_session'] end_session_payload['sessionId'] = session_id end_session_payload['completedAt'] = int(datetime.now(timezone.utc).timestamp()*1000) - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) - try: - end_session_payload = json.dumps(end_session_payload) - g_utils.logger.info(f"Ending upload session (sessionId={session_id})") - g_utils.logger.debug(f"Device log upload payload (end-session): {end_session_payload}") - response = requests.post(end_session_url, - headers=headers, - data=end_session_payload, - timeout=(30, 60)) + end_session_payload = json.dumps(end_session_payload) + g_utils.logger.info(f"Ending upload session (sessionId={session_id})") + g_utils.logger.debug(f"Device log upload payload (end-session): {end_session_payload}") - if response.status_code == CONFLICT: - device_file_name = str(file_json['start_session']['metadata']['deviceFileName']) - g_utils.logger.info(f"File {device_file_name} rejected as duplicate (409 Conflict).") - return {"accepted": False, "filename": device_file_name, "reason_code": LogUploadReasonCode.DUPLICATE.value} + response = None + for attempt in (1, 2): + try: + response = requests.post( + end_session_url, + headers=_build_dcs_headers(access_token, correlation_id, device_sn), + 
data=end_session_payload, + timeout=(30, 60)) + g_utils.logger.info(f"End-session response: {response.status_code}") + except Exception as e: + error_handler.enqueue_error( + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body(_filename, f"End-session failed: {e}"))) + _emit_failed(f"end-session connection error: {e}") + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) - g_utils.logger.info(f"End-session response: {response.status_code}") - if response.status_code != 200: - _log_dcs_trace_id(response, correlation_id) - g_utils.logger.error(f"End-session failed: {response.status_code} - {response.text[:500]}") - raise Exception(f"Error while ending upload session: {response.status_code} - {response.text}") + if _should_retry_with_refresh(response) and attempt == 1 and token_refresher is not None: + refreshed = token_refresher() + if refreshed is None: + break + access_token = refreshed + continue + break - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) + if response.status_code == CONFLICT: + g_utils.logger.info(f"File {_filename} rejected as duplicate (409 Conflict).") + _emit_failed("end-session 409 duplicate") + return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value) + + if response.status_code != 200: + _log_dcs_trace_id(response, correlation_id) + g_utils.logger.error(f"End-session failed: {response.status_code} - {response.text[:500]}") + reason = _classify_http_reason(response.status_code) + # Include response body (truncated) so the operator can tell a + # 500-from-DCS-bug apart from a 500-from-bad-payload from a + # single Error 930 line. 
+ error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body( + _filename, + f"End-session failed: {response.status_code} - {response.text[:500]}")) error_handler.enqueue_error(error=error) - return None + _emit_failed(f"end-session {response.status_code}") + return _upload_rejection(_filename, reason) g_utils.logger.info(f"File {file_json['start_session']['metadata']['deviceFileName']} uploaded.") + try: + _summary["sha256"] = str(file_json['end_session'].get('checksum', ''))[:16] + except (AttributeError, TypeError): + _summary["sha256"] = "" + _emit_complete() return str(file_json['start_session']['metadata']['deviceFileName']) Index: cloudsync/handlers/ui_cs_request_handler.py =================================================================== diff -u -ra19403bb936dd50d786e22fd52105f586a93f252 -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision a19403bb936dd50d786e22fd52105f586a93f252) +++ cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -1,10 +1,11 @@ from logging import Logger -from threading import Event, Thread +from threading import Event, Lock, Thread from collections import deque from cloudsync.handlers.uics_message import UICSMessage from cloudsync.handlers.error_handler import ErrorHandler from cloudsync.handlers.error import Error +from cloudsync.handlers.outgoing.handler_cs_to_dcs import format_upload_error_body from cloudsync.common.enums import * from cloudsync.utils.globals import * from cloudsync.utils.helpers import * @@ -14,37 +15,136 @@ import os import shutil import time +# Explicit import: cloudsync.utils.helpers does ``from time import time``, +# which would shadow the ``time`` module reference for any subsequent +# ``time.`` access via the star-import above. Importing the +# specific binding by name avoids the shadow. 
+from time import monotonic +# In-flight 1010 dedup. Set holds device-log paths currently being +# processed inside the UI2CS_UPLOAD_DEVICE_LOG branch (SHA check + +# enqueue). Parallel 1010s for the same path arriving while one is in +# flight are dropped silently with an INFO log -- prevents the storm +# pattern where a UI-side queue retries every 5 s, each retry kicking +# off a redundant SHA computation on a multi-MB log file. +_in_flight_uploads: set = set() +_in_flight_lock = Lock() + + +# Persistent SHA-mismatch give-up tracker. When repeated SHA-mismatch +# failures for the same (path, ui_sha) cross the threshold, CS-side +# suppress drops subsequent 1010s for that key silently for the +# suppress duration -- closes the storm regardless of UI behavior. +# A one-shot wire `2010,3,<filename>,0,<reason_code>` is also emitted +# at threshold cross for forward-compat with future UI versions that +# honor the new reason code; current UI ignores unknown codes and keeps +# retrying, but Layer-1 suppress drops every retry so the customer- +# visible CS log stays quiet. +# +# Sliding window uses ``monotonic()`` (NOT wall-clock): the device test +# protocols change system time to simulate midnight cycles, and a +# wall-clock window would corrupt the failure counter under those +# protocols. +# +# Per-(path, ui_sha) granularity: a different SHA for the same path is +# a legitimately new upload attempt and is not suppressed. +_GIVEUP_THRESHOLD = 5 +_GIVEUP_WINDOW_S = 60.0 +_GIVEUP_SUPPRESS_S = 3600.0 +_giveup_failures: dict = {} # (path, ui_sha) -> list[monotonic timestamps] +_giveup_until: dict = {} # (path, ui_sha) -> monotonic deadline +_giveup_lock = Lock() + + +def _giveup_is_suppressed(key) -> bool: + """Return True iff ``key`` is currently in the give-up suppress + window.
Lazy-expires the entry on read when the deadline has + passed so no separate cleanup thread is needed.""" + with _giveup_lock: + deadline = _giveup_until.get(key) + if deadline is None: + return False + if monotonic() >= deadline: + _giveup_until.pop(key, None) + return False + return True + + +def _giveup_record_sha_failure(key) -> bool: + """Record a SHA-mismatch failure for ``key`` and return True iff + this failure is the one that crosses the give-up threshold. + + Lazy-prunes timestamps older than the sliding window. On + threshold cross, enters the suppress window and clears the + failure list so a fresh threshold cross can occur after the + suppress entry expires (i.e. the problem persists past 1 hour).""" + now = monotonic() + cutoff = now - _GIVEUP_WINDOW_S + with _giveup_lock: + # If already suppressed (race against a Layer-1 check), do + # not re-trigger; the caller should have silent-dropped. + deadline = _giveup_until.get(key) + if deadline is not None and now < deadline: + return False + bucket = _giveup_failures.setdefault(key, []) + bucket[:] = [t for t in bucket if t >= cutoff] + bucket.append(now) + if len(bucket) >= _GIVEUP_THRESHOLD: + _giveup_until[key] = now + _GIVEUP_SUPPRESS_S + _giveup_failures.pop(key, None) + return True + return False + + class UICSMessageHandler: def __init__(self, logger: Logger, max_size, network_request_handler, output_channel, reachability_provider, - error_handler): + error_handler, idempotent_network_request_handler=None): self.logger = logger self.reachability_provider = reachability_provider self.network_request_handler = network_request_handler + # Idempotent lane for long-running filename-keyed uploads + # (SEND_DEVICE_LOG). Back-compat default: when omitted, fall back + # to the state lane so existing tests and older wiring still function. 
+ self.idempotent_network_request_handler = ( + idempotent_network_request_handler or network_request_handler) self.output_channel = output_channel self.error_handler = error_handler self.queue = deque(maxlen=max_size) # Thread safe + # Progress marker; updated after each UI-CS message is handled. + self.last_progress_ts = 0 self.thread = Thread(target=self.scheduler, daemon=True) self.event = Event() self.thread.start() self.logger.info('Created UI_CS Handler') + # Idle-tick cadence for the scheduler. An idle message handler must + # still register as alive to the watchdog. Well below max_idle_s=120. + IDLE_TICK_S = 60 + def scheduler(self) -> None: """ Continuously monitors the event flag to check for new messages :return: None """ while True: - flag = self.event.wait() + # Top-of-loop tick in monotonic units so wall-clock jumps do + # not trip the watchdog, and an idle queue still counts as + # healthy. + self.last_progress_ts = time.monotonic() + flag = self.event.wait(timeout=self.IDLE_TICK_S) if flag: while len(self.queue) > 0: message = self.queue.popleft() try: self.handle_message(message) finally: + # Mark progress after each message is fully processed. + # Monotonic clock so a wall-clock step does not poison + # the marker. 
+ self.last_progress_ts = time.monotonic() clear_correlation_id() self.event.clear() @@ -115,15 +215,23 @@ helpers_write_config(None, CONFIG_PATH, message.g_config) helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, message.g_config) - helpers_add_to_network_queue(network_request_handler=self.network_request_handler, - request_type=NetworkRequestType.CS2MFT_REQ_REGISTRATION, - url='', - payload={}, - method='', - g_config=message.g_config, - success_message='CS2MFT_REQ_REGISTRATION request added to network ' - 'queue', - correlation_id=message.correlation_id) + # REGISTRATION is a one-time provisioning flow; on + # backpressure there is no useful retry strategy — + # surface via warning and let the manufacturing tool retry. + outcome, detail = helpers_add_to_network_queue( + network_request_handler=self.network_request_handler, + request_type=NetworkRequestType.CS2MFT_REQ_REGISTRATION, + url='', + payload={}, + method='', + g_config=message.g_config, + success_message='CS2MFT_REQ_REGISTRATION request added to network ' + 'queue', + correlation_id=message.correlation_id) + if outcome != "queued": + self.logger.warning( + "CS2MFT_REQ_REGISTRATION not queued (%s): %s", + outcome, detail) except IOError: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SAVE_CONFIG_ERROR.value, @@ -160,15 +268,27 @@ try: helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, message.g_config) - helpers_add_to_network_queue(network_request_handler=self.network_request_handler, - request_type=NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE, - url='', - payload={}, - method='', - g_config=message.g_config, - success_message='CS2DCS_REQ_SET_DEVICE_STATE request added to network ' - 'queue', - correlation_id=message.correlation_id) + # SET_DEVICE_STATE is state-machine critical (state lane). + # A dropped state transition creates the "Active Not Ready" + # cloud-state blackout — report loss to UI. 
+ outcome, detail = helpers_add_to_network_queue( + network_request_handler=self.network_request_handler, + request_type=NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE, + url='', + payload={}, + method='', + g_config=message.g_config, + success_message='CS2DCS_REQ_SET_DEVICE_STATE request added to network ' + 'queue', + correlation_id=message.correlation_id) + if outcome != "queued": + self.logger.error( + "CS2DCS_REQ_SET_DEVICE_STATE not queued (%s): %s", + outcome, detail) + err = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, + "Queue {0}: {1}".format(outcome, detail)) + self._enqueue_error(err, message.correlation_id) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, @@ -189,20 +309,32 @@ treatment_log_json = helpers_read_treatment_log_file(message.parameters[0]) if treatment_log_json: - treatment_log_json['checksum'] = helpers_sha256_checksum(json.dumps(treatment_log_json['data'])) + treatment_log_json['checksum'] = helpers_sha256_string(json.dumps(treatment_log_json['data'])) treatment_log_json['serialNumber'] = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] g_utils.logger.debug("Treatment log {0}".format(treatment_log_json)) - helpers_add_to_network_queue(network_request_handler=self.network_request_handler, - request_type=NetworkRequestType.CS2DCS_REQ_SEND_TREATMENT_REPORT, - url='', - payload=treatment_log_json, - method='', - g_config=message.g_config, - success_message='CS2DCS_REQ_SEND_TREATMENT_REPORT request added to network' - 'queue', - correlation_id=message.correlation_id) + # SEND_TREATMENT_REPORT — treatment-report loss is + # user-visible; surface a CS2UI_ERROR so UI-Brain + # can prompt a retry / mark the report as not synced. 
+ outcome, detail = helpers_add_to_network_queue( + network_request_handler=self.network_request_handler, + request_type=NetworkRequestType.CS2DCS_REQ_SEND_TREATMENT_REPORT, + url='', + payload=treatment_log_json, + method='', + g_config=message.g_config, + success_message='CS2DCS_REQ_SEND_TREATMENT_REPORT request added to network' + 'queue', + correlation_id=message.correlation_id) + if outcome != "queued": + self.logger.error( + "CS2DCS_REQ_SEND_TREATMENT_REPORT not queued (%s): %s", + outcome, detail) + err = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, + "Queue {0}: {1}".format(outcome, detail)) + self._enqueue_error(err, message.correlation_id) else: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, @@ -283,42 +415,149 @@ self.logger.info("UI2CS_UPLOAD_DEVICE_LOG request received") if (len(message.parameters) != 2) or (message.parameters[0] is None) or (message.parameters[1] is None): + # Best-effort filename: params[0] may be present even if + # the count or params[1] is invalid. + _fn = '' + if message.parameters and len(message.parameters) > 0 and message.parameters[0]: + _fn = os.path.basename(str(message.parameters[0])) error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_DEVICE_LOG_ERROR.value, - "invalid # of parameters for file upload request") + format_upload_error_body( + _fn, + "invalid # of parameters for file upload request")) self._enqueue_error(error, message.correlation_id) else: - try: + file_path = message.parameters[0] + ui_sha = message.parameters[1] + giveup_key = (file_path, ui_sha) - local_checksum = helpers_sha256_checksum(message.parameters[0]) - if local_checksum != message.parameters[1]: - raise ValueError('No valid sha256 value.') + # Layer-1 give-up suppress. 
Once a (path, ui_sha) + # has crossed the persistent-failure threshold, + # subsequent 1010s for that key are silently dropped + # for the suppress window: no SHA computation, no + # in-flight slot, no queue, no Error 930, no 2010 + # emit. Closes the storm even if the UI keeps + # retrying (current UI behavior for unknown reason + # codes). + if _giveup_is_suppressed(giveup_key): + self.logger.info( + "Silent-drop UI2CS_UPLOAD_DEVICE_LOG for %s " + "(persistent-failure suppress active)", file_path) + return - hd_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] - dg_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL] - sw_version = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION] + with _in_flight_lock: + if file_path in _in_flight_uploads: + self.logger.info( + "Ignoring duplicate UI2CS_UPLOAD_DEVICE_LOG " + "for %s (already in flight)", file_path) + return + _in_flight_uploads.add(file_path) - self.logger.debug('hd: {0},dg: {1},sw: {2}'.format(hd_serial_number, dg_serial_number, sw_version)) + try: + try: + local_checksum = helpers_sha256_checksum(file_path) + if local_checksum != message.parameters[1]: + # Persistent-failure tracker: record the + # SHA-mismatch and, on the threshold-cross + # attempt, emit a one-shot wire signal + + # one diagnostic Error 930 distinct from + # the per-attempt Error 930 raised below. + # Subsequent 1010s for the same (path, + # ui_sha) are silent-dropped at the top of + # this handler for the suppress duration. 
+ if _giveup_record_sha_failure(giveup_key): + device_file_name = os.path.basename(file_path) + wire_msg = "{0},3,{1},0,{2}".format( + OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, + device_file_name, + LogUploadReasonCode.VALIDATION_GIVEUP.value) + self.output_channel.enqueue_message(wire_msg) + giveup_err = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + format_upload_error_body( + device_file_name, + "Persistent SHA-mismatch give-up: " + "{0} failures within {1}s; " + "suppressing further uploads for {2}s".format( + _GIVEUP_THRESHOLD, + int(_GIVEUP_WINDOW_S), + int(_GIVEUP_SUPPRESS_S)))) + self._enqueue_error(giveup_err, message.correlation_id) + # One log line tells the operator (a) which + # file, (b) what the UI claimed the SHA was, + # (c) what CS computed. Replaces the opaque + # 'No valid sha256 value.' that collapsed + # file-missing, SHA-mismatch, and content-drift + # into one indistinguishable error. + raise ValueError( + "SHA256 mismatch for {0}: UI={1}..., CS-computed={2}...".format( + file_path, + message.parameters[1][:16], + local_checksum[:16], + ) + ) - device_log_data = { - "path": message.parameters[0], - "serialNumber": message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL], - "checksum": local_checksum - } + hd_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] + dg_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL] + sw_version = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION] - g_utils.logger.debug("Device log data {0}".format(device_log_data)) + self.logger.debug('hd: {0},dg: {1},sw: {2}'.format(hd_serial_number, dg_serial_number, sw_version)) - helpers_add_to_network_queue(network_request_handler=self.network_request_handler, - request_type=NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG, - url='', - payload=device_log_data, - method='', - g_config=message.g_config, - success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to 
network queue', - correlation_id=message.correlation_id) + device_log_data = { + "path": file_path, + "serialNumber": message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL], + "checksum": local_checksum + } - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_DEVICE_LOG_ERROR.value, - str(e)) - self._enqueue_error(error, message.correlation_id) + g_utils.logger.debug("Device log data {0}".format(device_log_data)) + + # Route to the idempotent lane so large log uploads + # (up to ~350 MB, 10–30 min) do not starve the state + # lane. Filename is the dedup key; per-request ordering + # between logs is not required. + # SEND_DEVICE_LOG queue saturation is an upstream + # terminal failure. Emit the protocol-shaped + # 2010,3,,0,1 (Disconnected) rejection so UI-Brain's + # backup-and-retry handler engages, AND CS2UI_ERROR for + # operator visibility. Both are expected (layered). + outcome, detail = helpers_add_to_network_queue( + network_request_handler=self.idempotent_network_request_handler, + request_type=NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG, + url='', + payload=device_log_data, + method='', + g_config=message.g_config, + success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to idempotent queue', + correlation_id=message.correlation_id) + if outcome != "queued": + self.logger.error( + "CS2DCS_REQ_SEND_DEVICE_LOG not queued (%s): %s", + outcome, detail) + # Protocol-shaped rejection + device_file_name = os.path.basename( + device_log_data.get("path", "")) + rejection_body = "{0},3,{1},0,{2}".format( + OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, + device_file_name, + LogUploadReasonCode.DISCONNECTED.value) + self.output_channel.enqueue_message(rejection_body) + # CS2UI_ERROR for operator diagnostics (layered) + err = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + format_upload_error_body( + device_file_name, + "Queue {0}: {1}".format(outcome, detail))) + 
self._enqueue_error(err, message.correlation_id) + + except Exception as e: + # Best-effort filename: params[0] may be a valid path + # even if the upload failed downstream. + _fn = os.path.basename(str(file_path)) if file_path else '' + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + format_upload_error_body(_fn, str(e))) + self._enqueue_error(error, message.correlation_id) + finally: + with _in_flight_lock: + _in_flight_uploads.discard(file_path) Index: cloudsync/utils/alive.py =================================================================== diff -u --- cloudsync/utils/alive.py (revision 0) +++ cloudsync/utils/alive.py (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -0,0 +1,59 @@ +"""Scheduler-independent CS-alive file provider. + +A tiny dedicated daemon thread writes the current unix timestamp to a file +on the SD card every ``interval`` seconds. No queue, no scheduler, no output +bus — this path stays functional even when the CS output pipeline is stuck, +giving UI-Brain (and the watchdog) a liveness signal that is not +susceptible to ``network_request_handler`` or ``output_channel`` +starvation. + +The ``last_progress_ts`` attribute is updated after every successful file +write and is consumed by the liveness-progress watchdog. If the write loop +itself stalls (e.g. SD-card hang), ``last_progress_ts`` stops advancing +and the watchdog escalates to sentinel. This is self-monitoring by design. +""" + +from threading import Thread +from time import monotonic, sleep, time + +from cloudsync.utils.globals import CS_ALIVE_FILE + + +class AliveProvider: + """Writes a unix-timestamp heartbeat to ``CS_ALIVE_FILE`` on a dedicated thread.""" + + DEFAULT_INTERVAL_S = 5 + + def __init__(self, logger, path=CS_ALIVE_FILE, interval=DEFAULT_INTERVAL_S): + """Create the provider and start its background thread immediately. + + :param logger: Logger for transient write failures. 
+ :param str path: Destination file (default :data:`CS_ALIVE_FILE`). + :param int interval: Seconds between writes (default 5). + """ + self._logger = logger + self._path = path + self._interval = interval + self.last_progress_ts = 0 + self.thread = Thread(target=self._loop, daemon=True) + self.thread.start() + + def _loop(self): + """Write ``<unix-timestamp>\\n`` to :attr:`_path` on every tick. + + Tolerates ``OSError``/``IOError`` so a transient SD-card failure + cannot kill the thread. Persistent failure is caught by the + watchdog via staleness of :attr:`last_progress_ts`. + """ + while True: + try: + with open(self._path, "w") as f: + f.write("{}\n".format(int(time()))) + # Monotonic clock for the watchdog marker; wall-clock stays in + # the file payload (UI-Brain reads it as a human-readable time). + self.last_progress_ts = monotonic() + except (OSError, IOError) as exc: + if self._logger is not None: + self._logger.warning( + "AliveProvider write to %s failed: %s", self._path, exc) + sleep(self._interval) Index: cloudsync/utils/globals.py =================================================================== diff -u -r8fe9c82ff8cd968ebdca518f35c3264a25a02927 -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloudsync/utils/globals.py (.../globals.py) (revision 8fe9c82ff8cd968ebdca518f35c3264a25a02927) +++ cloudsync/utils/globals.py (.../globals.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -1,4 +1,5 @@ """Object holding all application global constants""" +import hashlib import os # PATHS @@ -61,6 +62,10 @@ CS_LOG_PATH = "/media/sd-card/cloudsync/log" CS_LOG_FILE = os.path.join(CS_LOG_PATH, "cloudsync.log") +# LIVENESS: scheduler-independent progress file consumed by UI-Brain +# (and by the watchdog as a self-monitoring progress signal).
+CS_ALIVE_FILE = "/media/sd-card/cloudsync/cs_alive" + # DEVICE TOKEN TOKEN_CACHING_PATH = "/var/configurations/CloudSync/jwt/" DEVICE_KEBORMED_ACCESS_TOKEN_PATH = os.path.join(TOKEN_CACHING_PATH, "access_token.json") @@ -167,3 +172,25 @@ # TIME CONSTANTS S_MS_CONVERSION_FACTOR = 1000 + +# Fleet-stagger jitter. Per-device deterministic jitter in +# [0, JITTER_MAX_SECONDS) derived from the HD serial. Applied to +# time-scheduled CS->DCS events to break the synchronous fleet-wide +# cliff at 00:00 UTC (rotation, pre-warm, first reachability probe). +# 2 minutes is enough to flatten the DCS ingress spike without +# affecting any timing-sensitive flow at this scale. +JITTER_MAX_SECONDS = 120 + + +def get_device_jitter_seconds(serial_number): + """Deterministic per-device jitter in [0, JITTER_MAX_SECONDS). + + Same serial always produces the same value. A falsy / missing serial + (e.g. pre-registration manufacturing mode) returns 0 so the call site + never crashes; registration is not a fleet-wide concern so no stagger + is needed in that path. 
+ """ + if not serial_number: + return 0 + digest = hashlib.sha256(str(serial_number).encode("utf-8")).hexdigest() + return int(digest, 16) % JITTER_MAX_SECONDS Index: cloudsync/utils/heartbeat.py =================================================================== diff -u -r8fe9c82ff8cd968ebdca518f35c3264a25a02927 -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloudsync/utils/heartbeat.py (.../heartbeat.py) (revision 8fe9c82ff8cd968ebdca518f35c3264a25a02927) +++ cloudsync/utils/heartbeat.py (.../heartbeat.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -1,20 +1,32 @@ """Implementation of heartbeat functionality""" +import glob + from logging import Logger from threading import Thread -from time import sleep +from time import monotonic, sleep, time from cloudsync.utils.helpers import * from cloudsync.common.enums import * + class HeartBeatProvider: HEARTBEAT_FREQ = 20 send_heartbeat = False - def __init__(self, logger: Logger, network_request_handler, output_channel): + def __init__(self, logger: Logger, network_request_handler, output_channel, + idempotent_network_request_handler=None, g_config=None, + cs_log_path=None): self.logger = logger self.network_request_handler = network_request_handler self.output_channel = output_channel + # Opportunistic CS-log upload deps. When any is None the upload + # pathway is inert and the heartbeat behaves like the 0.5.4 baseline. + self._idempotent_handler = idempotent_network_request_handler + self._g_config = g_config + self._cs_log_path = cs_log_path or CS_LOG_PATH + # Progress marker; updated at the end of every heartbeat cycle. 
+ self.last_progress_ts = 0 self.thread = Thread(target=self.heartbeat, daemon=True) self.thread.start() @@ -24,9 +36,108 @@ """ while True: if self.send_heartbeat: - # requesting device state from UI - helpers_add_to_output_channel(output_channel=self.output_channel, - message_body=str(OutboundMessageIDs.CS2UI_REQ_DEVICE_STATE.value) + ',0', - success_message="CS2UI_REQ_DEVICE_STATE message added to output channel") + # Drop-oldest semantics: recent state is more useful than a + # stale queued copy, so a "full" outcome is acceptable + # (logged at debug; the next tick will try again). + outcome, detail = helpers_add_to_output_channel( + output_channel=self.output_channel, + message_body=str(OutboundMessageIDs.CS2UI_REQ_DEVICE_STATE.value) + ',0', + success_message="CS2UI_REQ_DEVICE_STATE message added to output channel") + if outcome != "queued": + self.logger.debug( + "Heartbeat CS2UI_REQ_DEVICE_STATE dropped (%s): %s", + outcome, detail) + # Opportunistic CS-log upload. Decouples midnight rotation + # from upload: one file per heartbeat cycle when the idempotent + # lane is idle. A 100-file backlog drains in ~33 min at 20 s/tick. + self._maybe_upload_pending_cs_log() + + # Tick the progress marker regardless of send_heartbeat: a + # running-but-suppressed heartbeat thread is still "making + # progress" for watchdog purposes. Monotonic clock so a + # wall-clock step does not poison the marker. + self.last_progress_ts = monotonic() sleep(self.HEARTBEAT_FREQ) + + def _maybe_upload_pending_cs_log(self): + """Enqueue the oldest rotated CS log file when conditions allow. + + Guards (ordered cheapest-first): + 1. Upload deps wired (idempotent handler + g_config present). + 2. Idempotent lane idle (empty queue). + 3. Reachability reports online. + 4. A pending file exists on disk. + + Any guard-miss is a silent no-op — the next tick retries. 
+ """ + if self._idempotent_handler is None or self._g_config is None: + return + if not self._idempotent_queue_idle(): + return + if not g_utils.reachability_provider.reachability: + return + pending = self._get_oldest_pending_cs_log() + if pending is None: + return + try: + serial = self._g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] + except Exception as exc: + self.logger.warning("CS-log upload: serial lookup failed: %s", exc) + return + try: + checksum = helpers_sha256_checksum(pending) + except Exception as exc: + self.logger.warning("CS-log upload: checksum failed for %s: %s", pending, exc) + return + cs_log_data = { + "path": pending, + "serialNumber": serial, + "checksum": checksum, + } + correlation_id = "cs-{0}-cslog-0-{1}".format(serial, str(round(time() * 1000))) + outcome, detail = helpers_add_to_network_queue( + network_request_handler=self._idempotent_handler, + request_type=NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG, + url='', + payload=cs_log_data, + method='', + g_config=self._g_config, + correlation_id=correlation_id, + success_message="Opportunistic CS log upload queued for {0}".format(pending)) + if outcome != "queued": + self.logger.debug("CS log upload deferred (%s): %s", outcome, detail) + + def _idempotent_queue_idle(self): + """True when the idempotent lane has no pending items. + + Keeps opportunistic CS-log uploads from stacking on top of user-triggered + device-log uploads (SEND_DEVICE_LOG). Returns False defensively on + any attribute error. + """ + try: + return len(self._idempotent_handler.queue) == 0 + except Exception: + return False + + def _get_oldest_pending_cs_log(self): + """Return the oldest rotated CS-log filename, or None. + + Sort by mtime rather than filename because the rotation suffix is + ``%m-%d-%Y`` (MM-DD-YYYY), which does NOT sort chronologically across + months or years alphabetically. mtime is authoritative. 
+ """ + pattern = glob.escape(self._cs_log_path).rstrip('/') + '/cloudsync.log.*' + try: + matches = glob.glob(pattern) + except Exception as exc: + self.logger.warning("CS-log upload: glob failed for %s: %s", pattern, exc) + return None + if not matches: + return None + try: + matches.sort(key=lambda p: os.path.getmtime(p)) + except Exception as exc: + self.logger.warning("CS-log upload: mtime sort failed: %s", exc) + return None + return matches[0] Index: cloudsync/utils/helpers.py =================================================================== diff -u -r7ef7642362334bb325692b852cb10387d25b9b5c -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloudsync/utils/helpers.py (.../helpers.py) (revision 7ef7642362334bb325692b852cb10387d25b9b5c) +++ cloudsync/utils/helpers.py (.../helpers.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -10,6 +10,7 @@ import uuid import subprocess import math +import threading from datetime import * from time import time, sleep @@ -182,18 +183,29 @@ def helpers_get_stored_token() -> Union[str, None]: + """Return the stored access token string, or None. + + No client-side `exp` check. DCS's JwtBearer middleware is the sole + authority on token lifetime (default 5 min ClockSkew tolerance). On + 401/403 from a DCS call, callers refresh via cert auth and retry + once. This eliminates the wall-clock dependency that causes refresh + storms on devices with operator-set or jumped system time. + + Corruption guard: a corrupt / truncated / unreadable token file MUST + return None rather than crash CS. + + :return: The access token string if present, otherwise None. 
""" - Returns the stored token - :return: The token if found, otherwise returns None - """ - data = None if not os.path.exists(DEVICE_KEBORMED_ACCESS_TOKEN_PATH): return None - with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'r') as f: - try: + try: + with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'r') as f: data = json.load(f) - except json.decoder.JSONDecodeError: - return None + except (json.JSONDecodeError, IOError, OSError, UnicodeDecodeError) as e: + if g_utils.logger is not None: + g_utils.logger.warning( + "Stored access-token file is corrupt or unreadable; forcing refresh: %s", e) + return None if data is None: return None @@ -236,14 +248,27 @@ def helpers_read_access_token(path: str) -> dict: """ - Reads the access token json file, returns it as a dict + Reads the access token json file, returns it as a dict. + + On any read failure (corrupt JSON, IO error, invalid encoding) return + an empty dict instead of raising. An unreadable token file must not + crash CS — the caller will treat {} the same as "no cached token" and + obtain a fresh one from Keycloak. + :param path: The path to the access token json file - :return: + :return: parsed dict, or {} if file missing / unreadable / corrupt """ data = {} - if os.path.exists(path): + if not os.path.exists(path): + return data + try: with open(path, 'r', encoding="utf-8-sig") as f: data = json.load(f) + except (json.JSONDecodeError, IOError, OSError, UnicodeDecodeError) as e: + if g_utils.logger is not None: + g_utils.logger.warning( + "Access-token file at %s is corrupt or unreadable: %s", path, e) + return {} return data @@ -470,78 +495,207 @@ return None +# Three-state backpressure. +# These helpers USED to busy-wait 30 s and silently swallow the failure on +# queue saturation. Now they return a typed outcome so every caller can +# decide the right remediation (CS2UI_ERROR, 2010 rejection, retry-later…). 
+# +# Outcome contract: +# ("queued", None) — successfully enqueued +# ("full", "<reason>") — queue saturated after max_wait_s +# ("failed", "<reason>") — pre-condition failed (no network, +# unexpected exception, etc.) +# +# Default max_wait_s is 5 s (was 30 s implicitly via 300 × 0.1 s). The +# handler thread is released sooner so upstream back-pressure propagates. + +_F4_MAX_WAIT_S_DEFAULT = 5 +_F4_POLL_INTERVAL_S = 0.1 + +# Rolling counter of "full" events used by the watchdog to escalate +# to sentinel when queue saturation is chronic rather than transient. +_F4_QUEUE_FULL_EVENTS = [] +_F4_QUEUE_FULL_WINDOW_S = 300 +_F4_QUEUE_FULL_LOCK = threading.Lock() + + +def _f4_record_queue_full(): + """Record a 'queue saturated' event for watchdog consumption.""" + now = time() + with _F4_QUEUE_FULL_LOCK: + _F4_QUEUE_FULL_EVENTS.append(now) + # Trim events older than the window. + cutoff = now - _F4_QUEUE_FULL_WINDOW_S + while _F4_QUEUE_FULL_EVENTS and _F4_QUEUE_FULL_EVENTS[0] < cutoff: + _F4_QUEUE_FULL_EVENTS.pop(0) + + +def helpers_queue_full_event_count(window_s=_F4_QUEUE_FULL_WINDOW_S): + """Return the number of queue-saturation events in the last *window_s* seconds. + + Consumed by the watchdog to escalate chronic saturation to sentinel + independently of any individual thread appearing dead. + """ + now = time() + cutoff = now - window_s + with _F4_QUEUE_FULL_LOCK: + while _F4_QUEUE_FULL_EVENTS and _F4_QUEUE_FULL_EVENTS[0] < cutoff: + _F4_QUEUE_FULL_EVENTS.pop(0) + return len(_F4_QUEUE_FULL_EVENTS) + + def helpers_add_to_network_queue(network_request_handler, request_type, url, payload, method, g_config, - success_message, correlation_id=""): + success_message, correlation_id="", max_wait_s=_F4_MAX_WAIT_S_DEFAULT): + """Enqueue a network request with explicit backpressure reporting.
+ + :return: ``("queued", None)`` on success, ``("full", reason)`` when + the queue is saturated for the full ``max_wait_s`` window, or + ``("failed", reason)`` when a pre-condition blocks enqueue + (no internet, unexpected exception). + + Callers MUST capture the return value and act on non-``queued`` + outcomes — surface a CS2UI_ERROR (or upstream-visible signal) on + "full"/"failed" so the caller can observe and retry. + """ + # Reachability pre-check. Retain the same cycle model (fast ramp-up + # during early boot when reachability probe hasn't fired yet) but + # surface the definitive "no internet" outcome to the caller. cycle_duration = REACHABILITY_CYCLE_PAUSE total_cycles = REACHABILITY_CYCLES - cycle = 0 - while (not g_utils.reachability_provider.reachability) and (cycle < total_cycles): sleep(cycle_duration) cycle += 1 - if g_utils.reachability_provider.reachability: + if not g_utils.reachability_provider.reachability: + reason = "Internet DOWN: Network request {0} couldn't be processed".format(request_type.name) + g_utils.logger.warning(reason) + return ("failed", reason) + + try: r = NetworkRequest(request_type=request_type, url=url, payload=payload, method=method, g_config=g_config, correlation_id=correlation_id) - request_added_to_queue = False - max_attempts = 300 - attempt = 0 - while not request_added_to_queue and attempt < max_attempts: + except Exception as exc: + reason = "NetworkRequest construction failed for {0}: {1}".format( + request_type.name, exc) + g_utils.logger.error(reason) + return ("failed", reason) + + max_attempts = max(1, int(max_wait_s / _F4_POLL_INTERVAL_S)) + attempt = 0 + request_added_to_queue = False + while not request_added_to_queue and attempt < max_attempts: + try: request_added_to_queue = network_request_handler.enqueue_request(r) - if not request_added_to_queue: - sleep(0.1) - attempt += 1 + except Exception as exc: + reason = "enqueue_request raised for {0}: {1}".format( + request_type.name, exc) + 
g_utils.logger.error(reason) + return ("failed", reason) + if not request_added_to_queue: + sleep(_F4_POLL_INTERVAL_S) + attempt += 1 - if request_added_to_queue: - g_utils.logger.info(success_message) - else: - g_utils.logger.error("Failed to enqueue network request after {0} attempts: {1}".format(max_attempts, request_type.name)) - else: - g_utils.logger.warning("Internet DOWN: Network request {0} couldn't be processed".format(request_type.name)) + if request_added_to_queue: + g_utils.logger.info(success_message) + return ("queued", None) + reason = "Network queue saturated for {0}s; request {1} not accepted".format( + max_wait_s, request_type.name) + g_utils.logger.error(reason) + _f4_record_queue_full() + return ("full", reason) -def helpers_add_to_output_channel(output_channel, message_body, success_message): - message_added_to_queue = False - max_attempts = 300 + +def helpers_add_to_output_channel(output_channel, message_body, success_message, + max_wait_s=_F4_MAX_WAIT_S_DEFAULT): + """Enqueue an outbound UI-bus message with explicit backpressure reporting. + + :return: ``("queued", None)`` on success, ``("full", reason)`` when + the bus is saturated, or ``("failed", reason)`` on exception. + + Callers MUST capture the return value. For heartbeat/liveness + messages, ``"full"`` is typically acceptable (drop-oldest semantics + — recent state is more useful than a stale queued copy). For + treatment / error messages, callers SHOULD surface the failure. 
+ """ + max_attempts = max(1, int(max_wait_s / _F4_POLL_INTERVAL_S)) attempt = 0 - while not message_added_to_queue and attempt < max_attempts: - message_added_to_queue = output_channel.enqueue_message(message_body) - if not message_added_to_queue: - sleep(0.1) + message_added = False + while not message_added and attempt < max_attempts: + try: + message_added = output_channel.enqueue_message(message_body) + except Exception as exc: + reason = "output_channel.enqueue_message raised: {0}".format(exc) + g_utils.logger.error(reason) + return ("failed", reason) + if not message_added: + sleep(_F4_POLL_INTERVAL_S) attempt += 1 - if message_added_to_queue: + if message_added: g_utils.logger.info(success_message) - else: - g_utils.logger.error("Failed to enqueue output message after {0} attempts".format(max_attempts)) + return ("queued", None) + reason = "Output channel saturated for {0}s; message not accepted".format(max_wait_s) + g_utils.logger.error(reason) + _f4_record_queue_full() + return ("full", reason) -def helpers_sha256_checksum(data: str) -> str: + +def helpers_sha256_checksum(file_path: str) -> str: """ - Returns the calculated checksum (SHA256) for input data - @param data: input data. It can be either string or file - @return:: checksum - """ + Returns the SHA256 of the file's raw bytes. - isFile = os.path.isfile(data) + Strict path-only contract. Callers that need to hash a literal + string must use ``helpers_sha256_string`` explicitly. 
- if isFile: - checksum = hashlib.sha256() - with open(data, "rb") as f: - # Read and update hash in blocks of 4K - for byte_block in iter(lambda: f.read(4096), b""): - checksum.update(byte_block) - else: - checksum = hashlib.sha256(data.encode()) + @param file_path: path to an existing regular file + @return:: hex-digest SHA256 of file bytes + @raises TypeError: if file_path is not a str + @raises FileNotFoundError: if file_path does not resolve to a file + (operationally distinguishes file-missing from SHA mismatch + in the upload pipeline) + """ + if not isinstance(file_path, str): + raise TypeError( + "helpers_sha256_checksum expects str, got {0}".format(type(file_path).__name__) + ) + if not os.path.isfile(file_path): + raise FileNotFoundError(file_path) + checksum = hashlib.sha256() + with open(file_path, "rb") as f: + # Read and update hash in blocks of 4K + for byte_block in iter(lambda: f.read(4096), b""): + checksum.update(byte_block) return checksum.hexdigest() +def helpers_sha256_string(data: str) -> str: + """ + Returns the SHA256 of a literal string (UTF-8-encoded). + + Companion to ``helpers_sha256_checksum``: that function is strict + path-only. Use this one for protocol-payload integrity (e.g. + treatment-report JSON) where the input is intentionally a string, + not a file path. 
+ + @param data: the string to hash + @return:: hex-digest SHA256 of data.encode('utf-8') + @raises TypeError: if data is not a str + """ + if not isinstance(data, str): + raise TypeError( + "helpers_sha256_string expects str, got {0}".format(type(data).__name__) + ) + return hashlib.sha256(data.encode("utf-8")).hexdigest() + + def helpers_crc8(message_list): """ Returns the calculated crc from a message list @@ -690,9 +844,13 @@ """ cs_log_json = helpers_read_cs_log_template(LOG_UPLOAD_TEMPLATE_PATH) - # Convert the file into byte array - logs_byte_array = helpers_file_to_byte_array(cs_log_data['path']) - checksum = helpers_sha256_checksum(logs_byte_array) + # Stream the checksum from disk to avoid loading the whole file + + # base64-encoding it just to produce a byte array to hash. On ~300 MB + # CS logs the previous double-buffer pattern peaked at 4/3 × file size + # and combined with the same pattern inside + # cmd_outgoing_upload_file_in_chunks drove OOM-kill events on the + # 2 GB i.MX8MMini device. + checksum = helpers_sha256_checksum(cs_log_data['path']) # Get file size file_size = helpers_get_file_size(cs_log_data['path']) @@ -753,9 +911,10 @@ """ device_log_json = helpers_read_device_log_template(LOG_UPLOAD_TEMPLATE_PATH) - # Convert the file into byte array - logs_byte_array = helpers_file_to_byte_array(device_log_data['path']) - checksum = helpers_sha256_checksum(logs_byte_array) + # Stream the checksum from disk rather than reading the whole file + + # base64-encoding it to produce a byte array. See + # helpers_construct_cs_log_json for the rationale. 
+ checksum = helpers_sha256_checksum(device_log_data['path']) # Get file size file_size = helpers_get_file_size(device_log_data['path']) Index: cloudsync/utils/reachability.py =================================================================== diff -u -r8fe9c82ff8cd968ebdca518f35c3264a25a02927 -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloudsync/utils/reachability.py (.../reachability.py) (revision 8fe9c82ff8cd968ebdca518f35c3264a25a02927) +++ cloudsync/utils/reachability.py (.../reachability.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -16,17 +16,31 @@ class ReachabilityProvider: - def __init__(self, logger: Logger, url_reachability): + def __init__(self, logger: Logger, url_reachability, initial_delay_s: int = 0): + """Initialise the reachability provider. + + :param int initial_delay_s: Fleet-stagger delay applied once + before the first probe. Zero preserves 0.5.4 behavior; a + non-zero value prevents the PROD fleet from probing the + reachability URL in lock-step at boot. + """ self.logger = logger self.url_reachability = url_reachability self.reachability = False + self._initial_delay_s = max(0, int(initial_delay_s)) self.thread = Thread(target=self.reachability_test, daemon=True) self.thread.start() def reachability_test(self): """ Continuously monitors the connection to REACHABILITY_URL """ + # Optional first-probe stagger. Gated on > 0 so existing + # tests that patch ``sleep`` to StopIteration still exercise the + # loop body (unchanged 0.5.4 behavior when the provider is + # constructed without a jitter). 
+ if self._initial_delay_s > 0: + sleep(self._initial_delay_s) while True: headers = { 'User-Agent': USER_AGENT, Index: cloudsync/utils/watchdog.py =================================================================== diff -u -ra19403bb936dd50d786e22fd52105f586a93f252 -r8fd0a56808dafcfd65878b3cac8c69a67f62fb2d --- cloudsync/utils/watchdog.py (.../watchdog.py) (revision a19403bb936dd50d786e22fd52105f586a93f252) +++ cloudsync/utils/watchdog.py (.../watchdog.py) (revision 8fd0a56808dafcfd65878b3cac8c69a67f62fb2d) @@ -3,12 +3,31 @@ Monitors registered threads via periodic is_alive() checks, attempts restart on failure (up to a configurable limit), and creates a sentinel file to request full process restart when recovery is exhausted. + +Liveness-progress monitoring (in addition to ``is_alive()`` checks). +``threading.Thread.is_alive()`` returns True for threads blocked in kernel +syscalls, ``event.wait()``, or busy-waits. Stuck-but-alive patterns are +invisible to is_alive() alone. + +The extended API accepts an optional ``progress_fn`` and ``max_idle_s`` per +registration. On each sweep the watchdog treats stale progress the same way +it treats a dead thread: increment the failure counter, attempt restart, and +escalate to sentinel after ``max_restarts``. Require 3 consecutive stale +reads before escalating (the counter resets when the thread becomes live + +fresh again) — this prevents transient-slowness false positives from +creating unnecessary restart storms. + +``register_sentinel_condition`` provides a second escalation path: an +arbitrary zero-arg predicate checked every sweep. If it returns True, the +watchdog immediately creates the sentinel (no restart attempt). Used for +chronic queue-exhaustion (queue-full event counter) where individual +threads stay alive but the pipeline is starved. 
""" import os from logging import Logger from threading import Event, Thread -from time import sleep +from time import monotonic, sleep # Default sentinel file path — cs.py monitors this to trigger process restart @@ -31,29 +50,56 @@ self._max_restarts = max_restarts self._sentinel_path = sentinel_path self._entries = {} + # Sentinel-condition registry (name → predicate) + self._sentinel_conditions = {} self._stop_event = Event() self._thread = Thread(target=self._monitor, daemon=True) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ - def register(self, name, get_thread, restart_fn): + def register(self, name, get_thread, restart_fn, + progress_fn=None, max_idle_s=None): """Register a thread for monitoring. :param str name: Human-readable name (e.g. "heartbeat"). :param callable get_thread: Zero-arg callable returning the current Thread object (e.g. ``lambda: obj.thread``). :param callable restart_fn: Zero-arg callable that creates and starts - a replacement thread. Use :func:`make_restart_fn` for the common + a replacement thread. Use :func:`make_restart_fn` for the common case of daemon threads backed by a bound method. + :param callable progress_fn: Optional zero-arg callable returning + a monotonic timestamp of the thread's last semantic progress. + If omitted, only dead-thread detection is performed. + :param float max_idle_s: Maximum seconds of staleness tolerated + before the thread counts as failed. Required when + ``progress_fn`` is provided. """ + if progress_fn is not None and max_idle_s is None: + raise ValueError( + "progress_fn requires max_idle_s for '{0}'".format(name)) self._entries[name] = { "get_thread": get_thread, "restart_fn": restart_fn, + "progress_fn": progress_fn, + "max_idle_s": max_idle_s, "failures": 0, } + def register_sentinel_condition(self, name, condition_fn): + """Register a zero-arg predicate that, when True, triggers sentinel. 
+ + Used for conditions where individual threads are alive but the + pipeline as a whole is starved — e.g. chronic queue saturation + (``helpers_queue_full_event_count`` > threshold). + + :param str name: Diagnostic label (appears in sentinel file + logs). + :param callable condition_fn: Zero-arg callable returning a bool. + True → immediate sentinel (no restart attempt). + """ + self._sentinel_conditions[name] = condition_fn + def start(self): """Start the watchdog monitoring loop.""" self._thread.start() @@ -85,20 +131,74 @@ self._stop_event.wait(self._check_interval) def _check_threads(self): - """Single sweep: check every registered thread.""" + """Single sweep: check every registered thread + sentinel condition.""" + # First evaluate sentinel conditions — they short-circuit thread + # checks. A True predicate means the pipeline is chronically + # starved and restarting individual threads will not help. + for name, condition_fn in self._sentinel_conditions.items(): + try: + if condition_fn(): + self._logger.error( + "Watchdog: sentinel condition '%s' fired — " + "escalating directly to sentinel", name) + self._create_sentinel("sentinel_condition:" + name) + return + except Exception as exc: # condition_fn misbehaved; log + skip + self._logger.error( + "Watchdog: sentinel condition '%s' raised: %s", + name, exc) + all_healthy = True for name, entry in self._entries.items(): thread = entry["get_thread"]() - if thread is not None and thread.is_alive(): + is_dead = thread is None or not thread.is_alive() + + # Check progress staleness even when the thread is alive. + is_stale = False + stale_reason = "" + if not is_dead and entry["progress_fn"] is not None: + try: + last_ts = entry["progress_fn"]() + except Exception as exc: + self._logger.error( + "Watchdog: progress_fn for '%s' raised: %s", name, exc) + last_ts = 0 + # last_ts==0 means "no progress recorded yet" — tolerate until + # the thread has had a chance to tick at least once. 
Use a + # grace equal to max_idle_s from watchdog start. + if last_ts == 0: + is_stale = False + else: + # Compare in monotonic time. A wall-clock step + # (operator `date` command or ntpd correction) must not + # trip the watchdog; only real elapsed idle time matters. + age = monotonic() - last_ts + if age > entry["max_idle_s"]: + is_stale = True + stale_reason = ("progress stale %.1fs > max_idle_s %s" + % (age, entry["max_idle_s"])) + + if not is_dead and not is_stale: + # Reset failure counter on a fully-healthy read. This is + # the "3 consecutive stale reads" rule — any healthy read + # wipes the slate clean and prevents restart-storm escalation + # on transient slowness. + if entry["failures"] > 0: + self._logger.info( + "Watchdog: thread '%s' recovered; " + "resetting failure counter (was %d)", + name, entry["failures"]) + entry["failures"] = 0 continue all_healthy = False entry["failures"] += 1 failures = entry["failures"] + reason = "dead" if is_dead else stale_reason self._logger.error( - "Watchdog: thread '%s' is dead (failure %d/%d)", - name, failures, self._max_restarts, + "Watchdog: thread '%s' is unhealthy (%s) (failure %d/%d)", + name, reason, failures, self._max_restarts, ) if failures <= self._max_restarts: