Index: cloud_sync.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -r79dadd7422855193f345beced530c6b92fc49b3a --- cloud_sync.py (.../cloud_sync.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloud_sync.py (.../cloud_sync.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -25,7 +25,7 @@ import threading -VERSION = "0.5.5" +VERSION = "0.5.6" arguments = sys.argv @@ -91,15 +91,15 @@ g_utils.logger.info(SETUP_CONSOLE_LINE) sys.exit(0) -# F7 - fleet-stagger jitter computed once at boot from the HD serial. +# Fleet-stagger jitter computed once at boot from the HD serial. # Reused by ReachabilityProvider (first-probe delay) and FileOutputBus -# (F6 pre-warm window offset). A pre-registration device with no serial +# (pre-warm window offset). A pre-registration device with no serial # gets jitter=0, which is acceptable: registration happens once per # device and is not a fleet-wide concern. _device_serial = g_config[CONFIG_DEVICE].get(CONFIG_DEVICE_HD_SERIAL) _device_jitter_s = get_device_jitter_seconds(_device_serial) g_utils.logger.info( - "F7 fleet jitter: %ds (serial=%s)", _device_jitter_s, _device_serial) + "Fleet-stagger jitter: %ds (serial=%s)", _device_jitter_s, _device_serial) try: reachability_provider = ReachabilityProvider( @@ -116,15 +116,15 @@ try: g_utils.add_reachability_provider(reachability_provider=reachability_provider) - # F6/F7 - pre-warm thread runs inside FileOutputBus; jitter staggers + # Pre-warm thread runs inside FileOutputBus; jitter staggers # the fleet across a 2-minute window starting at 23:55 UTC. output_channel = FileOutputBus(logger=app.logger, max_size=100, file_channels_path=UI2CS_FILE_CHANNELS_PATH, jitter_seconds=_device_jitter_s) error_handler = ErrorHandler(logger=app.logger, max_size=100, output_channel=output_channel) - # F9 — split-lane architecture. + # Split-lane architecture. 
# State lane (max_size=1): strict-ordering handler for registration, # SET_DEVICE_STATE and SEND_TREATMENT_REPORT. The single-slot queue is # the explicit ordering invariant — do NOT change max_size on this one. @@ -147,16 +147,16 @@ input_channel_name="inp.buf", g_config=g_config, message_handler=message_handler) - # F5 — heartbeat gets the idempotent lane + g_config so it can drain + # Heartbeat gets the idempotent lane + g_config so it can drain # rotated CS-log files one-per-tick when that lane is idle. Registration - # mode sets send_heartbeat=False below, so the F5 branch stays inert - # during registration. + # mode sets send_heartbeat=False below, so opportunistic CS-log upload + # stays inert during registration. heartbeat_provider = HeartBeatProvider(logger=app.logger, network_request_handler=network_request_handler, output_channel=output_channel, idempotent_network_request_handler=idempotent_network_request_handler, g_config=g_config) - # F9 — CS-log uploads (SEND_CS_LOG) go through the idempotent lane. + # CS-log uploads (SEND_CS_LOG) go through the idempotent lane. logconf.set_network_provider(network_request_handler=idempotent_network_request_handler) logconf.set_error_provider(error_handler=error_handler) logconf.set_configuration(g_config=g_config) @@ -177,11 +177,11 @@ # the registration-to-operation transition. watchdog = Watchdog(logger=app.logger) if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation': - # F3: scheduler-independent CS-alive file. Auto-starts on construction. - # The `last_progress_ts` attribute is consumed by F1 (below). + # Scheduler-independent liveness file. Auto-starts on construction. + # The ``last_progress_ts`` attribute is consumed by the watchdog (below). alive_provider = AliveProvider(logger=app.logger) - # F1 — progress-signal contracts (plan §4 F1). max_idle_s values - # are conservative; require 3 consecutive stale reads (default + # Per-thread progress-signal contracts. 
max_idle_s values are + # conservative; require 3 consecutive stale reads (default # max_restarts) before escalating to sentinel, which gives each # thread an effective 3 × check_interval grace on top of max_idle_s. watchdog.register("alive", lambda: alive_provider.thread, @@ -200,8 +200,8 @@ make_restart_fn(network_request_handler, "thread", network_request_handler.scheduler), progress_fn=lambda: network_request_handler.last_progress_ts, max_idle_s=300) - # F9/F1 — idempotent lane worker: large-file uploads legitimately - # take 10–30 min, so max_idle_s is 1800 s (30 min) per plan §4 F1. + # Idempotent lane worker: large-file uploads legitimately take + # 10–30 min, so max_idle_s is 1800 s (30 min). watchdog.register("idempotent_request_handler", lambda: idempotent_network_request_handler.thread, make_restart_fn(idempotent_network_request_handler, "thread", @@ -220,8 +220,8 @@ make_restart_fn(heartbeat_provider, "thread", heartbeat_provider.heartbeat), progress_fn=lambda: heartbeat_provider.last_progress_ts, max_idle_s=90) - # F1 — chronic queue-saturation escalation. If the F4 counter shows - # more than 3 "queue full" events in the last 5 min, individual + # Chronic queue-saturation escalation. If the queue-full counter + # shows more than 3 "queue full" events in the last 5 min, individual # threads are alive but the pipeline is starved — skip restarts and # go straight to sentinel for a clean-slate process restart. watchdog.register_sentinel_condition( @@ -305,8 +305,8 @@ return {"invalidAttributes": invalid_params}, BAD_REQUEST try: - # F4 caller audit: manufacturing-tool credentials flow. Surface - # queue failures to the existing CS2UI_ERROR channel so the MFT + # Manufacturing-tool credentials flow. Surface queue + # failures to the existing CS2UI_ERROR channel so the MFT # can observe and retry. 
outcome, detail = helpers_add_to_network_queue( network_request_handler=network_request_handler, Index: cloudsync/busses/file_input_bus.py =================================================================== diff -u -rb2f7afce365e3044714e0e18505fed5ddefe7764 -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) +++ cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -30,7 +30,7 @@ """ self.last_input_message_id = 0 - # F1 — progress marker; updated after each inotify event is processed. + # Progress marker; updated after each inotify event is processed. self.last_progress_ts = 0 self.logger = logger @@ -51,9 +51,9 @@ The Input Channel Handler - it parses and sends upstream all messages arriving on the File Input Bus """ for event in self.i.event_gen(yield_nones=False): - # F1 — every inotify event (including filtered-out ones) counts as + # Every inotify event (including filtered-out ones) counts as # progress; it proves the thread is reading events off the kernel. - # F13 — monotonic (clock-jump robust). + # Monotonic clock so a wall-clock step does not poison the marker. 
self.last_progress_ts = monotonic() (_, type_names, path, filename) = event # self.logger.debug("PATH=[{}] FILENAME=[{}] EVENT_TYPES={}".format(path, filename, type_names)) Index: cloudsync/busses/file_output_bus.py =================================================================== diff -u -rb2f7afce365e3044714e0e18505fed5ddefe7764 -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) +++ cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -14,16 +14,16 @@ class FileOutputBus: """File Output Bus class that receives, parses and sends downstream all outbound messages""" - # F6 — pre-warm window and cadence. The window is 5 minutes wide so - # the 0-120 s F7 jitter fits inside with a 3-minute buffer; the check + # Pre-warm window and cadence. The window is 5 minutes wide so the + # 0-120 s fleet jitter fits inside with a 3-minute buffer; the check # runs once per minute which is cheap and well under the window size. PREWARM_CHECK_INTERVAL_S = 60 PREWARM_WINDOW_START_HOUR = 23 PREWARM_WINDOW_START_MINUTE = 55 - # F13 — idle-tick cadence for the scheduler. An idle output bus must - # still register as alive to the watchdog, so wake up at least this - # often and refresh last_progress_ts. Well below max_idle_s=120. + # Idle-tick cadence for the scheduler. An idle output bus must still + # register as alive to the watchdog, so wake up at least this often + # and refresh last_progress_ts. Well below max_idle_s=120. 
IDLE_TICK_S = 60 def __init__(self, @@ -38,8 +38,8 @@ :param max_size: the maximum size of the message queue for the Output Bus :type max_size: int :param str file_channels_path: the path where the bus files are located - :param int jitter_seconds: F7 per-device jitter in [0, 120) used by - the F6 pre-warm loop to stagger the fleet across a 2-minute + :param int jitter_seconds: Per-device jitter in [0, 120) used by + the pre-warm loop to stagger the fleet across a 2-minute window inside the 5-minute pre-warm window. 0 means "fire at the window start" (default; registration mode has no serial). """ @@ -49,13 +49,13 @@ self.logger = logger self.file_channels_path = file_channels_path self.queue = deque(maxlen=max_size) - # F1 - progress marker; updated after every successful out.buf write. + # Progress marker; updated after every successful out.buf write. self.last_progress_ts = 0 self.thread = Thread(target=self.scheduler, daemon=True) self.event = Event() self.thread.start() - # F6 - pre-warm state. `_last_prewarm_date` is the UTC date on which + # Pre-warm state. `_last_prewarm_date` is the UTC date on which # we successfully touched tomorrow's file; prevents double-firing # inside the 5-minute window. Failures intentionally do NOT update # it so the loop retries next tick. @@ -70,12 +70,11 @@ :return: None """ while True: - # F13 — tick progress at the top of every loop iteration. This - # makes an idle output bus visible to the watchdog (previously, + # Tick progress at the top of every loop iteration. This makes + # an idle output bus visible to the watchdog (previously, # ``last_progress_ts`` only advanced after a successful write, # so a quiet period longer than max_idle_s=120s falsely tripped - # staleness). Idle-tick uses monotonic, immune to wall-clock - # jumps. + # staleness). Monotonic clock, immune to wall-clock jumps. 
self.last_progress_ts = monotonic() flag = self.event.wait(timeout=self.IDLE_TICK_S) if flag: @@ -121,15 +120,15 @@ with open(self.file_channels_path + "/" + filename, "a") as f: f.write("{0}\n".format(message_body)) self.last_output_message_id += 1 - # F1 - mark progress only after a successful write (failed - # writes don't constitute forward progress). - # F13 - monotonic (clock-jump robust). + # Mark progress only after a successful write (failed writes + # don't constitute forward progress). Monotonic clock so a + # wall-clock step does not poison the marker. self.last_progress_ts = monotonic() except IOError as er: self.logger.error('Opening and/or writing to output file error: {0}'.format(str(er))) def _prewarm_loop(self): - """F6 - independent timer loop that calls ``_maybe_prewarm_tomorrow``. + """Independent timer loop that calls ``_maybe_prewarm_tomorrow``. Any exception in the body is caught and logged; the loop must never die silently because the scheduler thread has no way to @@ -141,13 +140,13 @@ try: self._maybe_prewarm_tomorrow() except Exception as exc: - self.logger.error("F6: pre-warm loop iteration failed: %s", exc) + self.logger.error("Pre-warm: loop iteration failed: %s", exc) sleep(self.PREWARM_CHECK_INTERVAL_S) def _maybe_prewarm_tomorrow(self): - """F6 - touch tomorrow's out.buf in [23:55 + jitter, 00:00) UTC. + """Touch tomorrow's out.buf in [23:55 + jitter, 00:00) UTC. - The 5-minute window is wider than the 2-minute F7 jitter range so + The 5-minute window is wider than the 2-minute jitter range so every device gets a fire opportunity inside the window regardless of thread sleep drift. Once-per-date by ``_last_prewarm_date`` ensures idempotency; failures do NOT update the date so the next @@ -161,7 +160,7 @@ today_key = now.date() if self._last_prewarm_date == today_key: return - # F7 jitter: delay the fire until (seconds since 23:55) >= jitter. + # Jitter delays the fire until (seconds since 23:55) >= jitter. 
# Spreads the fleet fire-times across a 2-minute window starting # at 23:55:00. seconds_into_window = (now.minute - self.PREWARM_WINDOW_START_MINUTE) * 60 + now.second @@ -174,6 +173,6 @@ with open(path, "a"): pass self._last_prewarm_date = today_key - self.logger.debug("F6: pre-warmed %s (jitter=%ds)", path, self._jitter_seconds) + self.logger.debug("Pre-warmed %s (jitter=%ds)", path, self._jitter_seconds) except (IOError, OSError) as exc: - self.logger.warning("F6: pre-warm failed for %s: %s", path, exc) + self.logger.warning("Pre-warm failed for %s: %s", path, exc) Index: cloudsync/common/enums.py =================================================================== diff -u -rb2f7afce365e3044714e0e18505fed5ddefe7764 -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/common/enums.py (.../enums.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) +++ cloudsync/common/enums.py (.../enums.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -139,11 +139,10 @@ CS_LOG_ERROR = 931 CS_FACTORY_RESET_ERROR = 932 CS_LOG_RETENTION_ERROR = 933 - # F14c (ST-14, 0.5.5 build 2): persistent auth-failure escalation. - # Emitted once per cool-down window when repeated - # CS_GET_NEW_TOKEN_WITH_CERT_ERROR events cross the threshold. - # Operator-actionable signal: Keycloak auth is broken, investigate - # identity server / cert / Keycloak client config. + # Persistent auth-failure escalation. Emitted once per cool-down + # window when repeated CS_GET_NEW_TOKEN_WITH_CERT_ERROR events cross + # the threshold. Operator-actionable signal: Keycloak auth is broken + # — investigate identity server / cert / Keycloak client config. CS_AUTH_PERSISTENT_FAIL = 934 @@ -152,9 +151,18 @@ DISCONNECTED = 1 # Backup & retry CREDENTIAL = 2 # Backup & NO upload DUPLICATE = 3 # Backup & rename + # Persistent upload validation failure. Emitted once per + # (path, ui_sha) triple when repeated SHA-mismatch or chunked-upload + # failures cross the give-up threshold. 
CS-side suppress is the + # load-bearing defense: subsequent 1010s for the same triple are + # silent-dropped on CS regardless of UI behavior. UI MAY honor this + # code (suggested: backup & stop trying for that triple); current UI + # ignores unknown codes and keeps retrying — CS-side suppress still + # closes the storm. + VALIDATION_GIVEUP = 4 -# F9 — Lane classification for network requests (0.5.5 ST-5). +# Lane classification for network requests. # # The state lane (the existing single-slot ``NetworkRequestHandler``, # ``max_size=1``) serves registration-mode traffic, ``SET_DEVICE_STATE`` @@ -165,7 +173,7 @@ # absorbs long-running, filename-keyed uploads that would otherwise # starve the state lane during 10–30 minute transfers: # -# - ``CS2DCS_REQ_SEND_DEVICE_LOG`` (up to ~350 MB, HD000118 root cause) +# - ``CS2DCS_REQ_SEND_DEVICE_LOG`` (up to ~350 MB) # - ``CS2DCS_REQ_SEND_CS_LOG`` (up to ~300 MB on debug days) # # Both lanes run a single worker each: ``max_size`` is queue depth, not Index: cloudsync/handlers/auth_failure_tracker.py =================================================================== diff -u -rf26f09af1e10683fcbeb7fac051976ee0cd504f0 -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/handlers/auth_failure_tracker.py (.../auth_failure_tracker.py) (revision f26f09af1e10683fcbeb7fac051976ee0cd504f0) +++ cloudsync/handlers/auth_failure_tracker.py (.../auth_failure_tracker.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -1,31 +1,29 @@ -"""F14c — Persistent auth-failure escalation tracker (ST-14). +"""Persistent auth-failure escalation tracker. Aggregates repeated ``CS_GET_NEW_TOKEN_WITH_CERT_ERROR`` events into a single operator-visible ``CS_AUTH_PERSISTENT_FAIL`` escalation. Uses a -sliding window over the monotonic clock (consistent with F13) so -wall-clock jumps neither spuriously escalate nor suppress escalation. +sliding window over the monotonic clock so wall-clock jumps neither +spuriously escalate nor suppress escalation. 
Thresholds are in-memory only — on process restart, the counter resets. -If the underlying failure persists, escalation re-fires within ~N seconds -after boot, which is the desired operator signal (the issue survives -reboot). +If the underlying failure persists, escalation re-fires within ~N +seconds after boot, which is the desired operator signal (the issue +survives reboot). -Wiring (F15 — ST-15): ``NetworkRequestHandler._refresh_via_cert`` calls +Wiring: ``NetworkRequestHandler._refresh_via_cert`` calls :meth:`PersistentAuthFailureTracker.record_failure` after every ``cmd_outgoing_get_new_token_with_cert`` call that returns ``None``, then checks :meth:`PersistentAuthFailureTracker.should_escalate` to decide -whether to enqueue the 934 error. (Pre-F15 the hook lived in -``get_valid_token``; the tracker contract is unchanged.) +whether to enqueue the 934 error. """ from collections import deque from threading import Lock from time import monotonic -# Default thresholds chosen to match DVT-9 observed failure rate: -# ~1 Hz of silent-fails during the "3 reboots" capture, so 20 failures -# in 5 min is easily crossed when the issue is active and not crossed -# under normal intermittent-network conditions. +# Default thresholds: ~1 Hz of silent failures (observed under a +# Keycloak brute-force lockout) crosses 20-in-5-min easily, while normal +# intermittent-network conditions stay well below. 
DEFAULT_WINDOW_S = 300 # 5 minutes sliding window DEFAULT_THRESHOLD = 20 # N failures within the window DEFAULT_COOLDOWN_S = 600 # 10 minutes between escalations Index: cloudsync/handlers/cs_mft_dcs_request_handler.py =================================================================== diff -u -rf26f09af1e10683fcbeb7fac051976ee0cd504f0 -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision f26f09af1e10683fcbeb7fac051976ee0cd504f0) +++ cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -3,7 +3,7 @@ from time import monotonic, time from logging import Logger -from threading import Event, Thread +from threading import Event, Lock, Thread from collections import deque from random import seed, randint @@ -18,14 +18,77 @@ from cloudsync.handlers.outgoing.handler_cs_to_dcs import * from cloudsync.handlers.incoming.handler_mft_to_cs import * + +# Persistent chunked-upload give-up tracker. When repeated chunked- +# upload failures (any ``accepted=False`` outcome) for the same device- +# log path cross the threshold, CS-side suppress drops subsequent +# upload attempts for that path silently for the suppress duration -- +# closes the storm regardless of whether the UI honors the new wire +# reason code. A one-shot ``2010,3,<filename>,0,<VALIDATION_GIVEUP>`` is +# emitted at threshold cross, replacing the per-attempt rejection wire +# for that one attempt; subsequent attempts in the suppress window emit +# nothing on the wire. +# +# Sliding window uses ``monotonic()`` because the device test protocols +# can change wall-clock mid-session. +# +# Per-path granularity (no UI SHA on the wire at this layer): a chunk- +# upload failure is identified solely by the device-log path it tried +# to upload. 
+_F3_GIVEUP_THRESHOLD = 5 +_F3_GIVEUP_WINDOW_S = 60.0 +_F3_GIVEUP_SUPPRESS_S = 3600.0 +_f3_giveup_failures: dict = {} # path -> list[monotonic timestamps] +_f3_giveup_until: dict = {} # path -> monotonic deadline +_f3_giveup_lock = Lock() + + +def _f3_giveup_is_suppressed(path) -> bool: + """Return True iff ``path`` is currently in the chunked-upload + give-up suppress window. Lazy-expires the entry on read when the + deadline has passed, so no separate cleanup thread is needed.""" + with _f3_giveup_lock: + deadline = _f3_giveup_until.get(path) + if deadline is None: + return False + if monotonic() >= deadline: + _f3_giveup_until.pop(path, None) + return False + return True + + +def _f3_giveup_record_failure(path) -> bool: + """Record a chunked-upload failure for ``path`` and return True iff + this failure is the one that crosses the give-up threshold. + + Lazy-prunes timestamps older than the sliding window. On threshold + cross, enters the suppress window and clears the failure list so a + fresh threshold cross can occur after the suppress entry expires + (i.e. the upstream problem persists past 1 hour).""" + now = monotonic() + cutoff = now - _F3_GIVEUP_WINDOW_S + with _f3_giveup_lock: + deadline = _f3_giveup_until.get(path) + if deadline is not None and now < deadline: + return False + bucket = _f3_giveup_failures.setdefault(path, []) + bucket[:] = [t for t in bucket if t >= cutoff] + bucket.append(now) + if len(bucket) >= _F3_GIVEUP_THRESHOLD: + _f3_giveup_until[path] = now + _F3_GIVEUP_SUPPRESS_S + _f3_giveup_failures.pop(path, None) + return True + return False + + class NetworkRequestHandler: - # F9 — single-slot ordering invariant. The state lane's max_size MUST + # Single-slot ordering invariant. The state lane's max_size MUST # remain 1 so ordering of SET_DEVICE_STATE transitions is preserved. DEFAULT_MAX_SIZE = 1 - # F13 — idle-tick cadence. A lane with no arriving work must still - # register as alive to the watchdog. 
60s is well below both - # state-lane max_idle_s=300 and idempotent-lane max_idle_s=1800. + # Idle-tick cadence. A lane with no arriving work must still register + # as alive to the watchdog. 60s is well below both state-lane + # max_idle_s=300 and idempotent-lane max_idle_s=1800. IDLE_TICK_S = 60 def __init__(self, logger: Logger, max_size, output_channel, reachability_provider, error_handler): @@ -36,13 +99,13 @@ self.output_channel = output_channel self.error_handler = error_handler self.queue = deque(maxlen=max_size) # Thread safe - # F1 — semantic-progress marker updated after each request completes. + # Semantic-progress marker updated after each request completes. # Default 0 signals "no work yet"; the watchdog tolerates this until # the first tick. self.last_progress_ts = 0 - # F14c — persistent auth-failure tracker, invoked from - # _refresh_via_cert to aggregate repeated Keycloak fetch failures - # into a single operator-visible CS_AUTH_PERSISTENT_FAIL escalation. + # Persistent auth-failure tracker, invoked from _refresh_via_cert + # to aggregate repeated Keycloak fetch failures into a single + # operator-visible CS_AUTH_PERSISTENT_FAIL escalation. from cloudsync.handlers.auth_failure_tracker import PersistentAuthFailureTracker self.auth_failure_tracker = PersistentAuthFailureTracker() self.thread = Thread(target=self.scheduler, daemon=True) @@ -55,9 +118,9 @@ :return: None """ while True: - # F13 — top-of-loop tick in monotonic units. An idle lane - # (especially the idempotent one after a restart) still counts - # as alive; wall-clock jumps don't trip the watchdog. + # Top-of-loop tick in monotonic units. An idle lane (especially + # the idempotent one after a restart) still counts as alive; + # wall-clock jumps don't trip the watchdog. 
self.last_progress_ts = monotonic() flag = self.event.wait(timeout=self.IDLE_TICK_S) if flag: @@ -66,9 +129,9 @@ try: self.handle_request(req) finally: - # F1 — mark progress after each request returns, - # success or failure. - # F13 — monotonic (clock-jump robust). + # Mark progress after each request returns, success + # or failure. Monotonic clock so a wall-clock step + # does not poison the marker. self.last_progress_ts = monotonic() clear_correlation_id() self.event.clear() @@ -459,12 +522,30 @@ if not os.path.exists(device_log_data['path']): self.logger.warning(f"Device log file no longer available: {device_log_data['path']}") + _fn = os.path.basename(device_log_data['path']) error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_DEVICE_LOG_ERROR.value, - f"Device log file no longer available: {device_log_data['path']}") + format_upload_error_body( + _fn, + f"[Errno 2] No such file or directory: " + f"'{device_log_data['path']}'")) self.error_handler.enqueue_error(error=error) return + # Layer-1 give-up suppress. Once a path has crossed + # the persistent-chunked-upload-failure threshold, + # subsequent upload attempts for that path are + # silently dropped for the suppress window: no + # chunked upload is attempted, no wire signal is + # emitted, no Error 930 is raised. Closes the storm + # even if the UI keeps queuing 1010s. 
+ if _f3_giveup_is_suppressed(device_log_data['path']): + self.logger.info( + "Silent-drop chunked upload for %s " + "(persistent-failure suppress active)", + device_log_data['path']) + return + device_log_json = helpers_construct_device_log_json(device_log_data) upload_result = cmd_outgoing_upload_file_in_chunks(base_url=base_url, @@ -476,15 +557,41 @@ correlation_id=_cid, device_sn=_dsn) if isinstance(upload_result, dict) and not upload_result.get("accepted"): - # F2 — protocol-shaped rejection covering every terminal - # failure (409 Duplicate reason=3, connection/timeout/5xx - # reason=1 Disconnected, 401/403 reason=2 Credential). - # UI-Brain's existing rejection handlers engage on each. - message_body = "{0},3,{1},0,{2}".format( - OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, - upload_result["filename"], - upload_result["reason_code"]) - self.output_channel.enqueue_message(message_body) + # Persistent-failure tracker: record the chunked- + # upload failure and, on the threshold-cross + # attempt, replace the per-attempt rejection wire + # with a one-shot give-up wire and emit a + # diagnostic Error 930. Subsequent attempts for + # the same path are silent-dropped at the top of + # this branch for the suppress duration. 
+ if _f3_giveup_record_failure(device_log_data['path']): + wire_msg = "{0},3,{1},0,{2}".format( + OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, + upload_result["filename"], + LogUploadReasonCode.VALIDATION_GIVEUP.value) + self.output_channel.enqueue_message(wire_msg) + giveup_err = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + format_upload_error_body( + upload_result["filename"], + "Persistent chunked-upload give-up: " + "{0} failures within {1}s; " + "suppressing further uploads for {2}s".format( + _F3_GIVEUP_THRESHOLD, + int(_F3_GIVEUP_WINDOW_S), + int(_F3_GIVEUP_SUPPRESS_S)))) + self.error_handler.enqueue_error(error=giveup_err) + else: + # Protocol-shaped rejection covering every terminal + # failure (409 Duplicate reason=3, connection/timeout/5xx + # reason=1 Disconnected, 401/403 reason=2 Credential). + # UI-Brain's existing rejection handlers engage on each. + message_body = "{0},3,{1},0,{2}".format( + OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, + upload_result["filename"], + upload_result["reason_code"]) + self.output_channel.enqueue_message(message_body) elif isinstance(upload_result, str): # Success — send 2010 with 3 params: filename, 1 (success), 0 (no error) self.logger.debug(f"Device log file uploaded: {upload_result}") @@ -493,15 +600,22 @@ upload_result) self.output_channel.enqueue_message(message_body) else: + _fn = os.path.basename(device_log_data.get('path', '')) if isinstance(device_log_data, dict) else '' error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_DEVICE_LOG_ERROR.value, - "Missing access token") + format_upload_error_body(_fn, "Missing access token")) self.error_handler.enqueue_error(error=error) except Exception as e: + # device_log_data may not have been bound yet if exception + # fired before payload extraction; best-effort filename. 
+ try: + _fn = os.path.basename(req.payload.get('path', '')) if isinstance(req.payload, dict) else '' + except Exception: + _fn = '' error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_DEVICE_LOG_ERROR.value, - str(e)) + format_upload_error_body(_fn, str(e))) self.error_handler.enqueue_error(error=error) elif req.request_type == NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG: try: @@ -585,22 +699,27 @@ pass else: + _fn = os.path.basename(cs_log_data.get('path', '')) if isinstance(cs_log_data, dict) else '' error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_LOG_ERROR.value, - "Missing access token") + format_upload_error_body(_fn, "Missing access token")) self.error_handler.enqueue_error(error=error) except Exception as e: + try: + _fn = os.path.basename(req.payload.get('path', '')) if isinstance(req.payload, dict) else '' + except Exception: + _fn = '' error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_LOG_ERROR.value, - str(e)) + format_upload_error_body(_fn, str(e))) self.error_handler.enqueue_error(error=error) else: g_utils.logger.warning("Request type {0} not supported".format(req.request_type)) def _refresh_via_cert(self, identity_url, client_secret, correlation_id="", device_sn=""): - """F15d — shared cert-auth refresh path with F14c tracker wiring. + """Shared cert-auth refresh path with persistent-failure tracking. Fetches a fresh access token from Keycloak via device cert auth. On failure (None return), increments the persistent-auth-failure @@ -609,8 +728,7 @@ Used by ``_acquire_token`` (first-time token fetch) and by the per-site retry loops in ``cmd_outgoing_*`` (reactive refresh on - 401/403). Hook point for F14c moves here from the legacy - ``get_valid_token`` when F15c lands. + 401/403). :return: Fresh access token on success, or None on cert-auth failure. 
""" @@ -639,20 +757,21 @@ "the specific Keycloak response body.") self.error_handler.enqueue_error(error=escalation) self.logger.error( - "F14c: persistent auth failure escalated to UI-Brain " + "Persistent auth failure escalated to UI-Brain " "(CS_AUTH_PERSISTENT_FAIL)") return access_token def _acquire_token(self, identity_url, client_secret, correlation_id="", device_sn=""): - """F15c — get an access token for a DCS request. + """Get an access token for a DCS request. Returns the stored token if present (server decides validity via JwtBearer). If the stored token is missing/corrupt, fetches a - fresh one via cert auth through ``_refresh_via_cert`` (F14c - tracked). May return None on persistent cert-auth failure; - callers must treat None as 'cannot proceed with this request'. + fresh one via cert auth through ``_refresh_via_cert`` (which + records persistent failures). May return None on persistent + cert-auth failure; callers must treat None as 'cannot proceed + with this request'. """ tok = helpers_get_stored_token() if tok is not None: @@ -673,7 +792,7 @@ class IdempotentRequestHandler(NetworkRequestHandler): - """F9 — idempotent lane for long-running, filename-keyed uploads. + """Idempotent lane for long-running, filename-keyed uploads. Structurally identical to :class:`NetworkRequestHandler` — inherits ``scheduler`` / ``enqueue_request`` / ``handle_request``. The only Index: cloudsync/handlers/logs_handler.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/handlers/logs_handler.py (.../logs_handler.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/handlers/logs_handler.py (.../logs_handler.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -1,25 +1,25 @@ """Custom rotating log handler for CloudSync. 
-F5 (0.5.5): upload is decoupled from rotation - the actual CS-log upload -is driven by ``HeartBeatProvider`` on its 20-second tick when the -idempotent lane is idle. This class's sole responsibility is rotation. +Upload is decoupled from rotation: the actual CS-log upload is driven +by ``HeartBeatProvider`` on its 20-second tick when the idempotent +lane is idle. This class's sole responsibility is rotation. -F12 (0.5.5): clock-jump tolerant rotation. ``logging.handlers.TimedRotatingFileHandler`` +Clock-jump tolerant rotation. ``logging.handlers.TimedRotatingFileHandler`` computes ``rolloverAt`` from the REAL wall-clock at boot and never revisits that decision unless a rollover actually fires. On devices where operators simulate day transitions by setting the system clock -to 23:59 and waiting, the stdlib rolloverAt remains anchored to the +to 23:59 and waiting, the stdlib ``rolloverAt`` stays anchored to the real boot time and never matches the simulated time, so rotations are silently skipped (confirmed against CPython 3.11). -F12 replaces the stdlib trigger with a UTC-date-equality check: -rotation fires whenever the current UTC date differs from the date on -which the active log file was opened. Any clock jump - forward, -backward, or past multiple midnight boundaries - is honored at the -very next log emit. Collision safety: if the rotated-file suffix -already exists on disk (operator set clock backward then forward -again across the same date), a numeric counter is appended so no prior -rotated file is silently overwritten. +This handler replaces the stdlib trigger with a UTC-date-equality +check: rotation fires whenever the current UTC date differs from the +date on which the active log file was opened. Any clock jump — +forward, backward, or past multiple midnight boundaries — is honored +at the very next log emit. 
Collision safety: if the rotated-file +suffix already exists on disk (operator set clock backward then +forward again across the same date), a numeric counter is appended +so no prior rotated file is silently overwritten. """ from logging.handlers import TimedRotatingFileHandler @@ -51,7 +51,7 @@ self.network_request_handler = None self.error_handler = None self.g_config = None - # F12 - track the UTC date of the currently-active file. Rotation + # Track the UTC date of the currently-active file. Rotation # decisions compare this against datetime.now(utc).date() on every # emit, so any clock jump is caught immediately regardless of the # stdlib's boot-locked rolloverAt. @@ -63,9 +63,9 @@ return _dt_module.datetime.now(_dt_module.timezone.utc).date() def shouldRollover(self, record): - """F12 - rotate whenever the UTC date changes, ignoring - self.rolloverAt (which is a boot-time fixed value that does not - adapt to simulated clock jumps).""" + """Rotate whenever the UTC date changes, ignoring self.rolloverAt + (which is a boot-time fixed value that does not adapt to + simulated clock jumps).""" try: return self._current_utc_date() != self._active_log_date except Exception: @@ -78,12 +78,14 @@ return False def doRollover(self): - """F12 - rotate using the current simulated time, not - self.rolloverAt. Computes the rotated-file suffix from the - UTC date of the log being rotated out (the data's original date). - Guarantees no silent overwrite of a pre-existing file with the - same suffix (collisions happen when operators set the clock - backward then forward across the same date boundary).""" + """Rotate using the current simulated time, not self.rolloverAt. + + Computes the rotated-file suffix from the UTC date of the log + being rotated out (the data's original date). Guarantees no + silent overwrite of a pre-existing file with the same suffix + (collisions happen when operators set the clock backward then + forward across the same date boundary). 
+ """ if self.stream: self.stream.close() self.stream = None Index: cloudsync/handlers/outgoing/handler_cs_to_dcs.py =================================================================== diff -u -rf26f09af1e10683fcbeb7fac051976ee0cd504f0 -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision f26f09af1e10683fcbeb7fac051976ee0cd504f0) +++ cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -1,6 +1,7 @@ """Handler of commands sent by CloudSync app to the Diality Cloud System""" from typing import List, Tuple +from time import monotonic import json import requests import urllib.parse @@ -13,7 +14,7 @@ def _upload_rejection(filename: str, reason_code: int) -> dict: - """F2 — build a terminal-failure rejection dict for an upload call. + """Build a terminal-failure rejection dict for an upload call. Returned instead of ``None`` whenever the upload cannot be completed. The caller (cs_mft_dcs_request_handler upload dispatch) translates this @@ -25,8 +26,22 @@ return {"accepted": False, "filename": filename, "reason_code": reason_code} +def format_upload_error_body(filename: str, body: str) -> str: + """Prefix the device-log filename onto an Error 930 message body so + a single ``grep`` on a CS log can correlate every upload-pipeline + error with its file. Empty/missing filename collapses to + ```` so the prefix shape is stable across all callers. + + Every upload-pipeline emit site (CS_DEVICE_LOG_ERROR / + CS_LOG_ERROR) MUST go through this helper; an AST-walk audit test + in the test suite enforces this contract. + """ + fn = filename if filename else "unknown" + return f"<{fn}> {body}" + + def _classify_http_reason(status_code: int) -> int: - """F2 — map an HTTP status to a LogUploadReasonCode. + """Map an HTTP status to a LogUploadReasonCode. 401 / 403 → CREDENTIAL (reason 2). 
Genuine auth failures require operator attention; UI-Brain backs up without retry. @@ -41,7 +56,7 @@ def _should_retry_with_refresh(resp) -> bool: - """F15b — retry predicate for reactive cert-auth refresh. + """Retry predicate for reactive cert-auth refresh. Returns True iff the response is an HTTP 401 or 403 — the only status codes that indicate a stale/invalid token. 409 (duplicate), @@ -83,7 +98,7 @@ token_refresher: callable, error_handler: ErrorHandler, error_id: int): - """F15b — single-retry-on-401/403 wrapper shared by every DCS call. + """Single-retry-on-401/403 wrapper shared by every DCS call. ``build_request(access_token)`` is a caller-supplied closure that performs the HTTP call and returns a ``requests.Response``. The @@ -172,20 +187,19 @@ g_utils.logger.debug("Making request: {0}, {1}, {2}, {3}".format(url, headers, payload, cert_paths)) - # F14a — split connect/read timeout. 5s was too tight for the read - # phase (13× timeout observed on DVT-9); keep connect tight so we - # fail fast on unreachable identity server. + # Split connect/read timeout. The 5s default was too tight for + # the read phase under field conditions; keep connect tight so + # we fail fast on an unreachable identity server. response = requests.post(url=url, headers=headers, data=payload, cert=cert_paths, timeout=(5, 30)) - # F14a — surface silent Keycloak failures. Prior to this, a non-2xx - # response or a 2xx response that lacked "access_token" caused this - # function to return None without enqueuing an error, producing - # thousands of downstream "Missing access token" errors with no - # diagnostic trail. See st14_phase0_findings.md H2 for field data. + # Surface Keycloak failures. A non-2xx response (or a 2xx response + # that lacks "access_token") would otherwise return None without + # enqueuing an error, producing thousands of downstream "Missing + # access token" errors with no diagnostic trail. 
if response.status_code != 200: body_preview = (response.text or "")[:200] error = Error.general( @@ -311,7 +325,7 @@ :param device_state_json: device state payload :param access_token: access token :param error_handler: global error handler - :param token_refresher: F15b — callable returning a fresh access + :param token_refresher: Optional callable returning a fresh access token (or None on failure) when a 401/403 is received. Set to None by legacy callers; bounded to a single retry per call. :return: The response @@ -392,7 +406,7 @@ :param access_token: access token :param associate: status of the device & patient association :param error_handler: global error handler - :param token_refresher: F15b — callable returning a fresh access + :param token_refresher: Optional callable returning a fresh access token (or None) on 401/403; wraps the whole HEAD+PUT or DELETE branch so a single refresh re-runs the full sub-request chain. :return: The response @@ -428,7 +442,7 @@ :param access_token: access token :param treatment_log: treatment report sent to DCS :param error_handler: global error handler - :param token_refresher: F15b — callable returning a fresh access + :param token_refresher: Optional callable returning a fresh access token (or None) on 401/403; bounded to a single retry. :return: The response """ @@ -512,20 +526,51 @@ ERROR_ID = ErrorIDs.CS_DEVICE_LOG_ERROR.value if log_file_origin == 'device' else ErrorIDs.CS_LOG_ERROR.value - # F2 — extract filename early so every rejection path can surface it to + # Extract filename early so every rejection path can surface it to # UI-Brain via the protocol-shaped 2010 message. try: _filename = str(file_json['start_session']['metadata']['deviceFileName']) except (KeyError, TypeError): _filename = "" + # Single structured upload-outcome line at every function exit so an + # operator can grep one CS-log line per upload (success or failure) + # instead of correlating across many INFO/ERROR lines. 
State dict is + # populated incrementally as the upload progresses; the closures + # below capture it by reference and emit on every return path. + _summary = { + "file_size": None, + "num_chunks": 0, + "chunks_succeeded": 0, + "start_time": monotonic(), + "session_id": None, + "sha256": "", + } + + def _emit_failed(reason: str) -> None: + duration_s = int(monotonic() - _summary["start_time"]) + g_utils.logger.error( + f"upload_failed file={_filename} size={_summary['file_size']} " + f"chunks={_summary['chunks_succeeded']}/{_summary['num_chunks']} " + f"duration_s={duration_s} session={_summary['session_id'] or 'none'} " + f"reason='{reason}'" + ) + + def _emit_complete() -> None: + duration_s = int(monotonic() - _summary["start_time"]) + g_utils.logger.info( + f"upload_complete file={_filename} size={_summary['file_size']} " + f"chunks={_summary['num_chunks']}/{_summary['num_chunks']} " + f"duration_s={duration_s} session={_summary['session_id']} " + f"sha256={_summary['sha256']}" + ) + # # Start upload session # - # F15b — reactive retry on 401/403 (server authority). Replaces the - # pre-F15 proactive `token_refresher()` call that refreshed every - # upload's start-session unconditionally; DCS's JwtBearer middleware - # now decides when a refresh is needed. + # Reactive retry on 401/403 (server authority): DCS's JwtBearer + # middleware decides when a refresh is needed; the client only + # refreshes in response to an explicit auth-class status code. 
start_session_url = base_url.rstrip("/") + "/api/device/data/start-session" start_session_payload = json.dumps(file_json['start_session']) @@ -547,7 +592,9 @@ g_utils.logger.info(f"Start-session response: {response.status_code}") except Exception as e: error_handler.enqueue_error( - error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e))) + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body(_filename, f"Start-session failed: {e}"))) + _emit_failed(f"start-session connection error: {e}") return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) if _should_retry_with_refresh(response) and attempt == 1 and token_refresher is not None: @@ -560,24 +607,31 @@ if response.status_code == CONFLICT: g_utils.logger.info(f"File {_filename} rejected as duplicate at start-session (409 Conflict).") + _emit_failed("start-session 409 duplicate") return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value) if response.status_code != 200: _log_dcs_trace_id(response, correlation_id) g_utils.logger.error(f"Start-session failed: {response.status_code} - {response.text[:500]}") - # F2 — map auth-class status codes to CREDENTIAL; everything + # Map auth-class status codes to CREDENTIAL; everything # else terminal → DISCONNECTED. 
reason = _classify_http_reason(response.status_code) error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, - f"start-session {response.status_code}") + format_upload_error_body( + _filename, + f"Start-session failed: {response.status_code} - {response.text[:500]}")) error_handler.enqueue_error(error=error) + _emit_failed(f"start-session {response.status_code}") return _upload_rejection(_filename, reason) session_id = response.json().get("sessionId") if not session_id: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, "Missing session ID in response.") + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body(_filename, "Start-session response missing sessionId")) error_handler.enqueue_error(error=error) + _emit_failed("missing sessionId") return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) + _summary["session_id"] = session_id # # Send file in chunks @@ -590,53 +644,66 @@ upload_chunk_payload_template = file_json['upload_chunk'] upload_chunk_payload_template['sessionId'] = session_id upload_chunk_payload_template['chunkType'] = "device-data" - chunk_number = 1 + # chunkNo is derived from a 1-indexed loop counter (chunk_idx), + # NOT from a state variable that tracks success. Defensive + # against any future refactor that routes "advance to next + # chunk after failure" through this code, which would silently + # send the same chunkNo with different bytes. + chunk_idx = 0 headers = _build_dcs_headers(access_token, correlation_id, device_sn) - # F11 (0.5.5): stream the file in raw chunks aligned to a 3-byte - # boundary and base64-encode each chunk independently. Peak RAM - # per upload: one chunk (~2 MB) instead of 7/3 x file_size (the - # old path kept the full file bytes + full base64 string resident - # simultaneously, which on a ~300 MB CS log pushed a 2 GB device - # into OOM-kill). 
Raw-chunk size must be a multiple of 3 so each + # Stream the file in raw chunks aligned to a 3-byte boundary and + # base64-encode each chunk independently. Peak RAM per upload: + # one chunk (~2 MB) instead of 7/3 × file_size (a non-streaming + # path keeps both the full file bytes and the full base64 string + # resident, which on ~300 MB CS logs pushed 2 GB devices into + # OOM-kill). Raw-chunk size MUST be a multiple of 3 so each # chunk's base64 output concatenates to the same string as - # base64(whole file) - preserving server-side reassembly + # base64(whole file) — preserving server-side reassembly # semantics unchanged. raw_chunk_size = (chunk_size // 4) * 3 if raw_chunk_size < 3: raw_chunk_size = 3 # degenerate safeguard # total_base64_size and num_chunks are derived arithmetically # from file_size so we can report progress ("chunk K of N") the - # same way the pre-F11 code did, without requiring a pre-scan. + # without requiring a pre-scan. if file_size is None: file_size = os.path.getsize(target_file) total_base64_size = 4 * ((file_size + 2) // 3) num_chunks = total_base64_size // chunk_size + (total_base64_size % chunk_size > 0) if num_chunks == 0: num_chunks = 1 # empty/tiny file still goes through once + _summary["file_size"] = file_size + _summary["num_chunks"] = num_chunks all_chunks_ok = True with open(target_file, "rb") as f: while True: raw_block = f.read(raw_chunk_size) if not raw_block: break + chunk_idx += 1 chunk = base64.b64encode(raw_block).decode("utf8") # Retry logic with counter and backoff time. + # auth_refresh_attempted is per-chunk: token refresh fires at + # most once per chunk across all retries. A still-stale token + # after refresh falls through to regular retry/abort instead + # of looping refresh forever. chunk_uploaded = False retry_count = 0 + auth_refresh_attempted = False while retry_count < retries: try: # Fresh dict per attempt so we don't carry the # prior chunk's state forward across retries. 
upload_chunk_payload = dict(upload_chunk_payload_template) - upload_chunk_payload['chunkNo'] = chunk_number + upload_chunk_payload['chunkNo'] = chunk_idx upload_chunk_payload['data'] = chunk upload_chunk_body = json.dumps(upload_chunk_payload) g_utils.logger.debug( - f"File upload payload (upload-chunk) - chunk No {chunk_number}: " + f"File upload payload (upload-chunk) - chunk No {chunk_idx}: " f"bytes={len(upload_chunk_body)}") response = requests.post(upload_chunk_url, @@ -645,13 +712,49 @@ timeout=(30, 60)) if response.status_code == 200: - g_utils.logger.info(f"Uploaded chunk {chunk_number} of {num_chunks}") - chunk_number += 1 + g_utils.logger.info(f"Uploaded chunk {chunk_idx} of {num_chunks}") chunk_uploaded = True + _summary["chunks_succeeded"] += 1 break # Successful upload, break retry loop + # Reactive auth-class refresh. On 401/403 ask + # token_refresher for a fresh token, rebind the + # Authorization header, and retry this chunk + # WITHOUT consuming a backoff slot. Token validity + # is server-authoritative; never decode + # access_token.exp or compare TTL to a wall-clock + # value -- device test protocols change the system + # clock to simulate midnight cycles, which corrupts + # any wall-clock-based expiry check. 
+ if (_should_retry_with_refresh(response) + and not auth_refresh_attempted + and token_refresher is not None): + auth_refresh_attempted = True + refreshed = token_refresher() + if refreshed is not None: + access_token = refreshed + headers = _build_dcs_headers( + access_token, correlation_id, device_sn) + g_utils.logger.info( + f"Chunk {chunk_idx}/{num_chunks} got " + f"{response.status_code}; refreshed token " + f"and retrying immediately") + continue # auth correction, not a backoff retry + + g_utils.logger.warning( + f"Chunk {chunk_idx}/{num_chunks} got " + f"{response.status_code} but token refresh " + f"failed; falling through to retry/abort") + error_handler.enqueue_error( + error=Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body( + _filename, + f"Token refresh failed during chunk " + f"{chunk_idx}/{num_chunks} upload"))) + g_utils.logger.warning( - f"Chunk {chunk_number}/{num_chunks} upload failed: " + f"Chunk {chunk_idx}/{num_chunks} upload failed: " f"{response.status_code} - {response.text[:500]}") retry_count += 1 if retry_count < retries: @@ -663,37 +766,44 @@ except Exception as e: retry_count += 1 g_utils.logger.error( - f"Chunk {chunk_number}/{num_chunks} exception " + f"Chunk {chunk_idx}/{num_chunks} exception " f"(attempt {retry_count}/{retries}): {e}") if retry_count < retries: sleep(5) if not chunk_uploaded: g_utils.logger.error( - f"Chunk {chunk_number} failed after {retries} retries, aborting upload") + f"Chunk {chunk_idx} failed after {retries} retries, aborting upload") all_chunks_ok = False break if not all_chunks_ok: + chunks_done = chunk_idx - 1 if chunk_idx > 0 else 0 error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, - f"Upload aborted: chunk {chunk_number} failed after {retries} retries") + format_upload_error_body( + _filename, + f"Chunked upload incomplete: {chunks_done}/{num_chunks} " + f"chunks succeeded; failed at chunk {chunk_idx} " + f"after {retries} retries")) 
error_handler.enqueue_error(error=error) - # F2 — chunk retries exhausted → Disconnected + _emit_failed(f"chunk {chunk_idx} retries exhausted") + # Chunk retries exhausted → Disconnected return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body( + _filename, f"Unhandled exception in chunk loop: {e}")) error_handler.enqueue_error(error=error) - # F2 — unhandled exception in chunk loop → Disconnected (safest default) + _emit_failed(f"chunk-loop exception: {e}") + # Unhandled exception in chunk loop → Disconnected (safest default) return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) # # End upload session # - # F15b — reactive retry on 401/403 (server authority). Replaces the - # pre-F15 proactive `token_refresher()` call that refreshed after - # every chunked upload regardless of whether the token was actually - # stale. + # Reactive retry on 401/403 (server authority): refresh only when + # the server explicitly rejects, not on every end-session call. 
end_session_url = base_url.rstrip("/") + "/api/device/data/end-session" end_session_payload = file_json['end_session'] @@ -714,7 +824,9 @@ g_utils.logger.info(f"End-session response: {response.status_code}") except Exception as e: error_handler.enqueue_error( - error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e))) + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + format_upload_error_body(_filename, f"End-session failed: {e}"))) + _emit_failed(f"end-session connection error: {e}") return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) if _should_retry_with_refresh(response) and attempt == 1 and token_refresher is not None: @@ -727,16 +839,28 @@ if response.status_code == CONFLICT: g_utils.logger.info(f"File {_filename} rejected as duplicate (409 Conflict).") + _emit_failed("end-session 409 duplicate") return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value) if response.status_code != 200: _log_dcs_trace_id(response, correlation_id) g_utils.logger.error(f"End-session failed: {response.status_code} - {response.text[:500]}") reason = _classify_http_reason(response.status_code) + # Include response body (truncated) so the operator can tell a + # 500-from-DCS-bug apart from a 500-from-bad-payload from a + # single Error 930 line. 
error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, - f"end-session {response.status_code}") + format_upload_error_body( + _filename, + f"End-session failed: {response.status_code} - {response.text[:500]}")) error_handler.enqueue_error(error=error) + _emit_failed(f"end-session {response.status_code}") return _upload_rejection(_filename, reason) g_utils.logger.info(f"File {file_json['start_session']['metadata']['deviceFileName']} uploaded.") + try: + _summary["sha256"] = str(file_json['end_session'].get('checksum', ''))[:16] + except (AttributeError, TypeError): + _summary["sha256"] = "" + _emit_complete() return str(file_json['start_session']['metadata']['deviceFileName']) Index: cloudsync/handlers/ui_cs_request_handler.py =================================================================== diff -u -rb2f7afce365e3044714e0e18505fed5ddefe7764 -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) +++ cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -1,10 +1,11 @@ from logging import Logger -from threading import Event, Thread +from threading import Event, Lock, Thread from collections import deque from cloudsync.handlers.uics_message import UICSMessage from cloudsync.handlers.error_handler import ErrorHandler from cloudsync.handlers.error import Error +from cloudsync.handlers.outgoing.handler_cs_to_dcs import format_upload_error_body from cloudsync.common.enums import * from cloudsync.utils.globals import * from cloudsync.utils.helpers import * @@ -14,33 +15,113 @@ import os import shutil import time +# Explicit import: cloudsync.utils.helpers does ``from time import time``, +# which would shadow the ``time`` module reference for any subsequent +# ``time.`` access via the star-import above. Importing the +# specific binding by name avoids the shadow. 
+from time import monotonic +# In-flight 1010 dedup. Set holds device-log paths currently being +# processed inside the UI2CS_UPLOAD_DEVICE_LOG branch (SHA check + +# enqueue). Parallel 1010s for the same path arriving while one is in +# flight are dropped silently with an INFO log -- prevents the storm +# pattern where a UI-side queue retries every 5 s, each retry kicking +# off a redundant SHA computation on a multi-MB log file. +_in_flight_uploads: set = set() +_in_flight_lock = Lock() + + +# Persistent SHA-mismatch give-up tracker. When repeated SHA-mismatch +# failures for the same (path, ui_sha) cross the threshold, CS-side +# suppress drops subsequent 1010s for that key silently for the +# suppress duration -- closes the storm regardless of UI behavior. +# A one-shot wire `2010,3,,0,` is also emitted +# at threshold cross for forward-compat with future UI versions that +# honor the new reason code; current UI ignores unknown codes and keeps +# retrying, but Layer-1 suppress drops every retry so the customer- +# visible CS log stays quiet. +# +# Sliding window uses ``monotonic()`` (NOT wall-clock): the device test +# protocols change system time to simulate midnight cycles, and a +# wall-clock window would corrupt the failure counter under those +# protocols. +# +# Per-(path, ui_sha) granularity: a different SHA for the same path is +# a legitimately new upload attempt and is not suppressed. +_GIVEUP_THRESHOLD = 5 +_GIVEUP_WINDOW_S = 60.0 +_GIVEUP_SUPPRESS_S = 3600.0 +_giveup_failures: dict = {} # (path, ui_sha) -> list[monotonic timestamps] +_giveup_until: dict = {} # (path, ui_sha) -> monotonic deadline +_giveup_lock = Lock() + + +def _giveup_is_suppressed(key) -> bool: + """Return True iff ``key`` is currently in the give-up suppress + window. 
Lazy-expires the entry on read when the deadline has + passed so no separate cleanup thread is needed.""" + with _giveup_lock: + deadline = _giveup_until.get(key) + if deadline is None: + return False + if monotonic() >= deadline: + _giveup_until.pop(key, None) + return False + return True + + +def _giveup_record_sha_failure(key) -> bool: + """Record a SHA-mismatch failure for ``key`` and return True iff + this failure is the one that crosses the give-up threshold. + + Lazy-prunes timestamps older than the sliding window. On + threshold cross, enters the suppress window and clears the + failure list so a fresh threshold cross can occur after the + suppress entry expires (i.e. the problem persists past 1 hour).""" + now = monotonic() + cutoff = now - _GIVEUP_WINDOW_S + with _giveup_lock: + # If already suppressed (race against a Layer-1 check), do + # not re-trigger; the caller should have silent-dropped. + deadline = _giveup_until.get(key) + if deadline is not None and now < deadline: + return False + bucket = _giveup_failures.setdefault(key, []) + bucket[:] = [t for t in bucket if t >= cutoff] + bucket.append(now) + if len(bucket) >= _GIVEUP_THRESHOLD: + _giveup_until[key] = now + _GIVEUP_SUPPRESS_S + _giveup_failures.pop(key, None) + return True + return False + + class UICSMessageHandler: def __init__(self, logger: Logger, max_size, network_request_handler, output_channel, reachability_provider, error_handler, idempotent_network_request_handler=None): self.logger = logger self.reachability_provider = reachability_provider self.network_request_handler = network_request_handler - # F9 — idempotent lane for long-running filename-keyed uploads - # (SEND_DEVICE_LOG). Back-compat default: when omitted, fall back to - # the state lane so existing tests and older wiring still function. + # Idempotent lane for long-running filename-keyed uploads + # (SEND_DEVICE_LOG). 
Back-compat default: when omitted, fall back + # to the state lane so existing tests and older wiring still function. self.idempotent_network_request_handler = ( idempotent_network_request_handler or network_request_handler) self.output_channel = output_channel self.error_handler = error_handler self.queue = deque(maxlen=max_size) # Thread safe - # F1 — progress marker; updated after each UI-CS message is handled. + # Progress marker; updated after each UI-CS message is handled. self.last_progress_ts = 0 self.thread = Thread(target=self.scheduler, daemon=True) self.event = Event() self.thread.start() self.logger.info('Created UI_CS Handler') - # F13 — idle-tick cadence for the scheduler. An idle message handler - # must still register as alive to the watchdog. Well below max_idle_s=120. + # Idle-tick cadence for the scheduler. An idle message handler must + # still register as alive to the watchdog. Well below max_idle_s=120. IDLE_TICK_S = 60 def scheduler(self) -> None: @@ -49,9 +130,9 @@ :return: None """ while True: - # F13 — top-of-loop tick in monotonic units so wall-clock - # jumps do not trip the watchdog, and an idle queue still - # counts as healthy. + # Top-of-loop tick in monotonic units so wall-clock jumps do + # not trip the watchdog, and an idle queue still counts as + # healthy. self.last_progress_ts = time.monotonic() flag = self.event.wait(timeout=self.IDLE_TICK_S) if flag: @@ -60,8 +141,9 @@ try: self.handle_message(message) finally: - # F1 — mark progress after each message is fully processed. - # F13 — monotonic (clock-jump robust). + # Mark progress after each message is fully processed. + # Monotonic clock so a wall-clock step does not poison + # the marker. 
self.last_progress_ts = time.monotonic() clear_correlation_id() self.event.clear() @@ -133,8 +215,8 @@ helpers_write_config(None, CONFIG_PATH, message.g_config) helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, message.g_config) - # F4 caller audit: REGISTRATION is a one-time provisioning - # flow; on backpressure there is no useful retry strategy — + # REGISTRATION is a one-time provisioning flow; on + # backpressure there is no useful retry strategy — # surface via warning and let the manufacturing tool retry. outcome, detail = helpers_add_to_network_queue( network_request_handler=self.network_request_handler, @@ -186,9 +268,9 @@ try: helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, message.g_config) - # F4 caller audit: SET_DEVICE_STATE is state-machine critical - # (state lane). A dropped state transition creates the - # "Active Not Ready" cloud-state blackout — report loss to UI. + # SET_DEVICE_STATE is state-machine critical (state lane). + # A dropped state transition creates the "Active Not Ready" + # cloud-state blackout — report loss to UI. 
outcome, detail = helpers_add_to_network_queue( network_request_handler=self.network_request_handler, request_type=NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE, @@ -227,13 +309,13 @@ treatment_log_json = helpers_read_treatment_log_file(message.parameters[0]) if treatment_log_json: - treatment_log_json['checksum'] = helpers_sha256_checksum(json.dumps(treatment_log_json['data'])) + treatment_log_json['checksum'] = helpers_sha256_string(json.dumps(treatment_log_json['data'])) treatment_log_json['serialNumber'] = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] g_utils.logger.debug("Treatment log {0}".format(treatment_log_json)) - # F4 caller audit: SEND_TREATMENT_REPORT — treatment-report - # loss is user-visible; surface a CS2UI_ERROR so UI-Brain + # SEND_TREATMENT_REPORT — treatment-report loss is + # user-visible; surface a CS2UI_ERROR so UI-Brain # can prompt a retry / mark the report as not synced. outcome, detail = helpers_add_to_network_queue( network_request_handler=self.network_request_handler, @@ -333,69 +415,149 @@ self.logger.info("UI2CS_UPLOAD_DEVICE_LOG request received") if (len(message.parameters) != 2) or (message.parameters[0] is None) or (message.parameters[1] is None): + # Best-effort filename: params[0] may be present even if + # the count or params[1] is invalid. 
+ _fn = '' + if message.parameters and len(message.parameters) > 0 and message.parameters[0]: + _fn = os.path.basename(str(message.parameters[0])) error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_DEVICE_LOG_ERROR.value, - "invalid # of parameters for file upload request") + format_upload_error_body( + _fn, + "invalid # of parameters for file upload request")) self._enqueue_error(error, message.correlation_id) else: - try: + file_path = message.parameters[0] + ui_sha = message.parameters[1] + giveup_key = (file_path, ui_sha) - local_checksum = helpers_sha256_checksum(message.parameters[0]) - if local_checksum != message.parameters[1]: - raise ValueError('No valid sha256 value.') + # Layer-1 give-up suppress. Once a (path, ui_sha) + # has crossed the persistent-failure threshold, + # subsequent 1010s for that key are silently dropped + # for the suppress window: no SHA computation, no + # in-flight slot, no queue, no Error 930, no 2010 + # emit. Closes the storm even if the UI keeps + # retrying (current UI behavior for unknown reason + # codes). 
+ if _giveup_is_suppressed(giveup_key): + self.logger.info( + "Silent-drop UI2CS_UPLOAD_DEVICE_LOG for %s " + "(persistent-failure suppress active)", file_path) + return - hd_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] - dg_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL] - sw_version = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION] + with _in_flight_lock: + if file_path in _in_flight_uploads: + self.logger.info( + "Ignoring duplicate UI2CS_UPLOAD_DEVICE_LOG " + "for %s (already in flight)", file_path) + return + _in_flight_uploads.add(file_path) - self.logger.debug('hd: {0},dg: {1},sw: {2}'.format(hd_serial_number, dg_serial_number, sw_version)) + try: + try: + local_checksum = helpers_sha256_checksum(file_path) + if local_checksum != message.parameters[1]: + # Persistent-failure tracker: record the + # SHA-mismatch and, on the threshold-cross + # attempt, emit a one-shot wire signal + + # one diagnostic Error 930 distinct from + # the per-attempt Error 930 raised below. + # Subsequent 1010s for the same (path, + # ui_sha) are silent-dropped at the top of + # this handler for the suppress duration. 
+ if _giveup_record_sha_failure(giveup_key): + device_file_name = os.path.basename(file_path) + wire_msg = "{0},3,{1},0,{2}".format( + OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, + device_file_name, + LogUploadReasonCode.VALIDATION_GIVEUP.value) + self.output_channel.enqueue_message(wire_msg) + giveup_err = Error.general( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + format_upload_error_body( + device_file_name, + "Persistent SHA-mismatch give-up: " + "{0} failures within {1}s; " + "suppressing further uploads for {2}s".format( + _GIVEUP_THRESHOLD, + int(_GIVEUP_WINDOW_S), + int(_GIVEUP_SUPPRESS_S)))) + self._enqueue_error(giveup_err, message.correlation_id) + # One log line tells the operator (a) which + # file, (b) what the UI claimed the SHA was, + # (c) what CS computed. Replaces the opaque + # 'No valid sha256 value.' that collapsed + # file-missing, SHA-mismatch, and content-drift + # into one indistinguishable error. + raise ValueError( + "SHA256 mismatch for {0}: UI={1}..., CS-computed={2}...".format( + file_path, + message.parameters[1][:16], + local_checksum[:16], + ) + ) - device_log_data = { - "path": message.parameters[0], - "serialNumber": message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL], - "checksum": local_checksum - } + hd_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] + dg_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL] + sw_version = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION] - g_utils.logger.debug("Device log data {0}".format(device_log_data)) + self.logger.debug('hd: {0},dg: {1},sw: {2}'.format(hd_serial_number, dg_serial_number, sw_version)) - # F9 — route to the idempotent lane so large log uploads - # (up to ~350 MB, 10–30 min) do not starve the state - # lane. Filename is the dedup key; per-request ordering - # between logs is not required. 
- # F4 caller audit + F2: SEND_DEVICE_LOG queue saturation is - # an upstream terminal failure. Emit the protocol-shaped - # 2010,3,,0,1 (Disconnected) rejection so UI-Brain's - # backup-and-retry handler engages, AND CS2UI_ERROR for - # operator visibility. Both are expected (layered). - outcome, detail = helpers_add_to_network_queue( - network_request_handler=self.idempotent_network_request_handler, - request_type=NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG, - url='', - payload=device_log_data, - method='', - g_config=message.g_config, - success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to idempotent queue', - correlation_id=message.correlation_id) - if outcome != "queued": - self.logger.error( - "CS2DCS_REQ_SEND_DEVICE_LOG not queued (%s): %s", - outcome, detail) - # F2 — protocol-shaped rejection - device_file_name = os.path.basename( - device_log_data.get("path", "")) - rejection_body = "{0},3,{1},0,{2}".format( - OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, - device_file_name, - LogUploadReasonCode.DISCONNECTED.value) - self.output_channel.enqueue_message(rejection_body) - # CS2UI_ERROR for operator diagnostics (layered) - err = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_DEVICE_LOG_ERROR.value, - "Queue {0}: {1}".format(outcome, detail)) - self._enqueue_error(err, message.correlation_id) + device_log_data = { + "path": file_path, + "serialNumber": message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL], + "checksum": local_checksum + } - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_DEVICE_LOG_ERROR.value, - str(e)) - self._enqueue_error(error, message.correlation_id) + g_utils.logger.debug("Device log data {0}".format(device_log_data)) + + # Route to the idempotent lane so large log uploads + # (up to ~350 MB, 10–30 min) do not starve the state + # lane. Filename is the dedup key; per-request ordering + # between logs is not required. 
+ # SEND_DEVICE_LOG queue saturation is an upstream + # terminal failure. Emit the protocol-shaped + # 2010,3,,0,1 (Disconnected) rejection so UI-Brain's + # backup-and-retry handler engages, AND CS2UI_ERROR for + # operator visibility. Both are expected (layered). + outcome, detail = helpers_add_to_network_queue( + network_request_handler=self.idempotent_network_request_handler, + request_type=NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG, + url='', + payload=device_log_data, + method='', + g_config=message.g_config, + success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to idempotent queue', + correlation_id=message.correlation_id) + if outcome != "queued": + self.logger.error( + "CS2DCS_REQ_SEND_DEVICE_LOG not queued (%s): %s", + outcome, detail) + # Protocol-shaped rejection + device_file_name = os.path.basename( + device_log_data.get("path", "")) + rejection_body = "{0},3,{1},0,{2}".format( + OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, + device_file_name, + LogUploadReasonCode.DISCONNECTED.value) + self.output_channel.enqueue_message(rejection_body) + # CS2UI_ERROR for operator diagnostics (layered) + err = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + format_upload_error_body( + device_file_name, + "Queue {0}: {1}".format(outcome, detail))) + self._enqueue_error(err, message.correlation_id) + + except Exception as e: + # Best-effort filename: params[0] may be a valid path + # even if the upload failed downstream. 
+ _fn = os.path.basename(str(file_path)) if file_path else '' + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + format_upload_error_body(_fn, str(e))) + self._enqueue_error(error, message.correlation_id) + finally: + with _in_flight_lock: + _in_flight_uploads.discard(file_path) Index: cloudsync/utils/alive.py =================================================================== diff -u -rb2f7afce365e3044714e0e18505fed5ddefe7764 -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/utils/alive.py (.../alive.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) +++ cloudsync/utils/alive.py (.../alive.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -1,16 +1,16 @@ -"""Scheduler-independent CS-alive file provider (F3). +"""Scheduler-independent CS-alive file provider. A tiny dedicated daemon thread writes the current unix timestamp to a file on the SD card every ``interval`` seconds. No queue, no scheduler, no output bus — this path stays functional even when the CS output pipeline is stuck, -giving UI-Brain (and the F1 watchdog) a liveness signal that is not -susceptible to the ``network_request_handler`` or ``output_channel`` -starvation patterns observed in DIAS-43 / HD000118. +giving UI-Brain (and the watchdog) a liveness signal that is not +susceptible to ``network_request_handler`` or ``output_channel`` +starvation. The ``last_progress_ts`` attribute is updated after every successful file -write and is consumed by the F1 liveness-progress watchdog (ST-3). If the -write loop itself stalls (e.g. SD-card hang), ``last_progress_ts`` stops -advancing and F1 escalates to sentinel. This is self-monitoring by design. +write and is consumed by the liveness-progress watchdog. If the write loop +itself stalls (e.g. SD-card hang), ``last_progress_ts`` stops advancing +and the watchdog escalates to sentinel. This is self-monitoring by design. 
""" from threading import Thread @@ -42,15 +42,15 @@ """Write ``\\n`` to :attr:`_path` on every tick. Tolerates ``OSError``/``IOError`` so a transient SD-card failure - cannot kill the thread. Persistent failure is caught by F1 via - staleness of :attr:`last_progress_ts`. + cannot kill the thread. Persistent failure is caught by the + watchdog via staleness of :attr:`last_progress_ts`. """ while True: try: with open(self._path, "w") as f: f.write("{}\n".format(int(time()))) - # F13 — monotonic for watchdog marker; wall-clock stays in the - # file payload (UI-Brain reads it as a human-readable time). + # Monotonic clock for the watchdog marker; wall-clock stays in + # the file payload (UI-Brain reads it as a human-readable time). self.last_progress_ts = monotonic() except (OSError, IOError) as exc: if self._logger is not None: Index: cloudsync/utils/globals.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/utils/globals.py (.../globals.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/utils/globals.py (.../globals.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -62,8 +62,8 @@ CS_LOG_PATH = "/media/sd-card/cloudsync/log" CS_LOG_FILE = os.path.join(CS_LOG_PATH, "cloudsync.log") -# LIVENESS — F3: scheduler-independent progress file consumed by UI-Brain -# (and by F1 watchdog as a self-monitoring progress signal). +# LIVENESS: scheduler-independent progress file consumed by UI-Brain +# (and by the watchdog as a self-monitoring progress signal). CS_ALIVE_FILE = "/media/sd-card/cloudsync/cs_alive" # DEVICE TOKEN @@ -173,17 +173,17 @@ # TIME CONSTANTS S_MS_CONVERSION_FACTOR = 1000 -# F7 — fleet-stagger jitter (0.5.5). -# Per-device deterministic jitter in [0, JITTER_MAX_SECONDS) derived from -# the HD serial. 
Applied to time-scheduled CS->DCS events to break the -# synchronous fleet-wide cliff at 00:00 UTC (rotation, pre-warm, first -# reachability probe). 2 minutes is enough to flatten the DCS ingress -# spike without affecting any timing-sensitive flow at this scale. +# Fleet-stagger jitter. Per-device deterministic jitter in +# [0, JITTER_MAX_SECONDS) derived from the HD serial. Applied to +# time-scheduled CS->DCS events to break the synchronous fleet-wide +# cliff at 00:00 UTC (rotation, pre-warm, first reachability probe). +# 2 minutes is enough to flatten the DCS ingress spike without +# affecting any timing-sensitive flow at this scale. JITTER_MAX_SECONDS = 120 def get_device_jitter_seconds(serial_number): - """F7 - deterministic per-device jitter in [0, JITTER_MAX_SECONDS). + """Deterministic per-device jitter in [0, JITTER_MAX_SECONDS). Same serial always produces the same value. A falsy / missing serial (e.g. pre-registration manufacturing mode) returns 0 so the call site Index: cloudsync/utils/heartbeat.py =================================================================== diff -u -rb2f7afce365e3044714e0e18505fed5ddefe7764 -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/utils/heartbeat.py (.../heartbeat.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) +++ cloudsync/utils/heartbeat.py (.../heartbeat.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -20,12 +20,12 @@ self.logger = logger self.network_request_handler = network_request_handler self.output_channel = output_channel - # F5 — opportunistic CS-log upload deps. When any is None the F5 + # Opportunistic CS-log upload deps. When any is None the upload # pathway is inert and the heartbeat behaves like the 0.5.4 baseline. self._idempotent_handler = idempotent_network_request_handler self._g_config = g_config self._cs_log_path = cs_log_path or CS_LOG_PATH - # F1 — progress marker; updated at the end of every heartbeat cycle. 
+ # Progress marker; updated at the end of every heartbeat cycle. self.last_progress_ts = 0 self.thread = Thread(target=self.heartbeat, daemon=True) self.thread.start() @@ -36,10 +36,9 @@ """ while True: if self.send_heartbeat: - # F4 caller audit: heartbeat is drop-oldest semantics — recent - # state is more useful than a stale queued copy, so a "full" - # outcome is acceptable (just log at debug; the next tick will - # try again). + # Drop-oldest semantics: recent state is more useful than a + # stale queued copy, so a "full" outcome is acceptable + # (logged at debug; the next tick will try again). outcome, detail = helpers_add_to_output_channel( output_channel=self.output_channel, message_body=str(OutboundMessageIDs.CS2UI_REQ_DEVICE_STATE.value) + ',0', @@ -49,23 +48,23 @@ "Heartbeat CS2UI_REQ_DEVICE_STATE dropped (%s): %s", outcome, detail) - # F5 — opportunistic CS-log upload. Decouples midnight rotation + # Opportunistic CS-log upload. Decouples midnight rotation # from upload: one file per heartbeat cycle when the idempotent # lane is idle. A 100-file backlog drains in ~33 min at 20 s/tick. self._maybe_upload_pending_cs_log() - # F1 — tick the progress marker regardless of send_heartbeat: - # a running-but-suppressed heartbeat thread is still "making - # progress" for watchdog purposes. - # F13 — monotonic (clock-jump robust). + # Tick the progress marker regardless of send_heartbeat: a + # running-but-suppressed heartbeat thread is still "making + # progress" for watchdog purposes. Monotonic clock so a + # wall-clock step does not poison the marker. self.last_progress_ts = monotonic() sleep(self.HEARTBEAT_FREQ) def _maybe_upload_pending_cs_log(self): - """F5 — enqueue the oldest rotated CS log file when conditions allow. + """Enqueue the oldest rotated CS log file when conditions allow. Guards (ordered cheapest-first): - 1. F5 deps wired (idempotent handler + g_config present). + 1. Upload deps wired (idempotent handler + g_config present). 2. 
Idempotent lane idle (empty queue). 3. Reachability reports online. 4. A pending file exists on disk. @@ -84,12 +83,12 @@ try: serial = self._g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] except Exception as exc: - self.logger.warning("F5: serial lookup failed: %s", exc) + self.logger.warning("CS-log upload: serial lookup failed: %s", exc) return try: checksum = helpers_sha256_checksum(pending) except Exception as exc: - self.logger.warning("F5: checksum failed for %s: %s", pending, exc) + self.logger.warning("CS-log upload: checksum failed for %s: %s", pending, exc) return cs_log_data = { "path": pending, @@ -105,14 +104,14 @@ method='', g_config=self._g_config, correlation_id=correlation_id, - success_message="F5: opportunistic CS log upload queued for {0}".format(pending)) + success_message="Opportunistic CS log upload queued for {0}".format(pending)) if outcome != "queued": - self.logger.debug("F5: CS log upload deferred (%s): %s", outcome, detail) + self.logger.debug("CS log upload deferred (%s): %s", outcome, detail) def _idempotent_queue_idle(self): """True when the idempotent lane has no pending items. - Keeps opportunistic F5 uploads from stacking on top of user-triggered + Keeps opportunistic CS-log uploads from stacking on top of user-triggered device-log uploads (SEND_DEVICE_LOG). Returns False defensively on any attribute error. 
""" @@ -132,13 +131,13 @@ try: matches = glob.glob(pattern) except Exception as exc: - self.logger.warning("F5: glob failed for %s: %s", pattern, exc) + self.logger.warning("CS-log upload: glob failed for %s: %s", pattern, exc) return None if not matches: return None try: matches.sort(key=lambda p: os.path.getmtime(p)) except Exception as exc: - self.logger.warning("F5: mtime sort failed: %s", exc) + self.logger.warning("CS-log upload: mtime sort failed: %s", exc) return None return matches[0] Index: cloudsync/utils/helpers.py =================================================================== diff -u -rf26f09af1e10683fcbeb7fac051976ee0cd504f0 -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/utils/helpers.py (.../helpers.py) (revision f26f09af1e10683fcbeb7fac051976ee0cd504f0) +++ cloudsync/utils/helpers.py (.../helpers.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -185,15 +185,14 @@ def helpers_get_stored_token() -> Union[str, None]: """Return the stored access token string, or None. - F15 (server authority) — no client-side `exp` check. DCS's JwtBearer - middleware is the sole authority on token lifetime (default 5 min - ClockSkew tolerance). On 401/403 from a DCS call, callers refresh - via cert auth and retry once. This eliminates the wall-clock - dependency that caused refresh storms on devices with operator-set - or jumped system time (DVT-9 / multi-day jump test protocol). + No client-side `exp` check. DCS's JwtBearer middleware is the sole + authority on token lifetime (default 5 min ClockSkew tolerance). On + 401/403 from a DCS call, callers refresh via cert auth and retry + once. This eliminates the wall-clock dependency that causes refresh + storms on devices with operator-set or jumped system time. - F10 corruption guard preserved: a corrupt / truncated / unreadable - token file MUST return None rather than crash CS. + Corruption guard: a corrupt / truncated / unreadable token file MUST + return None rather than crash CS. 
:return: The access token string if present, otherwise None. """ @@ -251,10 +250,10 @@ """ Reads the access token json file, returns it as a dict. - F10: on any read failure (corrupt JSON, IO error, invalid encoding) - return an empty dict instead of raising. An unreadable token file - must not crash CS — the caller will treat {} the same as "no cached - token" and obtain a fresh one from Keycloak. + On any read failure (corrupt JSON, IO error, invalid encoding) return + an empty dict instead of raising. An unreadable token file must not + crash CS — the caller will treat {} the same as "no cached token" and + obtain a fresh one from Keycloak. :param path: The path to the access token json file :return: parsed dict, or {} if file missing / unreadable / corrupt @@ -496,7 +495,7 @@ return None -# F4 — Three-state backpressure. +# Three-state backpressure. # These helpers USED to busy-wait 30 s and silently swallow the failure on # queue saturation. Now they return a typed outcome so every caller can # decide the right remediation (CS2UI_ERROR, 2010 rejection, retry-later…). @@ -513,16 +512,15 @@ _F4_MAX_WAIT_S_DEFAULT = 5 _F4_POLL_INTERVAL_S = 0.1 -# F1 hand-off: rolling counter of "full" events used by the watchdog to -# escalate to sentinel when queue saturation is chronic rather than -# transient. See ST-3. +# Rolling counter of "full" events used by the watchdog to escalate +# to sentinel when queue saturation is chronic rather than transient. _F4_QUEUE_FULL_EVENTS = [] _F4_QUEUE_FULL_WINDOW_S = 300 _F4_QUEUE_FULL_LOCK = threading.Lock() def _f4_record_queue_full(): - """Record a 'queue saturated' event for F1 watchdog consumption.""" + """Record a 'queue saturated' event for watchdog consumption.""" now = time() with _F4_QUEUE_FULL_LOCK: _F4_QUEUE_FULL_EVENTS.append(now) @@ -535,8 +533,8 @@ def helpers_queue_full_event_count(window_s=_F4_QUEUE_FULL_WINDOW_S): """Return the number of queue-saturation events in the last *window_s* seconds. 
- Consumed by the F1 watchdog (ST-3) to escalate chronic saturation to - sentinel independently of any individual thread appearing dead. + Consumed by the watchdog to escalate chronic saturation to sentinel + independently of any individual thread appearing dead. """ now = time() cutoff = now - window_s @@ -556,7 +554,8 @@ (no internet, unexpected exception). Callers MUST capture the return value and act on non-``queued`` - outcomes — see the F4 caller audit in the 0.5.5 plan. + outcomes — surface a CS2UI_ERROR (or upstream-visible signal) on + "full"/"failed" so the caller can observe and retry. """ # Reachability pre-check. Retain the same cycle model (fast ramp-up # during early boot when reachability probe hasn't fired yet) but @@ -648,27 +647,55 @@ return ("full", reason) -def helpers_sha256_checksum(data: str) -> str: +def helpers_sha256_checksum(file_path: str) -> str: """ - Returns the calculated checksum (SHA256) for input data - @param data: input data. It can be either string or file - @return:: checksum - """ + Returns the SHA256 of the file's raw bytes. - isFile = os.path.isfile(data) + Strict path-only contract. Callers that need to hash a literal + string must use ``helpers_sha256_string`` explicitly. 
- if isFile: - checksum = hashlib.sha256() - with open(data, "rb") as f: - # Read and update hash in blocks of 4K - for byte_block in iter(lambda: f.read(4096), b""): - checksum.update(byte_block) - else: - checksum = hashlib.sha256(data.encode()) + @param file_path: path to an existing regular file + @return:: hex-digest SHA256 of file bytes + @raises TypeError: if file_path is not a str + @raises FileNotFoundError: if file_path does not resolve to a file + (operationally distinguishes file-missing from SHA mismatch + in the upload pipeline) + """ + if not isinstance(file_path, str): + raise TypeError( + "helpers_sha256_checksum expects str, got {0}".format(type(file_path).__name__) + ) + if not os.path.isfile(file_path): + raise FileNotFoundError(file_path) + checksum = hashlib.sha256() + with open(file_path, "rb") as f: + # Read and update hash in blocks of 4K + for byte_block in iter(lambda: f.read(4096), b""): + checksum.update(byte_block) return checksum.hexdigest() +def helpers_sha256_string(data: str) -> str: + """ + Returns the SHA256 of a literal string (UTF-8-encoded). + + Companion to ``helpers_sha256_checksum``: that function is strict + path-only. Use this one for protocol-payload integrity (e.g. + treatment-report JSON) where the input is intentionally a string, + not a file path. + + @param data: the string to hash + @return:: hex-digest SHA256 of data.encode('utf-8') + @raises TypeError: if data is not a str + """ + if not isinstance(data, str): + raise TypeError( + "helpers_sha256_string expects str, got {0}".format(type(data).__name__) + ) + return hashlib.sha256(data.encode("utf-8")).hexdigest() + + def helpers_crc8(message_list): """ Returns the calculated crc from a message list @@ -817,12 +844,12 @@ """ cs_log_json = helpers_read_cs_log_template(LOG_UPLOAD_TEMPLATE_PATH) - # F11 (0.5.5): compute the checksum by streaming the file directly. 
- # The previous version loaded the whole file + base64-encoded it - # just to produce a byte array to hash, then discarded the array. - # On ~300 MB CS logs that peak cost 4/3 x file-size and combined - # with the same load pattern inside cmd_outgoing_upload_file_in_chunks - # drove OOM-kill events on the 2 GB i.MX8MMini device. + # Stream the checksum from disk to avoid loading the whole file + + # base64-encoding it just to produce a byte array to hash. On ~300 MB + # CS logs the previous double-buffer pattern peaked at 4/3 × file size + # and combined with the same pattern inside + # cmd_outgoing_upload_file_in_chunks drove OOM-kill events on the + # 2 GB i.MX8MMini device. checksum = helpers_sha256_checksum(cs_log_data['path']) # Get file size @@ -884,9 +911,9 @@ """ device_log_json = helpers_read_device_log_template(LOG_UPLOAD_TEMPLATE_PATH) - # F11 (0.5.5): stream the checksum from disk rather than reading the - # whole file + base64-encoding it to produce a byte array. See - # helpers_construct_cs_log_json for the rationale and incident link. + # Stream the checksum from disk rather than reading the whole file + + # base64-encoding it to produce a byte array. See + # helpers_construct_cs_log_json for the rationale. checksum = helpers_sha256_checksum(device_log_data['path']) # Get file size Index: cloudsync/utils/reachability.py =================================================================== diff -u -rdebe1b458743b43a0d523b2b30d79fb1b00c336f -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/utils/reachability.py (.../reachability.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) +++ cloudsync/utils/reachability.py (.../reachability.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -19,7 +19,7 @@ def __init__(self, logger: Logger, url_reachability, initial_delay_s: int = 0): """Initialise the reachability provider. 
- :param int initial_delay_s: F7 fleet-stagger delay applied once + :param int initial_delay_s: Fleet-stagger delay applied once before the first probe. Zero preserves 0.5.4 behavior; a non-zero value prevents the PROD fleet from probing the reachability URL in lock-step at boot. @@ -35,7 +35,7 @@ """ Continuously monitors the connection to REACHABILITY_URL """ - # F7 - optional first-probe stagger. Gated on > 0 so existing + # Optional first-probe stagger. Gated on > 0 so existing # tests that patch ``sleep`` to StopIteration still exercise the # loop body (unchanged 0.5.4 behavior when the provider is # constructed without a jitter). Index: cloudsync/utils/watchdog.py =================================================================== diff -u -rb2f7afce365e3044714e0e18505fed5ddefe7764 -r79dadd7422855193f345beced530c6b92fc49b3a --- cloudsync/utils/watchdog.py (.../watchdog.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) +++ cloudsync/utils/watchdog.py (.../watchdog.py) (revision 79dadd7422855193f345beced530c6b92fc49b3a) @@ -4,10 +4,10 @@ on failure (up to a configurable limit), and creates a sentinel file to request full process restart when recovery is exhausted. -F1 — 0.5.5: extends the watchdog with liveness-progress monitoring. +Liveness-progress monitoring (in addition to ``is_alive()`` checks). ``threading.Thread.is_alive()`` returns True for threads blocked in kernel -syscalls, ``event.wait()``, or busy-waits. The HD000118-style stuck-but-alive -patterns observed in DIAS-43 go completely undetected by is_alive() alone. +syscalls, ``event.wait()``, or busy-waits. Stuck-but-alive patterns are +invisible to is_alive() alone. The extended API accepts an optional ``progress_fn`` and ``max_idle_s`` per registration. On each sweep the watchdog treats stale progress the same way @@ -20,8 +20,8 @@ ``register_sentinel_condition`` provides a second escalation path: an arbitrary zero-arg predicate checked every sweep. 
If it returns True, the watchdog immediately creates the sentinel (no restart attempt). Used for -chronic queue-exhaustion (F4 counter) where individual threads stay alive -but the pipeline is starved. +chronic queue-exhaustion (queue-full event counter) where individual +threads stay alive but the pipeline is starved. """ import os @@ -50,7 +50,7 @@ self._max_restarts = max_restarts self._sentinel_path = sentinel_path self._entries = {} - # F1 — sentinel-condition registry (name → predicate) + # Sentinel-condition registry (name → predicate) self._sentinel_conditions = {} self._stop_event = Event() self._thread = Thread(target=self._monitor, daemon=True) @@ -69,10 +69,10 @@ :param callable restart_fn: Zero-arg callable that creates and starts a replacement thread. Use :func:`make_restart_fn` for the common case of daemon threads backed by a bound method. - :param callable progress_fn: (F1) optional zero-arg callable returning - a unix-epoch timestamp of the thread's last semantic progress. + :param callable progress_fn: Optional zero-arg callable returning + a monotonic timestamp of the thread's last semantic progress. If omitted, only dead-thread detection is performed. - :param float max_idle_s: (F1) maximum seconds of staleness tolerated + :param float max_idle_s: Maximum seconds of staleness tolerated before the thread counts as failed. Required when ``progress_fn`` is provided. """ @@ -92,7 +92,7 @@ Used for conditions where individual threads are alive but the pipeline as a whole is starved — e.g. chronic queue saturation - (F4 ``helpers_queue_full_event_count`` > threshold). + (``helpers_queue_full_event_count`` > threshold). :param str name: Diagnostic label (appears in sentinel file + logs). :param callable condition_fn: Zero-arg callable returning a bool. 
@@ -132,7 +132,7 @@ def _check_threads(self): """Single sweep: check every registered thread + sentinel condition.""" - # F1: first evaluate sentinel conditions — they short-circuit thread + # First evaluate sentinel conditions — they short-circuit thread # checks. A True predicate means the pipeline is chronically # starved and restarting individual threads will not help. for name, condition_fn in self._sentinel_conditions.items(): @@ -153,7 +153,7 @@ thread = entry["get_thread"]() is_dead = thread is None or not thread.is_alive() - # F1: check progress staleness even when the thread is alive. + # Check progress staleness even when the thread is alive. is_stale = False stale_reason = "" if not is_dead and entry["progress_fn"] is not None: @@ -169,7 +169,7 @@ if last_ts == 0: is_stale = False else: - # F13 — compare in monotonic time. A wall-clock step + # Compare in monotonic time. A wall-clock step # (operator `date` command or ntpd correction) must not # trip the watchdog; only real elapsed idle time matters. age = monotonic() - last_ts @@ -179,7 +179,7 @@ % (age, entry["max_idle_s"])) if not is_dead and not is_stale: - # F1: reset failure counter on a fully-healthy read. This is + # Reset failure counter on a fully-healthy read. This is # the "3 consecutive stale reads" rule — any healthy read # wipes the slate clean and prevents restart-storm escalation # on transient slowness.