Index: cloud_sync.py =================================================================== diff -u -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloud_sync.py (.../cloud_sync.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) +++ cloud_sync.py (.../cloud_sync.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -8,14 +8,15 @@ from cloudsync.utils.heartbeat import HeartBeatProvider from cloudsync.busses.file_input_bus import FileInputBus from cloudsync.handlers.ui_cs_request_handler import UICSMessageHandler -from cloudsync.handlers.cs_mft_dcs_request_handler import NetworkRequestHandler +from cloudsync.handlers.cs_mft_dcs_request_handler import NetworkRequestHandler, IdempotentRequestHandler from cloudsync.handlers.error_handler import ErrorHandler from cloudsync.busses.file_output_bus import FileOutputBus from cloudsync.utils.reachability import ReachabilityProvider from cloudsync.utils.globals import * from cloudsync.utils.helpers import * from cloudsync.utils.logging import LoggingConfig from cloudsync.utils.watchdog import Watchdog, make_restart_fn +from cloudsync.utils.alive import AliveProvider import hmac import os @@ -24,7 +25,7 @@ import threading -VERSION = "0.5.3" +VERSION = "0.5.5" arguments = sys.argv @@ -90,36 +91,73 @@ g_utils.logger.info(SETUP_CONSOLE_LINE) sys.exit(0) +# F7 - fleet-stagger jitter computed once at boot from the HD serial. +# Reused by ReachabilityProvider (first-probe delay) and FileOutputBus +# (F6 pre-warm window offset). A pre-registration device with no serial +# gets jitter=0, which is acceptable: registration happens once per +# device and is not a fleet-wide concern. 
+_device_serial = g_config[CONFIG_DEVICE].get(CONFIG_DEVICE_HD_SERIAL) +_device_jitter_s = get_device_jitter_seconds(_device_serial) +g_utils.logger.info( + "F7 fleet jitter: %ds (serial=%s)", _device_jitter_s, _device_serial) + try: - reachability_provider = ReachabilityProvider(logger=app.logger, url_reachability=g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_REACHABILITY_URL]) + reachability_provider = ReachabilityProvider( + logger=app.logger, + url_reachability=g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_REACHABILITY_URL], + initial_delay_s=_device_jitter_s) except Exception as e: g_utils.logger.error( "Reachability URL missing from config file. Using Default URL - {0}".format(DEFAULT_REACHABILITY_URL)) reachability_provider = ReachabilityProvider( - logger=app.logger, url_reachability=DEFAULT_REACHABILITY_URL) + logger=app.logger, url_reachability=DEFAULT_REACHABILITY_URL, + initial_delay_s=_device_jitter_s) try: g_utils.add_reachability_provider(reachability_provider=reachability_provider) - output_channel = FileOutputBus(logger=app.logger, max_size=100, file_channels_path=UI2CS_FILE_CHANNELS_PATH) + # F6/F7 - pre-warm thread runs inside FileOutputBus; jitter staggers + # the fleet across a 2-minute window starting at 23:55 UTC. + output_channel = FileOutputBus(logger=app.logger, max_size=100, + file_channels_path=UI2CS_FILE_CHANNELS_PATH, + jitter_seconds=_device_jitter_s) error_handler = ErrorHandler(logger=app.logger, max_size=100, output_channel=output_channel) + # F9 — split-lane architecture. + # State lane (max_size=1): strict-ordering handler for registration, + # SET_DEVICE_STATE and SEND_TREATMENT_REPORT. The single-slot queue is + # the explicit ordering invariant — do NOT change max_size on this one. 
network_request_handler = NetworkRequestHandler(logger=app.logger, max_size=1, output_channel=output_channel, reachability_provider=reachability_provider, error_handler=error_handler) + # Idempotent lane (max_size=16): absorbs long-running, filename-keyed + # uploads (SEND_DEVICE_LOG up to ~350 MB, SEND_CS_LOG up to ~300 MB). + # Single worker per lane; max_size is queue depth, not parallelism. + # Keeps the state lane responsive during 10–30 minute transfers. + idempotent_network_request_handler = IdempotentRequestHandler( + logger=app.logger, max_size=16, output_channel=output_channel, + reachability_provider=reachability_provider, error_handler=error_handler) message_handler = UICSMessageHandler(logger=app.logger, max_size=20, network_request_handler=network_request_handler, output_channel=output_channel, reachability_provider=reachability_provider, - error_handler=error_handler) + error_handler=error_handler, + idempotent_network_request_handler=idempotent_network_request_handler) ui_cs_bus = FileInputBus(logger=app.logger, file_channels_path=UI2CS_FILE_CHANNELS_PATH, input_channel_name="inp.buf", g_config=g_config, message_handler=message_handler) + # F5 — heartbeat gets the idempotent lane + g_config so it can drain + # rotated CS-log files one-per-tick when that lane is idle. Registration + # mode sets send_heartbeat=False below, so the F5 branch stays inert + # during registration. heartbeat_provider = HeartBeatProvider(logger=app.logger, network_request_handler=network_request_handler, - output_channel=output_channel) + output_channel=output_channel, + idempotent_network_request_handler=idempotent_network_request_handler, + g_config=g_config) - logconf.set_network_provider(network_request_handler=network_request_handler) + # F9 — CS-log uploads (SEND_CS_LOG) go through the idempotent lane. 
+ logconf.set_network_provider(network_request_handler=idempotent_network_request_handler) logconf.set_error_provider(error_handler=error_handler) logconf.set_configuration(g_config=g_config) logconf.set_log_level(g_config[CONFIG_LOGS][CONFIG_LOGS_DEFAULT_LOG_LEVEL]) @@ -139,20 +177,56 @@ # the registration-to-operation transition. watchdog = Watchdog(logger=app.logger) if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation': + # F3: scheduler-independent CS-alive file. Auto-starts on construction. + # The `last_progress_ts` attribute is consumed by F1 (below). + alive_provider = AliveProvider(logger=app.logger) + # F1 — progress-signal contracts (plan §4 F1). max_idle_s values + # are conservative; require 3 consecutive stale reads (default + # max_restarts) before escalating to sentinel, which gives each + # thread an effective 3 × check_interval grace on top of max_idle_s. + watchdog.register("alive", lambda: alive_provider.thread, + make_restart_fn(alive_provider, "thread", alive_provider._loop), + progress_fn=lambda: alive_provider.last_progress_ts, + max_idle_s=30) watchdog.register("reachability", lambda: reachability_provider.thread, make_restart_fn(reachability_provider, "thread", reachability_provider.reachability_test)) watchdog.register("output_bus", lambda: output_channel.thread, - make_restart_fn(output_channel, "thread", output_channel.scheduler)) + make_restart_fn(output_channel, "thread", output_channel.scheduler), + progress_fn=lambda: output_channel.last_progress_ts, + max_idle_s=120) watchdog.register("error_handler", lambda: error_handler.thread, make_restart_fn(error_handler, "thread", error_handler.scheduler)) watchdog.register("network_request_handler", lambda: network_request_handler.thread, - make_restart_fn(network_request_handler, "thread", network_request_handler.scheduler)) + make_restart_fn(network_request_handler, "thread", network_request_handler.scheduler), + progress_fn=lambda: network_request_handler.last_progress_ts, + 
max_idle_s=300) + # F9/F1 — idempotent lane worker: large-file uploads legitimately + # take 10–30 min, so max_idle_s is 1800 s (30 min) per plan §4 F1. + watchdog.register("idempotent_request_handler", + lambda: idempotent_network_request_handler.thread, + make_restart_fn(idempotent_network_request_handler, "thread", + idempotent_network_request_handler.scheduler), + progress_fn=lambda: idempotent_network_request_handler.last_progress_ts, + max_idle_s=1800) watchdog.register("message_handler", lambda: message_handler.thread, - make_restart_fn(message_handler, "thread", message_handler.scheduler)) + make_restart_fn(message_handler, "thread", message_handler.scheduler), + progress_fn=lambda: message_handler.last_progress_ts, + max_idle_s=120) watchdog.register("file_input_bus", lambda: ui_cs_bus.thread, - make_restart_fn(ui_cs_bus, "thread", ui_cs_bus.input_channel_handler)) + make_restart_fn(ui_cs_bus, "thread", ui_cs_bus.input_channel_handler), + progress_fn=lambda: ui_cs_bus.last_progress_ts, + max_idle_s=600) watchdog.register("heartbeat", lambda: heartbeat_provider.thread, - make_restart_fn(heartbeat_provider, "thread", heartbeat_provider.heartbeat)) + make_restart_fn(heartbeat_provider, "thread", heartbeat_provider.heartbeat), + progress_fn=lambda: heartbeat_provider.last_progress_ts, + max_idle_s=90) + # F1 — chronic queue-saturation escalation. If the F4 counter shows + # more than 3 "queue full" events in the last 5 min, individual + # threads are alive but the pipeline is starved — skip restarts and + # go straight to sentinel for a clean-slate process restart. 
+ watchdog.register_sentinel_condition( + "queue_exhaustion", + lambda: helpers_queue_full_event_count() > 3) watchdog.start() g_utils.logger.info("Watchdog started (operation mode)") else: @@ -231,14 +305,24 @@ return {"invalidAttributes": invalid_params}, BAD_REQUEST try: - helpers_add_to_network_queue(network_request_handler=network_request_handler, - request_type=NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS, - url=request.url, - payload=payload, - method=request.method, - g_config=g_config, - success_message='CS2MFT_REQ_SET_CREDENTIALS request added to network ' - 'queue') + # F4 caller audit: manufacturing-tool credentials flow. Surface + # queue failures to the existing CS2UI_ERROR channel so the MFT + # can observe and retry. + outcome, detail = helpers_add_to_network_queue( + network_request_handler=network_request_handler, + request_type=NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS, + url=request.url, + payload=payload, + method=request.method, + g_config=g_config, + success_message='CS2MFT_REQ_SET_CREDENTIALS request added to network ' + 'queue') + if outcome != "queued": + error = Error("{0},2,{1},Queue {2}: {3}".format( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value, + outcome, detail)) + error_handler.enqueue_error(error=error) except Exception as e: error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value, Index: cloudsync/busses/file_input_bus.py =================================================================== diff -u -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) +++ cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -30,6 +30,8 @@ """ self.last_input_message_id = 0 + # F1 — progress marker; updated after each inotify event is processed. 
+ self.last_progress_ts = 0 self.logger = logger self.file_channels_path = file_channels_path @@ -49,6 +51,9 @@ The Input Channel Handler - it parses and sends upstream all messages arriving on the File Input Bus """ for event in self.i.event_gen(yield_nones=False): + # F1 — every inotify event (including filtered-out ones) counts as + # progress; it proves the thread is reading events off the kernel. + self.last_progress_ts = time() (_, type_names, path, filename) = event # self.logger.debug("PATH=[{}] FILENAME=[{}] EVENT_TYPES={}".format(path, filename, type_names)) if (('IN_MODIFY' in type_names) or ('IN_CLOSE_WRITE' in type_names)) and ( Index: cloudsync/busses/file_output_bus.py =================================================================== diff -u -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) +++ cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -1,38 +1,64 @@ """Implementation of an Output Bus based on the Linux file system""" +import datetime +import os from logging import Logger from collections import deque from threading import Event, Thread -import datetime +from time import time, sleep from cloudsync.utils import helpers from cloudsync.utils.filesystem import check_writable, check_disk_space_mb class FileOutputBus: """File Output Bus class that receives, parses and sends downstream all outbound messages""" + + # F6 — pre-warm window and cadence. The window is 5 minutes wide so + # the 0-120 s F7 jitter fits inside with a 3-minute buffer; the check + # runs once per minute which is cheap and well under the window size. 
+ PREWARM_CHECK_INTERVAL_S = 60 + PREWARM_WINDOW_START_HOUR = 23 + PREWARM_WINDOW_START_MINUTE = 55 + def __init__(self, logger: Logger, max_size, - file_channels_path: str): + file_channels_path: str, + jitter_seconds: int = 0): """ Initialize the File Input Bus. :param Logger logger: Logger object :param max_size: the maximum size of the message queue for the Output Bus :type max_size: int :param str file_channels_path: the path where the bus files are located + :param int jitter_seconds: F7 per-device jitter in [0, 120) used by + the F6 pre-warm loop to stagger the fleet across a 2-minute + window inside the 5-minute pre-warm window. 0 means "fire at + the window start" (default; registration mode has no serial). """ self.last_output_message_id = 1 self.logger = logger self.file_channels_path = file_channels_path self.queue = deque(maxlen=max_size) + # F1 - progress marker; updated after every successful out.buf write. + self.last_progress_ts = 0 self.thread = Thread(target=self.scheduler, daemon=True) self.event = Event() self.thread.start() + # F6 - pre-warm state. `_last_prewarm_date` is the UTC date on which + # we successfully touched tomorrow's file; prevents double-firing + # inside the 5-minute window. Failures intentionally do NOT update + # it so the loop retries next tick. + self._jitter_seconds = max(0, int(jitter_seconds)) + self._last_prewarm_date = None + self.prewarm_thread = Thread(target=self._prewarm_loop, daemon=True) + self.prewarm_thread.start() + def scheduler(self) -> None: """ Continuously monitors the event flag to check for new messages @@ -83,5 +109,58 @@ with open(self.file_channels_path + "/" + filename, "a") as f: f.write("{0}\n".format(message_body)) self.last_output_message_id += 1 + # F1 - mark progress only after a successful write (failed + # writes don't constitute forward progress). 
+ self.last_progress_ts = time() except IOError as er: self.logger.error('Opening and/or writing to output file error: {0}'.format(str(er))) + + def _prewarm_loop(self): + """F6 - independent timer loop that calls ``_maybe_prewarm_tomorrow``. + + Any exception in the body is caught and logged; the loop must + never die silently because the scheduler thread has no way to + revive it. The watchdog does NOT monitor this thread - it's + opportunistic; a skipped pre-warm degrades gracefully to the + 0.5.4 behavior of creating the file at midnight. + """ + while True: + try: + self._maybe_prewarm_tomorrow() + except Exception as exc: + self.logger.error("F6: pre-warm loop iteration failed: %s", exc) + sleep(self.PREWARM_CHECK_INTERVAL_S) + + def _maybe_prewarm_tomorrow(self): + """F6 - touch tomorrow's out.buf in [23:55 + jitter, 00:00) UTC. + + The 5-minute window is wider than the 2-minute F7 jitter range so + every device gets a fire opportunity inside the window regardless + of thread sleep drift. Once-per-date by ``_last_prewarm_date`` + ensures idempotency; failures do NOT update the date so the next + tick retries. + """ + now = datetime.datetime.now(datetime.timezone.utc) + if now.hour != self.PREWARM_WINDOW_START_HOUR: + return + if now.minute < self.PREWARM_WINDOW_START_MINUTE: + return + today_key = now.date() + if self._last_prewarm_date == today_key: + return + # F7 jitter: delay the fire until (seconds since 23:55) >= jitter. + # Spreads the fleet fire-times across a 2-minute window starting + # at 23:55:00. 
+ seconds_into_window = (now.minute - self.PREWARM_WINDOW_START_MINUTE) * 60 + now.second + if seconds_into_window < self._jitter_seconds: + return + tomorrow = (now + datetime.timedelta(days=1)).date() + filename = tomorrow.strftime("%Y_%m_%d_out.buf") + path = os.path.join(self.file_channels_path, filename) + try: + with open(path, "a"): + pass + self._last_prewarm_date = today_key + self.logger.debug("F6: pre-warmed %s (jitter=%ds)", path, self._jitter_seconds) + except (IOError, OSError) as exc: + self.logger.warning("F6: pre-warm failed for %s: %s", path, exc) Index: cloudsync/common/enums.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/common/enums.py (.../enums.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/common/enums.py (.../enums.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -146,3 +146,36 @@ DISCONNECTED = 1 # Backup & retry CREDENTIAL = 2 # Backup & NO upload DUPLICATE = 3 # Backup & rename + + +# F9 — Lane classification for network requests (0.5.5 ST-5). +# +# The state lane (the existing single-slot ``NetworkRequestHandler``, +# ``max_size=1``) serves registration-mode traffic, ``SET_DEVICE_STATE`` +# transitions, and ``SEND_TREATMENT_REPORT`` uploads. Strict ordering is +# preserved for every request type in that lane. +# +# The idempotent lane (``IdempotentRequestHandler``, ``max_size=16``) +# absorbs long-running, filename-keyed uploads that would otherwise +# starve the state lane during 10–30 minute transfers: +# +# - ``CS2DCS_REQ_SEND_DEVICE_LOG`` (up to ~350 MB, HD000118 root cause) +# - ``CS2DCS_REQ_SEND_CS_LOG`` (up to ~300 MB on debug days) +# +# Both lanes run a single worker each: ``max_size`` is queue depth, not +# parallelism. 
Parallel upload of two filename-keyed logs serialises +# through the idempotent worker while the state lane continues serving +# state transitions without delay. +# +# Absence from this map → state lane (fail-safe). Any new request type +# added in the future therefore defaults to ordered delivery until an +# explicit decision to move it has been reviewed and tested. +REQUEST_TYPE_LANE = { + NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG: "idempotent", + NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG: "idempotent", +} + + +def lane_for_request_type(request_type) -> str: + """Return the lane name for *request_type*. Unknown types → 'state'.""" + return REQUEST_TYPE_LANE.get(request_type, "state") Index: cloudsync/config/config_STAGING.json =================================================================== diff -u --- cloudsync/config/config_STAGING.json (revision 0) +++ cloudsync/config/config_STAGING.json (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -0,0 +1,29 @@ +{ + "kebormed_paas": { + "idp_client_secret": "NL2cn6eMyg2WLSB0nhfvbxvM79dvo3ta", + "url_mft": "", + "url_dcs": "https://device-api.diality.staging.kebormed.com", + "url_device_identity": "https://device-identity.diality.staging.kebormed.com/auth/realms/Main/protocol/openid-connect/token", + "url_reachability": "https://healthcheck.diality.staging.kebormed.com/", + "dia_org_id": 1 + }, + "device": { + "ip": "", + "port": 80, + "name": "", + "hd_serial": "", + "dg_serial": "", + "sw_version": "", + "mode": "registration", + "device_state": "INACTIVE_NOT_OK" + }, + "logs": { + "default_log_level": "ERROR", + "default_log_level_duration": "86400000", + "current_log_level": "", + "log_level_duration": 0, + "log_level_start_timestamp": 0, + "log_level_stop_timestamp": 0, + "update_dcs_flag": 0 + } +} \ No newline at end of file Index: cloudsync/handlers/cs_mft_dcs_request_handler.py =================================================================== diff -u -rf6c1d0be3c250785357d92318fa0662656f0f61d 
-rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision f6c1d0be3c250785357d92318fa0662656f0f61d) +++ cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -19,6 +19,10 @@ from cloudsync.handlers.incoming.handler_mft_to_cs import * class NetworkRequestHandler: + # F9 — single-slot ordering invariant. The state lane's max_size MUST + # remain 1 so ordering of SET_DEVICE_STATE transitions is preserved. + DEFAULT_MAX_SIZE = 1 + def __init__(self, logger: Logger, max_size, output_channel, reachability_provider, error_handler): self.logger = logger self.logconf = LoggingConfig() @@ -27,6 +31,10 @@ self.output_channel = output_channel self.error_handler = error_handler self.queue = deque(maxlen=max_size) # Thread safe + # F1 — semantic-progress marker updated after each request completes. + # Default 0 signals "no work yet"; the watchdog tolerates this until + # the first tick. + self.last_progress_ts = 0 self.thread = Thread(target=self.scheduler, daemon=True) self.event = Event() self.thread.start() @@ -44,6 +52,9 @@ try: self.handle_request(req) finally: + # F1 — mark progress after each request returns, + # success or failure. + self.last_progress_ts = time() clear_correlation_id() self.event.clear() @@ -448,7 +459,10 @@ correlation_id=_cid, device_sn=_dsn) if isinstance(upload_result, dict) and not upload_result.get("accepted"): - # Rejection (e.g. 409 duplicate) — send 2010 with 3 params: filename, 0, reason_code + # F2 — protocol-shaped rejection covering every terminal + # failure (409 Duplicate reason=3, connection/timeout/5xx + # reason=1 Disconnected, 401/403 reason=2 Credential). + # UI-Brain's existing rejection handlers engage on each. 
message_body = "{0},3,{1},0,{2}".format( OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, upload_result["filename"], @@ -615,3 +629,33 @@ return False else: return True + + +class IdempotentRequestHandler(NetworkRequestHandler): + """F9 — idempotent lane for long-running, filename-keyed uploads. + + Structurally identical to :class:`NetworkRequestHandler` — inherits + ``scheduler`` / ``enqueue_request`` / ``handle_request``. The only + operational difference is the default queue depth (16 vs 1), which + lets large log uploads absorb backlog without starving the state + lane. + + Scope (see ``REQUEST_TYPE_LANE`` in ``cloudsync.common.enums``): + - ``CS2DCS_REQ_SEND_DEVICE_LOG`` (up to ~350 MB) + - ``CS2DCS_REQ_SEND_CS_LOG`` (up to ~300 MB) + + The class exists for type discrimination (watchdog naming, tests, + log clarity). Do not instantiate for state-ordering-critical work. + """ + + DEFAULT_MAX_SIZE = 16 + + def __init__(self, logger: Logger, max_size=None, output_channel=None, + reachability_provider=None, error_handler=None): + if max_size is None: + max_size = self.DEFAULT_MAX_SIZE + super().__init__(logger=logger, max_size=max_size, + output_channel=output_channel, + reachability_provider=reachability_provider, + error_handler=error_handler) + self.logger.info('Created Idempotent Request Handler (max_size=%d)', max_size) Index: cloudsync/handlers/logs_handler.py =================================================================== diff -u -rf6c1d0be3c250785357d92318fa0662656f0f61d -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/handlers/logs_handler.py (.../logs_handler.py) (revision f6c1d0be3c250785357d92318fa0662656f0f61d) +++ cloudsync/handlers/logs_handler.py (.../logs_handler.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -1,110 +1,140 @@ -from cloudsync.handlers.error import Error +"""Custom rotating log handler for CloudSync. 
+ +F5 (0.5.5): upload is decoupled from rotation - the actual CS-log upload +is driven by ``HeartBeatProvider`` on its 20-second tick when the +idempotent lane is idle. This class's sole responsibility is rotation. + +F12 (0.5.5): clock-jump tolerant rotation. ``logging.handlers.TimedRotatingFileHandler`` +computes ``rolloverAt`` from the REAL wall-clock at boot and never +revisits that decision unless a rollover actually fires. On devices +where operators simulate day transitions by setting the system clock +to 23:59 and waiting, the stdlib rolloverAt remains anchored to the +real boot time and never matches the simulated time, so rotations are +silently skipped (confirmed against CPython 3.11). + +F12 replaces the stdlib trigger with a UTC-date-equality check: +rotation fires whenever the current UTC date differs from the date on +which the active log file was opened. Any clock jump - forward, +backward, or past multiple midnight boundaries - is honored at the +very next log emit. Collision safety: if the rotated-file suffix +already exists on disk (operator set clock backward then forward +again across the same date), a numeric counter is appended so no prior +rotated file is silently overwritten. +""" + +from logging.handlers import TimedRotatingFileHandler + from cloudsync.utils.helpers import * from cloudsync.utils.globals import * -from logging.handlers import TimedRotatingFileHandler -from time import time, sleep -from threading import Thread +# Re-import the datetime and time MODULES last, aliased. cloudsync.utils.helpers +# does ``from datetime import *`` (rebinding "datetime" to the +# ``datetime.datetime`` class) and ``from time import time, sleep`` +# (rebinding "time" to the ``time.time`` function). Star-imports above +# would therefore shadow our module references; aliasing keeps the +# module accessors unambiguous. 
+import datetime as _dt_module +import time as _time_module + class CustomTimedRotatingFileHandlerHandler(TimedRotatingFileHandler): """ - Handler for logging to a file, rotating the log file and uploading it to cloud at certain timed intervals. - It extends the official TimedRotatingFileHandler to add the upload functionality. + Handler for logging to a file, rotating the log file at timed intervals. + + The ``set_network_provider`` / ``set_error_provider`` / ``set_configuration`` + setters are retained as pass-through no-ops because ``LoggingConfig`` + still invokes them from ``cloud_sync.py``. """ - _upload_in_progress = False - def __init__(self, filename, prelogger: Logger, *args, **kwargs): super().__init__(filename, *args, **kwargs) self.network_request_handler = None self.error_handler = None self.g_config = None + # F12 - track the UTC date of the currently-active file. Rotation + # decisions compare this against datetime.now(utc).date() on every + # emit, so any clock jump is caught immediately regardless of the + # stdlib's boot-locked rolloverAt. + self._active_log_date = self._current_utc_date() prelogger.info("CUSTOM LOG HANDLER INITIATED") - def doRollover(self): - super().doRollover() - self.__select_files_to_upload() + @staticmethod + def _current_utc_date(): + return _dt_module.datetime.now(_dt_module.timezone.utc).date() - def set_network_provider(self, network_request_handler): - self.network_request_handler = network_request_handler + def shouldRollover(self, record): + """F12 - rotate whenever the UTC date changes, ignoring + self.rolloverAt (which is a boot-time fixed value that does not + adapt to simulated clock jumps).""" + try: + return self._current_utc_date() != self._active_log_date + except Exception: + # Defensive: any failure here must not break logging itself. + # Fall back to the stdlib check so at least real midnights + # still rotate. 
+ try: + return super().shouldRollover(record) + except Exception: + return False - def set_error_provider(self, error_handler): - self.error_handler = error_handler + def doRollover(self): + """F12 - rotate using the current simulated time, not + self.rolloverAt. Computes the rotated-file suffix from the + UTC date of the log being rotated out (the data's original date). + Guarantees no silent overwrite of a pre-existing file with the + same suffix (collisions happen when operators set the clock + backward then forward across the same date boundary).""" + if self.stream: + self.stream.close() + self.stream = None - def set_configuration(self, g_config): - self.g_config = g_config + current_time = int(_time_module.time()) + # The active log was accumulating since self._active_log_date. + # Use that as the suffix so the rotated filename reflects the + # data's date, not the wall-clock moment of rotation. + suffix_date = self._active_log_date + suffix_str = suffix_date.strftime(self.suffix) + dfn = self.rotation_filename(self.baseFilename + "." + suffix_str) - def __select_files_to_upload(cls): - """ - Checks for existing rotated files in the directory (excluding the newly created one) and uploads them. - Spawns a background thread to upload files sequentially, waiting for each to complete - before starting the next. This prevents queue saturation when multiple backlogged - files exist (e.g. after extended offline periods). - """ - if cls._upload_in_progress: - return # previous batch still draining + # Collision safety: preserve any pre-existing rotated file with + # the same suffix (operator clock-jumped across a date that was + # already rotated earlier). 
+ base_dfn = dfn + counter = 1 + while os.path.exists(dfn): + dfn = "{0}.{1}".format(base_dfn, counter) + counter += 1 - # We use the cls.baseFilename because according to code documentation: - # the filename passed in, could be a path object (see Issue #27493), - # but cls.baseFilename will be a string + try: + self.rotate(self.baseFilename, dfn) + except Exception: + # If rotation fails (e.g., source missing), don't leave the + # handler in a half-closed state - reopen and let the next + # emit retry. + if not self.delay: + self.stream = self._open() + raise - existing_files = [] - base_name = os.path.basename(cls.baseFilename) - for filename in os.listdir(os.path.dirname(cls.baseFilename)): - if filename.startswith(base_name) and filename != base_name: - existing_files.append(os.path.join(os.path.dirname(cls.baseFilename), filename)) + if not self.delay: + self.stream = self._open() - if len(existing_files) > 0: - existing_files.sort(key=lambda x: os.path.getmtime(x)) # oldest first - cls._upload_in_progress = True - Thread(target=cls.__upload_files_sequentially, - args=(existing_files,), daemon=True).start() + # Update the active-date marker to the NEW current UTC date. + # The very next emit compares against this and knows not to + # re-rotate until the UTC date changes again. + self._active_log_date = self._current_utc_date() - def __upload_files_sequentially(cls, files): - """ - Uploads CS log files one at a time, waiting for the network queue to drain - between each upload. This ensures all backlogged files are uploaded without - saturating the single-slot network queue. - """ + # Keep stdlib's internal bookkeeping consistent, though we do + # not rely on rolloverAt for rotation decisions. 
try: - for f in files: - if not os.path.exists(f): - continue # deleted by log retention or previous upload - cls.__upload_cs_log_file(f) - # Wait for network queue to drain before next file - while len(cls.network_request_handler.queue) > 0: - sleep(1) - sleep(2) # yield to bus-initiated requests - finally: - cls._upload_in_progress = False + self.rolloverAt = self.computeRollover(current_time) + except Exception: + pass - def __upload_cs_log_file(cls, log_file_path): + def set_network_provider(self, network_request_handler): + self.network_request_handler = network_request_handler - cs_log_data = { - "path": log_file_path, - "serialNumber": cls.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL], - "checksum": helpers_sha256_checksum(log_file_path) - } + def set_error_provider(self, error_handler): + self.error_handler = error_handler - g_utils.logger.debug(f"CS log data: {cs_log_data}") - - try: - # Generate synthetic correlation ID for CS log uploads. - # CS logs are triggered by timer (log rotation), not bus messages, - # so there is no originating message to derive a correlation from. 
- serial = cls.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] - correlation_id = f"cs-{serial}-cslog-0-{str(round(time() * 1000))}" - - helpers_add_to_network_queue(network_request_handler=cls.network_request_handler, - request_type=NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG, - url='', - payload=cs_log_data, - method='', - g_config=cls.g_config, - correlation_id=correlation_id, - success_message='CS2DCS_REQ_SEND_CS_LOG request added to network queue') - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_LOG_ERROR.value, - str(e)) - cls.error_handler.enqueue_error(error=error) \ No newline at end of file + def set_configuration(self, g_config): + self.g_config = g_config Index: cloudsync/handlers/outgoing/handler_cs_to_dcs.py =================================================================== diff -u -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) +++ cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -12,6 +12,34 @@ from cloudsync.handlers.error import Error +def _upload_rejection(filename: str, reason_code: int) -> dict: + """F2 — build a terminal-failure rejection dict for an upload call. + + Returned instead of ``None`` whenever the upload cannot be completed. + The caller (cs_mft_dcs_request_handler upload dispatch) translates this + into a protocol-shaped ``2010,3,,0,`` message + on the UI output bus so UI-Brain's existing rejection handlers + (code 1 Disconnected → backup & retry, 2 Credential → backup & no + upload, 3 Duplicate → backup & rename) engage automatically. + """ + return {"accepted": False, "filename": filename, "reason_code": reason_code} + + +def _classify_http_reason(status_code: int) -> int: + """F2 — map an HTTP status to a LogUploadReasonCode. 
+ + 401 / 403 → CREDENTIAL (reason 2). Genuine auth failures require + operator attention; UI-Brain backs up without retry. + + Everything else (timeouts, 5xx, connection resets, DNS, TLS) → + DISCONNECTED (reason 1). Safest default; UI-Brain backs up and + retries later. + """ + if status_code in (401, 403): + return LogUploadReasonCode.CREDENTIAL.value + return LogUploadReasonCode.DISCONNECTED.value + + def _inject_tracing_headers(headers: dict, correlation_id: str = "", device_sn: str = "") -> dict: """Add cross-system tracing headers. Non-functional: unknown headers are ignored by DCS.""" if correlation_id: @@ -545,14 +573,28 @@ origins = ("cs", "device") if log_file_origin not in origins: g_utils.logger.error(f"Wrong log file origin provided.") + # Programmer-error path; no filename available — safe to return None. return None ERROR_ID = ErrorIDs.CS_DEVICE_LOG_ERROR.value if log_file_origin == 'device' else ErrorIDs.CS_LOG_ERROR.value + # F2 — extract filename early so every rejection path can surface it to + # UI-Brain via the protocol-shaped 2010 message. 
+ try: + _filename = str(file_json['start_session']['metadata']['deviceFileName']) + except (KeyError, TypeError): + _filename = "" + # # Start upload session # + # Refresh token before start-session to prevent expiry at midnight rollover + if token_refresher is not None: + refreshed_token = token_refresher() + if refreshed_token is not None: + access_token = refreshed_token + start_session_url = base_url.rstrip("/") + "/api/device/data/start-session" start_session_payload = file_json['start_session'] start_session_payload = json.dumps(start_session_payload) @@ -577,25 +619,32 @@ g_utils.logger.info(f"Start-session response: {response.status_code}") if response.status_code == CONFLICT: - device_file_name = str(file_json['start_session']['metadata']['deviceFileName']) - g_utils.logger.info(f"File {device_file_name} rejected as duplicate at start-session (409 Conflict).") - return {"accepted": False, "filename": device_file_name, "reason_code": LogUploadReasonCode.DUPLICATE.value} + g_utils.logger.info(f"File {_filename} rejected as duplicate at start-session (409 Conflict).") + return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value) if response.status_code != 200: _log_dcs_trace_id(response, correlation_id) g_utils.logger.error(f"Start-session failed: {response.status_code} - {response.text[:500]}") - raise Exception(f"Error while starting upload session: {response.status_code} - {response.text}") + # F2 — map auth-class status codes to CREDENTIAL; everything + # else terminal → DISCONNECTED. + reason = _classify_http_reason(response.status_code) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + f"start-session {response.status_code}") + error_handler.enqueue_error(error=error) + return _upload_rejection(_filename, reason) except Exception as e: + # Network exceptions (timeout, DNS, TLS, connection reset) are all + # Disconnected — safest default per F2. 
error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) error_handler.enqueue_error(error=error) - return None + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) session_id = response.json().get("sessionId") if not session_id: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, "Missing session ID in response.") error_handler.enqueue_error(error=error) - return None + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) # # Send file in chunks @@ -605,9 +654,9 @@ target_file = file_json['general']["file_path"] file_size = file_json['general']["file_size"] upload_chunk_url = base_url.rstrip("/") + "/api/device/data/chunk" - upload_chunk_payload = file_json['upload_chunk'] - upload_chunk_payload['sessionId'] = session_id - upload_chunk_payload['chunkType'] = "device-data" + upload_chunk_payload_template = file_json['upload_chunk'] + upload_chunk_payload_template['sessionId'] = session_id + upload_chunk_payload_template['chunkType'] = "device-data" chunk_number = 1 headers = { 'Authorization': BEARER_HOLDER.format(access_token), @@ -617,76 +666,99 @@ } _inject_tracing_headers(headers, correlation_id, device_sn) - with open(target_file, "rb") as f: - file_content = f.read() + # F11 (0.5.5): stream the file in raw chunks aligned to a 3-byte + # boundary and base64-encode each chunk independently. Peak RAM + # per upload: one chunk (~2 MB) instead of 7/3 x file_size (the + # old path kept the full file bytes + full base64 string resident + # simultaneously, which on a ~300 MB CS log pushed a 2 GB device + # into OOM-kill). Raw-chunk size must be a multiple of 3 so each + # chunk's base64 output concatenates to the same string as + # base64(whole file) - preserving server-side reassembly + # semantics unchanged. 
+ raw_chunk_size = (chunk_size // 4) * 3 + if raw_chunk_size < 3: + raw_chunk_size = 3 # degenerate safeguard + # total_base64_size and num_chunks are derived arithmetically + # from file_size so we can report progress ("chunk K of N") the + # same way the pre-F11 code did, without requiring a pre-scan. + if file_size is None: + file_size = os.path.getsize(target_file) + total_base64_size = 4 * ((file_size + 2) // 3) + num_chunks = total_base64_size // chunk_size + (total_base64_size % chunk_size > 0) + if num_chunks == 0: + num_chunks = 1 # empty/tiny file still goes through once - # Encode the bytes using base64 - base64_string = base64.b64encode(file_content).decode("utf8") - - # Get the total size of the base64 string (in bytes) - total_size = len(base64_string) - - # Calculate the number of chunks - num_chunks = total_size // chunk_size + (total_size % chunk_size > 0) - all_chunks_ok = True - for i in range(num_chunks): - start_index = i * chunk_size - end_index = min(start_index + chunk_size, total_size) - chunk = base64_string[start_index:end_index] + with open(target_file, "rb") as f: + while True: + raw_block = f.read(raw_chunk_size) + if not raw_block: + break + chunk = base64.b64encode(raw_block).decode("utf8") - # Retry logic with counter and backoff time - chunk_uploaded = False - retry_count = 0 - while retry_count < retries: - try: - if type(upload_chunk_payload) is str: - upload_chunk_payload = json.loads(upload_chunk_payload) + # Retry logic with counter and backoff time. + chunk_uploaded = False + retry_count = 0 + while retry_count < retries: + try: + # Fresh dict per attempt so we don't carry the + # prior chunk's state forward across retries. 
+ upload_chunk_payload = dict(upload_chunk_payload_template) + upload_chunk_payload['chunkNo'] = chunk_number + upload_chunk_payload['data'] = chunk + upload_chunk_body = json.dumps(upload_chunk_payload) - upload_chunk_payload['chunkNo'] = chunk_number - upload_chunk_payload['data'] = chunk - upload_chunk_payload = json.dumps(upload_chunk_payload) + g_utils.logger.debug( + f"File upload payload (upload-chunk) - chunk No {chunk_number}: " + f"bytes={len(upload_chunk_body)}") - g_utils.logger.debug(f"File upload payload (upload-chunk) - chunk No {chunk_number}: {upload_chunk_payload}") + response = requests.post(upload_chunk_url, + headers=headers, + data=upload_chunk_body, + timeout=(30, 60)) - response = requests.post(upload_chunk_url, - headers=headers, - data=upload_chunk_payload, - timeout=(30, 60)) + if response.status_code == 200: + g_utils.logger.info(f"Uploaded chunk {chunk_number} of {num_chunks}") + chunk_number += 1 + chunk_uploaded = True + break # Successful upload, break retry loop - if response.status_code == 200: - g_utils.logger.info(f"Uploaded chunk {chunk_number} of {num_chunks}") - chunk_number += 1 - chunk_uploaded = True - break # Successful upload, break retry loop + g_utils.logger.warning( + f"Chunk {chunk_number}/{num_chunks} upload failed: " + f"{response.status_code} - {response.text[:500]}") + retry_count += 1 + if retry_count < retries: + g_utils.logger.info( + f"Retrying chunk upload in 5 seconds " + f"(attempt {retry_count}/{retries})...") + sleep(5) - g_utils.logger.warning(f"Chunk {chunk_number}/{num_chunks} upload failed: {response.status_code} - {response.text[:500]}") - retry_count += 1 - if retry_count < retries: - g_utils.logger.info(f"Retrying chunk upload in 5 seconds (attempt {retry_count}/{retries})...") - sleep(5) + except Exception as e: + retry_count += 1 + g_utils.logger.error( + f"Chunk {chunk_number}/{num_chunks} exception " + f"(attempt {retry_count}/{retries}): {e}") + if retry_count < retries: + sleep(5) - except 
Exception as e: - retry_count += 1 - g_utils.logger.error(f"Chunk {chunk_number}/{num_chunks} exception (attempt {retry_count}/{retries}): {e}") - if retry_count < retries: - sleep(5) + if not chunk_uploaded: + g_utils.logger.error( + f"Chunk {chunk_number} failed after {retries} retries, aborting upload") + all_chunks_ok = False + break - if not chunk_uploaded: - g_utils.logger.error(f"Chunk {chunk_number} failed after {retries} retries, aborting upload") - all_chunks_ok = False - break - if not all_chunks_ok: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, f"Upload aborted: chunk {chunk_number} failed after {retries} retries") error_handler.enqueue_error(error=error) - return None + # F2 — chunk retries exhausted → Disconnected + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) error_handler.enqueue_error(error=error) - return None + # F2 — unhandled exception in chunk loop → Disconnected (safest default) + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) # # End upload session @@ -723,20 +795,23 @@ timeout=(30, 60)) if response.status_code == CONFLICT: - device_file_name = str(file_json['start_session']['metadata']['deviceFileName']) - g_utils.logger.info(f"File {device_file_name} rejected as duplicate (409 Conflict).") - return {"accepted": False, "filename": device_file_name, "reason_code": LogUploadReasonCode.DUPLICATE.value} + g_utils.logger.info(f"File {_filename} rejected as duplicate (409 Conflict).") + return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value) g_utils.logger.info(f"End-session response: {response.status_code}") if response.status_code != 200: _log_dcs_trace_id(response, correlation_id) g_utils.logger.error(f"End-session failed: {response.status_code} - {response.text[:500]}") - raise Exception(f"Error while ending upload session: 
{response.status_code} - {response.text}") + reason = _classify_http_reason(response.status_code) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + f"end-session {response.status_code}") + error_handler.enqueue_error(error=error) + return _upload_rejection(_filename, reason) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) error_handler.enqueue_error(error=error) - return None + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) g_utils.logger.info(f"File {file_json['start_session']['metadata']['deviceFileName']} uploaded.") return str(file_json['start_session']['metadata']['deviceFileName']) Index: cloudsync/handlers/ui_cs_request_handler.py =================================================================== diff -u -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) +++ cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -19,13 +19,20 @@ class UICSMessageHandler: def __init__(self, logger: Logger, max_size, network_request_handler, output_channel, reachability_provider, - error_handler): + error_handler, idempotent_network_request_handler=None): self.logger = logger self.reachability_provider = reachability_provider self.network_request_handler = network_request_handler + # F9 — idempotent lane for long-running filename-keyed uploads + # (SEND_DEVICE_LOG). Back-compat default: when omitted, fall back to + # the state lane so existing tests and older wiring still function. 
+ self.idempotent_network_request_handler = ( + idempotent_network_request_handler or network_request_handler) self.output_channel = output_channel self.error_handler = error_handler self.queue = deque(maxlen=max_size) # Thread safe + # F1 — progress marker; updated after each UI-CS message is handled. + self.last_progress_ts = 0 self.thread = Thread(target=self.scheduler, daemon=True) self.event = Event() self.thread.start() @@ -45,6 +52,8 @@ try: self.handle_message(message) finally: + # F1 — mark progress after each message is fully processed. + self.last_progress_ts = time.time() clear_correlation_id() self.event.clear() @@ -115,15 +124,23 @@ helpers_write_config(None, CONFIG_PATH, message.g_config) helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, message.g_config) - helpers_add_to_network_queue(network_request_handler=self.network_request_handler, - request_type=NetworkRequestType.CS2MFT_REQ_REGISTRATION, - url='', - payload={}, - method='', - g_config=message.g_config, - success_message='CS2MFT_REQ_REGISTRATION request added to network ' - 'queue', - correlation_id=message.correlation_id) + # F4 caller audit: REGISTRATION is a one-time provisioning + # flow; on backpressure there is no useful retry strategy — + # surface via warning and let the manufacturing tool retry. 
+ outcome, detail = helpers_add_to_network_queue( + network_request_handler=self.network_request_handler, + request_type=NetworkRequestType.CS2MFT_REQ_REGISTRATION, + url='', + payload={}, + method='', + g_config=message.g_config, + success_message='CS2MFT_REQ_REGISTRATION request added to network ' + 'queue', + correlation_id=message.correlation_id) + if outcome != "queued": + self.logger.warning( + "CS2MFT_REQ_REGISTRATION not queued (%s): %s", + outcome, detail) except IOError: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SAVE_CONFIG_ERROR.value, @@ -160,15 +177,27 @@ try: helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, message.g_config) - helpers_add_to_network_queue(network_request_handler=self.network_request_handler, - request_type=NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE, - url='', - payload={}, - method='', - g_config=message.g_config, - success_message='CS2DCS_REQ_SET_DEVICE_STATE request added to network ' - 'queue', - correlation_id=message.correlation_id) + # F4 caller audit: SET_DEVICE_STATE is state-machine critical + # (state lane). A dropped state transition creates the + # "Active Not Ready" cloud-state blackout — report loss to UI. 
+ outcome, detail = helpers_add_to_network_queue( + network_request_handler=self.network_request_handler, + request_type=NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE, + url='', + payload={}, + method='', + g_config=message.g_config, + success_message='CS2DCS_REQ_SET_DEVICE_STATE request added to network ' + 'queue', + correlation_id=message.correlation_id) + if outcome != "queued": + self.logger.error( + "CS2DCS_REQ_SET_DEVICE_STATE not queued (%s): %s", + outcome, detail) + err = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, + "Queue {0}: {1}".format(outcome, detail)) + self._enqueue_error(err, message.correlation_id) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, @@ -194,15 +223,27 @@ g_utils.logger.debug("Treatment log {0}".format(treatment_log_json)) - helpers_add_to_network_queue(network_request_handler=self.network_request_handler, - request_type=NetworkRequestType.CS2DCS_REQ_SEND_TREATMENT_REPORT, - url='', - payload=treatment_log_json, - method='', - g_config=message.g_config, - success_message='CS2DCS_REQ_SEND_TREATMENT_REPORT request added to network' - 'queue', - correlation_id=message.correlation_id) + # F4 caller audit: SEND_TREATMENT_REPORT — treatment-report + # loss is user-visible; surface a CS2UI_ERROR so UI-Brain + # can prompt a retry / mark the report as not synced. 
+ outcome, detail = helpers_add_to_network_queue( + network_request_handler=self.network_request_handler, + request_type=NetworkRequestType.CS2DCS_REQ_SEND_TREATMENT_REPORT, + url='', + payload=treatment_log_json, + method='', + g_config=message.g_config, + success_message='CS2DCS_REQ_SEND_TREATMENT_REPORT request added to network' + 'queue', + correlation_id=message.correlation_id) + if outcome != "queued": + self.logger.error( + "CS2DCS_REQ_SEND_TREATMENT_REPORT not queued (%s): %s", + outcome, detail) + err = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, + "Queue {0}: {1}".format(outcome, detail)) + self._enqueue_error(err, message.correlation_id) else: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, @@ -308,14 +349,41 @@ g_utils.logger.debug("Device log data {0}".format(device_log_data)) - helpers_add_to_network_queue(network_request_handler=self.network_request_handler, - request_type=NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG, - url='', - payload=device_log_data, - method='', - g_config=message.g_config, - success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to network queue', - correlation_id=message.correlation_id) + # F9 — route to the idempotent lane so large log uploads + # (up to ~350 MB, 10–30 min) do not starve the state + # lane. Filename is the dedup key; per-request ordering + # between logs is not required. + # F4 caller audit + F2: SEND_DEVICE_LOG queue saturation is + # an upstream terminal failure. Emit the protocol-shaped + # 2010,3,,0,1 (Disconnected) rejection so UI-Brain's + # backup-and-retry handler engages, AND CS2UI_ERROR for + # operator visibility. Both are expected (layered). 
+ outcome, detail = helpers_add_to_network_queue( + network_request_handler=self.idempotent_network_request_handler, + request_type=NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG, + url='', + payload=device_log_data, + method='', + g_config=message.g_config, + success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to idempotent queue', + correlation_id=message.correlation_id) + if outcome != "queued": + self.logger.error( + "CS2DCS_REQ_SEND_DEVICE_LOG not queued (%s): %s", + outcome, detail) + # F2 — protocol-shaped rejection + device_file_name = os.path.basename( + device_log_data.get("path", "")) + rejection_body = "{0},3,{1},0,{2}".format( + OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, + device_file_name, + LogUploadReasonCode.DISCONNECTED.value) + self.output_channel.enqueue_message(rejection_body) + # CS2UI_ERROR for operator diagnostics (layered) + err = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + "Queue {0}: {1}".format(outcome, detail)) + self._enqueue_error(err, message.correlation_id) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, Index: cloudsync/utils/alive.py =================================================================== diff -u --- cloudsync/utils/alive.py (revision 0) +++ cloudsync/utils/alive.py (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -0,0 +1,57 @@ +"""Scheduler-independent CS-alive file provider (F3). + +A tiny dedicated daemon thread writes the current unix timestamp to a file +on the SD card every ``interval`` seconds. No queue, no scheduler, no output +bus — this path stays functional even when the CS output pipeline is stuck, +giving UI-Brain (and the F1 watchdog) a liveness signal that is not +susceptible to the ``network_request_handler`` or ``output_channel`` +starvation patterns observed in DIAS-43 / HD000118. 
+
+The ``last_progress_ts`` attribute is updated after every successful file
+write and is consumed by the F1 liveness-progress watchdog (ST-3). If the
+write loop itself stalls (e.g. SD-card hang), ``last_progress_ts`` stops
+advancing and F1 escalates to sentinel. This is self-monitoring by design.
+"""
+
+from threading import Thread
+from time import sleep, time
+
+from cloudsync.utils.globals import CS_ALIVE_FILE
+
+
+class AliveProvider:
+    """Writes a unix-timestamp heartbeat to ``CS_ALIVE_FILE`` on a dedicated thread."""
+
+    DEFAULT_INTERVAL_S = 5
+
+    def __init__(self, logger, path=CS_ALIVE_FILE, interval=DEFAULT_INTERVAL_S):
+        """Create the provider and start its background thread immediately.
+
+        :param logger: Logger for transient write failures.
+        :param str path: Destination file (default :data:`CS_ALIVE_FILE`).
+        :param int interval: Seconds between writes (default 5).
+        """
+        self._logger = logger
+        self._path = path
+        self._interval = interval
+        self.last_progress_ts = 0
+        self.thread = Thread(target=self._loop, daemon=True)
+        self.thread.start()
+
+    def _loop(self):
+        """Write the current unix timestamp plus ``\\n`` to :attr:`_path` on every tick.
+
+        Tolerates ``OSError``/``IOError`` so a transient SD-card failure
+        cannot kill the thread. Persistent failure is caught by F1 via
+        staleness of :attr:`last_progress_ts`.
+ """ + while True: + try: + with open(self._path, "w") as f: + f.write("{}\n".format(int(time()))) + self.last_progress_ts = time() + except (OSError, IOError) as exc: + if self._logger is not None: + self._logger.warning( + "AliveProvider write to %s failed: %s", self._path, exc) + sleep(self._interval) Index: cloudsync/utils/globals.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/utils/globals.py (.../globals.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/utils/globals.py (.../globals.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -1,4 +1,5 @@ """Object holding all application global constants""" +import hashlib import os # PATHS @@ -61,6 +62,10 @@ CS_LOG_PATH = "/media/sd-card/cloudsync/log" CS_LOG_FILE = os.path.join(CS_LOG_PATH, "cloudsync.log") +# LIVENESS — F3: scheduler-independent progress file consumed by UI-Brain +# (and by F1 watchdog as a self-monitoring progress signal). +CS_ALIVE_FILE = "/media/sd-card/cloudsync/cs_alive" + # DEVICE TOKEN TOKEN_CACHING_PATH = "/var/configurations/CloudSync/jwt/" DEVICE_KEBORMED_ACCESS_TOKEN_PATH = os.path.join(TOKEN_CACHING_PATH, "access_token.json") @@ -167,3 +172,25 @@ # TIME CONSTANTS S_MS_CONVERSION_FACTOR = 1000 + +# F7 — fleet-stagger jitter (0.5.5). +# Per-device deterministic jitter in [0, JITTER_MAX_SECONDS) derived from +# the HD serial. Applied to time-scheduled CS->DCS events to break the +# synchronous fleet-wide cliff at 00:00 UTC (rotation, pre-warm, first +# reachability probe). 2 minutes is enough to flatten the DCS ingress +# spike without affecting any timing-sensitive flow at this scale. +JITTER_MAX_SECONDS = 120 + + +def get_device_jitter_seconds(serial_number): + """F7 - deterministic per-device jitter in [0, JITTER_MAX_SECONDS). + + Same serial always produces the same value. A falsy / missing serial + (e.g. 
pre-registration manufacturing mode) returns 0 so the call site + never crashes; registration is not a fleet-wide concern so no stagger + is needed in that path. + """ + if not serial_number: + return 0 + digest = hashlib.sha256(str(serial_number).encode("utf-8")).hexdigest() + return int(digest, 16) % JITTER_MAX_SECONDS Index: cloudsync/utils/heartbeat.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/utils/heartbeat.py (.../heartbeat.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/utils/heartbeat.py (.../heartbeat.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -1,20 +1,32 @@ """Implementation of heartbeat functionality""" +import glob + from logging import Logger from threading import Thread -from time import sleep +from time import sleep, time from cloudsync.utils.helpers import * from cloudsync.common.enums import * + class HeartBeatProvider: HEARTBEAT_FREQ = 20 send_heartbeat = False - def __init__(self, logger: Logger, network_request_handler, output_channel): + def __init__(self, logger: Logger, network_request_handler, output_channel, + idempotent_network_request_handler=None, g_config=None, + cs_log_path=None): self.logger = logger self.network_request_handler = network_request_handler self.output_channel = output_channel + # F5 — opportunistic CS-log upload deps. When any is None the F5 + # pathway is inert and the heartbeat behaves like the 0.5.4 baseline. + self._idempotent_handler = idempotent_network_request_handler + self._g_config = g_config + self._cs_log_path = cs_log_path or CS_LOG_PATH + # F1 — progress marker; updated at the end of every heartbeat cycle. 
+ self.last_progress_ts = 0 self.thread = Thread(target=self.heartbeat, daemon=True) self.thread.start() @@ -24,9 +36,108 @@ """ while True: if self.send_heartbeat: - # requesting device state from UI - helpers_add_to_output_channel(output_channel=self.output_channel, - message_body=str(OutboundMessageIDs.CS2UI_REQ_DEVICE_STATE.value) + ',0', - success_message="CS2UI_REQ_DEVICE_STATE message added to output channel") + # F4 caller audit: heartbeat is drop-oldest semantics — recent + # state is more useful than a stale queued copy, so a "full" + # outcome is acceptable (just log at debug; the next tick will + # try again). + outcome, detail = helpers_add_to_output_channel( + output_channel=self.output_channel, + message_body=str(OutboundMessageIDs.CS2UI_REQ_DEVICE_STATE.value) + ',0', + success_message="CS2UI_REQ_DEVICE_STATE message added to output channel") + if outcome != "queued": + self.logger.debug( + "Heartbeat CS2UI_REQ_DEVICE_STATE dropped (%s): %s", + outcome, detail) + # F5 — opportunistic CS-log upload. Decouples midnight rotation + # from upload: one file per heartbeat cycle when the idempotent + # lane is idle. A 100-file backlog drains in ~33 min at 20 s/tick. + self._maybe_upload_pending_cs_log() + + # F1 — tick the progress marker regardless of send_heartbeat: + # a running-but-suppressed heartbeat thread is still "making + # progress" for watchdog purposes. + self.last_progress_ts = time() sleep(self.HEARTBEAT_FREQ) + + def _maybe_upload_pending_cs_log(self): + """F5 — enqueue the oldest rotated CS log file when conditions allow. + + Guards (ordered cheapest-first): + 1. F5 deps wired (idempotent handler + g_config present). + 2. Idempotent lane idle (empty queue). + 3. Reachability reports online. + 4. A pending file exists on disk. + + Any guard-miss is a silent no-op — the next tick retries. 
+ """ + if self._idempotent_handler is None or self._g_config is None: + return + if not self._idempotent_queue_idle(): + return + if not g_utils.reachability_provider.reachability: + return + pending = self._get_oldest_pending_cs_log() + if pending is None: + return + try: + serial = self._g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] + except Exception as exc: + self.logger.warning("F5: serial lookup failed: %s", exc) + return + try: + checksum = helpers_sha256_checksum(pending) + except Exception as exc: + self.logger.warning("F5: checksum failed for %s: %s", pending, exc) + return + cs_log_data = { + "path": pending, + "serialNumber": serial, + "checksum": checksum, + } + correlation_id = "cs-{0}-cslog-0-{1}".format(serial, str(round(time() * 1000))) + outcome, detail = helpers_add_to_network_queue( + network_request_handler=self._idempotent_handler, + request_type=NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG, + url='', + payload=cs_log_data, + method='', + g_config=self._g_config, + correlation_id=correlation_id, + success_message="F5: opportunistic CS log upload queued for {0}".format(pending)) + if outcome != "queued": + self.logger.debug("F5: CS log upload deferred (%s): %s", outcome, detail) + + def _idempotent_queue_idle(self): + """True when the idempotent lane has no pending items. + + Keeps opportunistic F5 uploads from stacking on top of user-triggered + device-log uploads (SEND_DEVICE_LOG). Returns False defensively on + any attribute error. + """ + try: + return len(self._idempotent_handler.queue) == 0 + except Exception: + return False + + def _get_oldest_pending_cs_log(self): + """Return the oldest rotated CS-log filename, or None. + + Sort by mtime rather than filename because the rotation suffix is + ``%m-%d-%Y`` (MM-DD-YYYY), which does NOT sort chronologically across + months or years alphabetically. mtime is authoritative. 
+ """ + pattern = glob.escape(self._cs_log_path).rstrip('/') + '/cloudsync.log.*' + try: + matches = glob.glob(pattern) + except Exception as exc: + self.logger.warning("F5: glob failed for %s: %s", pattern, exc) + return None + if not matches: + return None + try: + matches.sort(key=lambda p: os.path.getmtime(p)) + except Exception as exc: + self.logger.warning("F5: mtime sort failed: %s", exc) + return None + return matches[0] Index: cloudsync/utils/helpers.py =================================================================== diff -u -ra923c3f28c864ca51557deeabebcaa576147ae4c -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/utils/helpers.py (.../helpers.py) (revision a923c3f28c864ca51557deeabebcaa576147ae4c) +++ cloudsync/utils/helpers.py (.../helpers.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -10,6 +10,7 @@ import uuid import subprocess import math +import threading from datetime import * from time import time, sleep @@ -183,24 +184,58 @@ def helpers_get_stored_token() -> Union[str, None]: """ - Returns the stored token - :return: The token if found, otherwise returns None + Returns the stored token if it exists and has not expired. + Decodes the JWT payload to check the 'exp' claim against the current time. + Returns None if the token is expired or within 60 seconds of expiry, + forcing the caller to request a fresh token. + + F10: a corrupt / truncated / unreadable token file MUST return None + rather than crash CS — token corruption is a recoverable state; CS + must stay alive to refresh the token. 
+ + :return: The token if found and valid, otherwise returns None """ data = None if not os.path.exists(DEVICE_KEBORMED_ACCESS_TOKEN_PATH): return None - with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'r') as f: - try: + try: + with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'r') as f: data = json.load(f) - except json.decoder.JSONDecodeError: - return None + except (json.JSONDecodeError, IOError, OSError, UnicodeDecodeError) as e: + if g_utils.logger is not None: + g_utils.logger.warning( + "Stored access-token file is corrupt or unreadable; forcing refresh: %s", e) + return None if data is None: return None - return data.get("access_token", None) + access_token = data.get("access_token", None) + if access_token is None: + return None + # Check JWT expiry before returning the token + try: + # JWT is header.payload.signature — decode the payload (no verification needed, + # we just need the exp claim; signature is verified by DCS on each request) + payload_b64 = access_token.split('.')[1] + # Add padding if needed + padding = 4 - len(payload_b64) % 4 + if padding != 4: + payload_b64 += '=' * padding + payload = json.loads(base64.urlsafe_b64decode(payload_b64)) + exp = payload.get('exp', 0) + now = int(time()) + if now >= (exp - 60): # expired or within 60s of expiry + g_utils.logger.info(f"Stored token expired or near expiry (exp={exp}, now={now}), will refresh") + return None + except Exception: + # If we can't decode, let the caller proceed and DCS will reject if invalid + pass + return access_token + + def helpers_read_config(path: str) -> dict: """ Read the configuration @@ -236,14 +271,27 @@ def helpers_read_access_token(path: str) -> dict: """ - Reads the access token json file, returns it as a dict + Reads the access token json file, returns it as a dict. + + F10: on any read failure (corrupt JSON, IO error, invalid encoding) + return an empty dict instead of raising. 
An unreadable token file + must not crash CS — the caller will treat {} the same as "no cached + token" and obtain a fresh one from Keycloak. + :param path: The path to the access token json file - :return: + :return: parsed dict, or {} if file missing / unreadable / corrupt """ data = {} - if os.path.exists(path): + if not os.path.exists(path): + return data + try: with open(path, 'r', encoding="utf-8-sig") as f: data = json.load(f) + except (json.JSONDecodeError, IOError, OSError, UnicodeDecodeError) as e: + if g_utils.logger is not None: + g_utils.logger.warning( + "Access-token file at %s is corrupt or unreadable: %s", path, e) + return {} return data @@ -470,57 +518,158 @@ return None +# F4 — Three-state backpressure. +# These helpers USED to busy-wait 30 s and silently swallow the failure on +# queue saturation. Now they return a typed outcome so every caller can +# decide the right remediation (CS2UI_ERROR, 2010 rejection, retry-later…). +# +# Outcome contract: +# ("queued", None) — successfully enqueued +# ("full", "") — queue saturated after max_wait_s +# ("failed", "") — pre-condition failed (no network, +# unexpected exception, etc.) +# +# Default max_wait_s is 5 s (was 30 s implicitly via 300 × 0.1 s). The +# handler thread is released sooner so upstream back-pressure propagates. + +_F4_MAX_WAIT_S_DEFAULT = 5 +_F4_POLL_INTERVAL_S = 0.1 + +# F1 hand-off: rolling counter of "full" events used by the watchdog to +# escalate to sentinel when queue saturation is chronic rather than +# transient. See ST-3. +_F4_QUEUE_FULL_EVENTS = [] +_F4_QUEUE_FULL_WINDOW_S = 300 +_F4_QUEUE_FULL_LOCK = threading.Lock() + + +def _f4_record_queue_full(): + """Record a 'queue saturated' event for F1 watchdog consumption.""" + now = time() + with _F4_QUEUE_FULL_LOCK: + _F4_QUEUE_FULL_EVENTS.append(now) + # Trim events older than the window. 
+ cutoff = now - _F4_QUEUE_FULL_WINDOW_S + while _F4_QUEUE_FULL_EVENTS and _F4_QUEUE_FULL_EVENTS[0] < cutoff: + _F4_QUEUE_FULL_EVENTS.pop(0) + + +def helpers_queue_full_event_count(window_s=_F4_QUEUE_FULL_WINDOW_S): + """Return the number of queue-saturation events in the last *window_s* seconds. + + Consumed by the F1 watchdog (ST-3) to escalate chronic saturation to + sentinel independently of any individual thread appearing dead. + """ + now = time() + cutoff = now - window_s + with _F4_QUEUE_FULL_LOCK: + while _F4_QUEUE_FULL_EVENTS and _F4_QUEUE_FULL_EVENTS[0] < cutoff: + _F4_QUEUE_FULL_EVENTS.pop(0) + return len(_F4_QUEUE_FULL_EVENTS) + + def helpers_add_to_network_queue(network_request_handler, request_type, url, payload, method, g_config, - success_message, correlation_id=""): + success_message, correlation_id="", max_wait_s=_F4_MAX_WAIT_S_DEFAULT): + """Enqueue a network request with explicit backpressure reporting. + + :return: ``("queued", None)`` on success, ``("full", reason)`` when + the queue is saturated for the full ``max_wait_s`` window, or + ``("failed", reason)`` when a pre-condition blocks enqueue + (no internet, unexpected exception). + + Callers MUST capture the return value and act on non-``queued`` + outcomes — see the F4 caller audit in the 0.5.5 plan. + """ + # Reachability pre-check. Retain the same cycle model (fast ramp-up + # during early boot when reachability probe hasn't fired yet) but + # surface the definitive "no internet" outcome to the caller. 
cycle_duration = REACHABILITY_CYCLE_PAUSE total_cycles = REACHABILITY_CYCLES - cycle = 0 - while (not g_utils.reachability_provider.reachability) and (cycle < total_cycles): sleep(cycle_duration) cycle += 1 - if g_utils.reachability_provider.reachability: + if not g_utils.reachability_provider.reachability: + reason = "Internet DOWN: Network request {0} couldn't be processed".format(request_type.name) + g_utils.logger.warning(reason) + return ("failed", reason) + + try: r = NetworkRequest(request_type=request_type, url=url, payload=payload, method=method, g_config=g_config, correlation_id=correlation_id) - request_added_to_queue = False - max_attempts = 300 - attempt = 0 - while not request_added_to_queue and attempt < max_attempts: + except Exception as exc: + reason = "NetworkRequest construction failed for {0}: {1}".format( + request_type.name, exc) + g_utils.logger.error(reason) + return ("failed", reason) + + max_attempts = max(1, int(max_wait_s / _F4_POLL_INTERVAL_S)) + attempt = 0 + request_added_to_queue = False + while not request_added_to_queue and attempt < max_attempts: + try: request_added_to_queue = network_request_handler.enqueue_request(r) - if not request_added_to_queue: - sleep(0.1) - attempt += 1 + except Exception as exc: + reason = "enqueue_request raised for {0}: {1}".format( + request_type.name, exc) + g_utils.logger.error(reason) + return ("failed", reason) + if not request_added_to_queue: + sleep(_F4_POLL_INTERVAL_S) + attempt += 1 - if request_added_to_queue: - g_utils.logger.info(success_message) - else: - g_utils.logger.error("Failed to enqueue network request after {0} attempts: {1}".format(max_attempts, request_type.name)) - else: - g_utils.logger.warning("Internet DOWN: Network request {0} couldn't be processed".format(request_type.name)) + if request_added_to_queue: + g_utils.logger.info(success_message) + return ("queued", None) + reason = "Network queue saturated for {0}s; request {1} not accepted".format( + max_wait_s, 
request_type.name) + g_utils.logger.error(reason) + _f4_record_queue_full() + return ("full", reason) -def helpers_add_to_output_channel(output_channel, message_body, success_message): - message_added_to_queue = False - max_attempts = 300 + +def helpers_add_to_output_channel(output_channel, message_body, success_message, + max_wait_s=_F4_MAX_WAIT_S_DEFAULT): + """Enqueue an outbound UI-bus message with explicit backpressure reporting. + + :return: ``("queued", None)`` on success, ``("full", reason)`` when + the bus is saturated, or ``("failed", reason)`` on exception. + + Callers MUST capture the return value. For heartbeat/liveness + messages, ``"full"`` is typically acceptable (drop-oldest semantics + — recent state is more useful than a stale queued copy). For + treatment / error messages, callers SHOULD surface the failure. + """ + max_attempts = max(1, int(max_wait_s / _F4_POLL_INTERVAL_S)) attempt = 0 - while not message_added_to_queue and attempt < max_attempts: - message_added_to_queue = output_channel.enqueue_message(message_body) - if not message_added_to_queue: - sleep(0.1) + message_added = False + while not message_added and attempt < max_attempts: + try: + message_added = output_channel.enqueue_message(message_body) + except Exception as exc: + reason = "output_channel.enqueue_message raised: {0}".format(exc) + g_utils.logger.error(reason) + return ("failed", reason) + if not message_added: + sleep(_F4_POLL_INTERVAL_S) attempt += 1 - if message_added_to_queue: + if message_added: g_utils.logger.info(success_message) - else: - g_utils.logger.error("Failed to enqueue output message after {0} attempts".format(max_attempts)) + return ("queued", None) + reason = "Output channel saturated for {0}s; message not accepted".format(max_wait_s) + g_utils.logger.error(reason) + _f4_record_queue_full() + return ("full", reason) + def helpers_sha256_checksum(data: str) -> str: """ Returns the calculated checksum (SHA256) for input data @@ -690,9 +839,13 @@ """ 
cs_log_json = helpers_read_cs_log_template(LOG_UPLOAD_TEMPLATE_PATH) - # Convert the file into byte array - logs_byte_array = helpers_file_to_byte_array(cs_log_data['path']) - checksum = helpers_sha256_checksum(logs_byte_array) + # F11 (0.5.5): compute the checksum by streaming the file directly. + # The previous version loaded the whole file + base64-encoded it + # just to produce a byte array to hash, then discarded the array. + # On ~300 MB CS logs that peak cost 4/3 x file-size and combined + # with the same load pattern inside cmd_outgoing_upload_file_in_chunks + # drove OOM-kill events on the 2 GB i.MX8MMini device. + checksum = helpers_sha256_checksum(cs_log_data['path']) # Get file size file_size = helpers_get_file_size(cs_log_data['path']) @@ -753,9 +906,10 @@ """ device_log_json = helpers_read_device_log_template(LOG_UPLOAD_TEMPLATE_PATH) - # Convert the file into byte array - logs_byte_array = helpers_file_to_byte_array(device_log_data['path']) - checksum = helpers_sha256_checksum(logs_byte_array) + # F11 (0.5.5): stream the checksum from disk rather than reading the + # whole file + base64-encoding it to produce a byte array. See + # helpers_construct_cs_log_json for the rationale and incident link. 
+ checksum = helpers_sha256_checksum(device_log_data['path']) # Get file size file_size = helpers_get_file_size(device_log_data['path']) Index: cloudsync/utils/reachability.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/utils/reachability.py (.../reachability.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/utils/reachability.py (.../reachability.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -16,17 +16,31 @@ class ReachabilityProvider: - def __init__(self, logger: Logger, url_reachability): + def __init__(self, logger: Logger, url_reachability, initial_delay_s: int = 0): + """Initialise the reachability provider. + + :param int initial_delay_s: F7 fleet-stagger delay applied once + before the first probe. Zero preserves 0.5.4 behavior; a + non-zero value prevents the PROD fleet from probing the + reachability URL in lock-step at boot. + """ self.logger = logger self.url_reachability = url_reachability self.reachability = False + self._initial_delay_s = max(0, int(initial_delay_s)) self.thread = Thread(target=self.reachability_test, daemon=True) self.thread.start() def reachability_test(self): """ Continuously monitors the connection to REACHABILITY_URL """ + # F7 - optional first-probe stagger. Gated on > 0 so existing + # tests that patch ``sleep`` to StopIteration still exercise the + # loop body (unchanged 0.5.4 behavior when the provider is + # constructed without a jitter). 
+ if self._initial_delay_s > 0: + sleep(self._initial_delay_s) while True: headers = { 'User-Agent': USER_AGENT, Index: cloudsync/utils/watchdog.py =================================================================== diff -u -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloudsync/utils/watchdog.py (.../watchdog.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) +++ cloudsync/utils/watchdog.py (.../watchdog.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -3,12 +3,31 @@ Monitors registered threads via periodic is_alive() checks, attempts restart on failure (up to a configurable limit), and creates a sentinel file to request full process restart when recovery is exhausted. + +F1 — 0.5.5: extends the watchdog with liveness-progress monitoring. +``threading.Thread.is_alive()`` returns True for threads blocked in kernel +syscalls, ``event.wait()``, or busy-waits. The HD000118-style stuck-but-alive +patterns observed in DIAS-43 go completely undetected by is_alive() alone. + +The extended API accepts an optional ``progress_fn`` and ``max_idle_s`` per +registration. On each sweep the watchdog treats stale progress the same way +it treats a dead thread: increment the failure counter, attempt restart, and +escalate to sentinel after ``max_restarts``. Require 3 consecutive stale +reads before escalating (the counter resets when the thread becomes live + +fresh again) — this prevents transient-slowness false positives from +creating unnecessary restart storms. + +``register_sentinel_condition`` provides a second escalation path: an +arbitrary zero-arg predicate checked every sweep. If it returns True, the +watchdog immediately creates the sentinel (no restart attempt). Used for +chronic queue-exhaustion (F4 counter) where individual threads stay alive +but the pipeline is starved. 
""" import os from logging import Logger from threading import Event, Thread -from time import sleep +from time import sleep, time # Default sentinel file path — cs.py monitors this to trigger process restart @@ -31,29 +50,56 @@ self._max_restarts = max_restarts self._sentinel_path = sentinel_path self._entries = {} + # F1 — sentinel-condition registry (name → predicate) + self._sentinel_conditions = {} self._stop_event = Event() self._thread = Thread(target=self._monitor, daemon=True) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ - def register(self, name, get_thread, restart_fn): + def register(self, name, get_thread, restart_fn, + progress_fn=None, max_idle_s=None): """Register a thread for monitoring. :param str name: Human-readable name (e.g. "heartbeat"). :param callable get_thread: Zero-arg callable returning the current Thread object (e.g. ``lambda: obj.thread``). :param callable restart_fn: Zero-arg callable that creates and starts - a replacement thread. Use :func:`make_restart_fn` for the common + a replacement thread. Use :func:`make_restart_fn` for the common case of daemon threads backed by a bound method. + :param callable progress_fn: (F1) optional zero-arg callable returning + a unix-epoch timestamp of the thread's last semantic progress. + If omitted, only dead-thread detection is performed. + :param float max_idle_s: (F1) maximum seconds of staleness tolerated + before the thread counts as failed. Required when + ``progress_fn`` is provided. 
""" + if progress_fn is not None and max_idle_s is None: + raise ValueError( + "progress_fn requires max_idle_s for '{0}'".format(name)) self._entries[name] = { "get_thread": get_thread, "restart_fn": restart_fn, + "progress_fn": progress_fn, + "max_idle_s": max_idle_s, "failures": 0, } + def register_sentinel_condition(self, name, condition_fn): + """Register a zero-arg predicate that, when True, triggers sentinel. + + Used for conditions where individual threads are alive but the + pipeline as a whole is starved — e.g. chronic queue saturation + (F4 ``helpers_queue_full_event_count`` > threshold). + + :param str name: Diagnostic label (appears in sentinel file + logs). + :param callable condition_fn: Zero-arg callable returning a bool. + True → immediate sentinel (no restart attempt). + """ + self._sentinel_conditions[name] = condition_fn + def start(self): """Start the watchdog monitoring loop.""" self._thread.start() @@ -85,20 +131,71 @@ self._stop_event.wait(self._check_interval) def _check_threads(self): - """Single sweep: check every registered thread.""" + """Single sweep: check every registered thread + sentinel condition.""" + # F1: first evaluate sentinel conditions — they short-circuit thread + # checks. A True predicate means the pipeline is chronically + # starved and restarting individual threads will not help. 
+ for name, condition_fn in self._sentinel_conditions.items(): + try: + if condition_fn(): + self._logger.error( + "Watchdog: sentinel condition '%s' fired — " + "escalating directly to sentinel", name) + self._create_sentinel("sentinel_condition:" + name) + return + except Exception as exc: # condition_fn misbehaved; log + skip + self._logger.error( + "Watchdog: sentinel condition '%s' raised: %s", + name, exc) + all_healthy = True for name, entry in self._entries.items(): thread = entry["get_thread"]() - if thread is not None and thread.is_alive(): + is_dead = thread is None or not thread.is_alive() + + # F1: check progress staleness even when the thread is alive. + is_stale = False + stale_reason = "" + if not is_dead and entry["progress_fn"] is not None: + try: + last_ts = entry["progress_fn"]() + except Exception as exc: + self._logger.error( + "Watchdog: progress_fn for '%s' raised: %s", name, exc) + last_ts = 0 + # last_ts==0 means "no progress recorded yet" — tolerate until + # the thread has had a chance to tick at least once. Use a + # grace equal to max_idle_s from watchdog start. + if last_ts == 0: + is_stale = False + else: + age = time() - last_ts + if age > entry["max_idle_s"]: + is_stale = True + stale_reason = ("progress stale %.1fs > max_idle_s %s" + % (age, entry["max_idle_s"])) + + if not is_dead and not is_stale: + # F1: reset failure counter on a fully-healthy read. This is + # the "3 consecutive stale reads" rule — any healthy read + # wipes the slate clean and prevents restart-storm escalation + # on transient slowness. 
+ if entry["failures"] > 0: + self._logger.info( + "Watchdog: thread '%s' recovered; " + "resetting failure counter (was %d)", + name, entry["failures"]) + entry["failures"] = 0 continue all_healthy = False entry["failures"] += 1 failures = entry["failures"] + reason = "dead" if is_dead else stale_reason self._logger.error( - "Watchdog: thread '%s' is dead (failure %d/%d)", - name, failures, self._max_restarts, + "Watchdog: thread '%s' is unhealthy (%s) (failure %d/%d)", + name, reason, failures, self._max_restarts, ) if failures <= self._max_restarts: