Index: cloud_sync.py =================================================================== diff -u -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f -rdebe1b458743b43a0d523b2b30d79fb1b00c336f --- cloud_sync.py (.../cloud_sync.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) +++ cloud_sync.py (.../cloud_sync.py) (revision debe1b458743b43a0d523b2b30d79fb1b00c336f) @@ -8,14 +8,15 @@ from cloudsync.utils.heartbeat import HeartBeatProvider from cloudsync.busses.file_input_bus import FileInputBus from cloudsync.handlers.ui_cs_request_handler import UICSMessageHandler -from cloudsync.handlers.cs_mft_dcs_request_handler import NetworkRequestHandler +from cloudsync.handlers.cs_mft_dcs_request_handler import NetworkRequestHandler, IdempotentRequestHandler from cloudsync.handlers.error_handler import ErrorHandler from cloudsync.busses.file_output_bus import FileOutputBus from cloudsync.utils.reachability import ReachabilityProvider from cloudsync.utils.globals import * from cloudsync.utils.helpers import * from cloudsync.utils.logging import LoggingConfig from cloudsync.utils.watchdog import Watchdog, make_restart_fn +from cloudsync.utils.alive import AliveProvider import hmac import os @@ -24,7 +25,7 @@ import threading -VERSION = "0.5.3" +VERSION = "0.5.5" arguments = sys.argv @@ -90,36 +91,73 @@ g_utils.logger.info(SETUP_CONSOLE_LINE) sys.exit(0) +# F7 - fleet-stagger jitter computed once at boot from the HD serial. +# Reused by ReachabilityProvider (first-probe delay) and FileOutputBus +# (F6 pre-warm window offset). A pre-registration device with no serial +# gets jitter=0, which is acceptable: registration happens once per +# device and is not a fleet-wide concern. +_device_serial = g_config[CONFIG_DEVICE].get(CONFIG_DEVICE_HD_SERIAL) +_device_jitter_s = get_device_jitter_seconds(_device_serial) +g_utils.logger.info( + "F7 fleet jitter: %ds (serial=%s)", _device_jitter_s, _device_serial) + try: - reachability_provider = ReachabilityProvider(logger=app.logger, url_reachability=g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_REACHABILITY_URL]) + reachability_provider = ReachabilityProvider( + logger=app.logger, + url_reachability=g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_REACHABILITY_URL], + initial_delay_s=_device_jitter_s) except Exception as e: g_utils.logger.error( "Reachability URL missing from config file. Using Default URL - {0}".format(DEFAULT_REACHABILITY_URL)) reachability_provider = ReachabilityProvider( - logger=app.logger, url_reachability=DEFAULT_REACHABILITY_URL) + logger=app.logger, url_reachability=DEFAULT_REACHABILITY_URL, + initial_delay_s=_device_jitter_s) try: g_utils.add_reachability_provider(reachability_provider=reachability_provider) - output_channel = FileOutputBus(logger=app.logger, max_size=100, file_channels_path=UI2CS_FILE_CHANNELS_PATH) + # F6/F7 - pre-warm thread runs inside FileOutputBus; jitter staggers + # the fleet across a 2-minute window starting at 23:55 UTC. + output_channel = FileOutputBus(logger=app.logger, max_size=100, + file_channels_path=UI2CS_FILE_CHANNELS_PATH, + jitter_seconds=_device_jitter_s) error_handler = ErrorHandler(logger=app.logger, max_size=100, output_channel=output_channel) + # F9 — split-lane architecture. + # State lane (max_size=1): strict-ordering handler for registration, + # SET_DEVICE_STATE and SEND_TREATMENT_REPORT. The single-slot queue is + # the explicit ordering invariant — do NOT change max_size on this one. network_request_handler = NetworkRequestHandler(logger=app.logger, max_size=1, output_channel=output_channel, reachability_provider=reachability_provider, error_handler=error_handler) + # Idempotent lane (max_size=16): absorbs long-running, filename-keyed + # uploads (SEND_DEVICE_LOG up to ~350 MB, SEND_CS_LOG up to ~300 MB). + # Single worker per lane; max_size is queue depth, not parallelism. + # Keeps the state lane responsive during 10–30 minute transfers. + idempotent_network_request_handler = IdempotentRequestHandler( + logger=app.logger, max_size=16, output_channel=output_channel, + reachability_provider=reachability_provider, error_handler=error_handler) message_handler = UICSMessageHandler(logger=app.logger, max_size=20, network_request_handler=network_request_handler, output_channel=output_channel, reachability_provider=reachability_provider, - error_handler=error_handler) + error_handler=error_handler, + idempotent_network_request_handler=idempotent_network_request_handler) ui_cs_bus = FileInputBus(logger=app.logger, file_channels_path=UI2CS_FILE_CHANNELS_PATH, input_channel_name="inp.buf", g_config=g_config, message_handler=message_handler) + # F5 — heartbeat gets the idempotent lane + g_config so it can drain + # rotated CS-log files one-per-tick when that lane is idle. Registration + # mode sets send_heartbeat=False below, so the F5 branch stays inert + # during registration. heartbeat_provider = HeartBeatProvider(logger=app.logger, network_request_handler=network_request_handler, - output_channel=output_channel) + output_channel=output_channel, + idempotent_network_request_handler=idempotent_network_request_handler, + g_config=g_config) - logconf.set_network_provider(network_request_handler=network_request_handler) + # F9 — CS-log uploads (SEND_CS_LOG) go through the idempotent lane. + logconf.set_network_provider(network_request_handler=idempotent_network_request_handler) logconf.set_error_provider(error_handler=error_handler) logconf.set_configuration(g_config=g_config) logconf.set_log_level(g_config[CONFIG_LOGS][CONFIG_LOGS_DEFAULT_LOG_LEVEL]) @@ -139,20 +177,56 @@ # the registration-to-operation transition. watchdog = Watchdog(logger=app.logger) if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation': + # F3: scheduler-independent CS-alive file. Auto-starts on construction. + # The `last_progress_ts` attribute is consumed by F1 (below). + alive_provider = AliveProvider(logger=app.logger) + # F1 — progress-signal contracts (plan §4 F1). max_idle_s values + # are conservative; require 3 consecutive stale reads (default + # max_restarts) before escalating to sentinel, which gives each + # thread an effective 3 × check_interval grace on top of max_idle_s. + watchdog.register("alive", lambda: alive_provider.thread, + make_restart_fn(alive_provider, "thread", alive_provider._loop), + progress_fn=lambda: alive_provider.last_progress_ts, + max_idle_s=30) watchdog.register("reachability", lambda: reachability_provider.thread, make_restart_fn(reachability_provider, "thread", reachability_provider.reachability_test)) watchdog.register("output_bus", lambda: output_channel.thread, - make_restart_fn(output_channel, "thread", output_channel.scheduler)) + make_restart_fn(output_channel, "thread", output_channel.scheduler), + progress_fn=lambda: output_channel.last_progress_ts, + max_idle_s=120) watchdog.register("error_handler", lambda: error_handler.thread, make_restart_fn(error_handler, "thread", error_handler.scheduler)) watchdog.register("network_request_handler", lambda: network_request_handler.thread, - make_restart_fn(network_request_handler, "thread", network_request_handler.scheduler)) + make_restart_fn(network_request_handler, "thread", network_request_handler.scheduler), + progress_fn=lambda: network_request_handler.last_progress_ts, + max_idle_s=300) + # F9/F1 — idempotent lane worker: large-file uploads legitimately + # take 10–30 min, so max_idle_s is 1800 s (30 min) per plan §4 F1. + watchdog.register("idempotent_request_handler", + lambda: idempotent_network_request_handler.thread, + make_restart_fn(idempotent_network_request_handler, "thread", + idempotent_network_request_handler.scheduler), + progress_fn=lambda: idempotent_network_request_handler.last_progress_ts, + max_idle_s=1800) watchdog.register("message_handler", lambda: message_handler.thread, - make_restart_fn(message_handler, "thread", message_handler.scheduler)) + make_restart_fn(message_handler, "thread", message_handler.scheduler), + progress_fn=lambda: message_handler.last_progress_ts, + max_idle_s=120) watchdog.register("file_input_bus", lambda: ui_cs_bus.thread, - make_restart_fn(ui_cs_bus, "thread", ui_cs_bus.input_channel_handler)) + make_restart_fn(ui_cs_bus, "thread", ui_cs_bus.input_channel_handler), + progress_fn=lambda: ui_cs_bus.last_progress_ts, + max_idle_s=600) watchdog.register("heartbeat", lambda: heartbeat_provider.thread, - make_restart_fn(heartbeat_provider, "thread", heartbeat_provider.heartbeat)) + make_restart_fn(heartbeat_provider, "thread", heartbeat_provider.heartbeat), + progress_fn=lambda: heartbeat_provider.last_progress_ts, + max_idle_s=90) + # F1 — chronic queue-saturation escalation. If the F4 counter shows + # more than 3 "queue full" events in the last 5 min, individual + # threads are alive but the pipeline is starved — skip restarts and + # go straight to sentinel for a clean-slate process restart. + watchdog.register_sentinel_condition( + "queue_exhaustion", + lambda: helpers_queue_full_event_count() > 3) watchdog.start() g_utils.logger.info("Watchdog started (operation mode)") else: @@ -231,14 +305,24 @@ return {"invalidAttributes": invalid_params}, BAD_REQUEST try: - helpers_add_to_network_queue(network_request_handler=network_request_handler, - request_type=NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS, - url=request.url, - payload=payload, - method=request.method, - g_config=g_config, - success_message='CS2MFT_REQ_SET_CREDENTIALS request added to network ' - 'queue') + # F4 caller audit: manufacturing-tool credentials flow. Surface + # queue failures to the existing CS2UI_ERROR channel so the MFT + # can observe and retry. + outcome, detail = helpers_add_to_network_queue( + network_request_handler=network_request_handler, + request_type=NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS, + url=request.url, + payload=payload, + method=request.method, + g_config=g_config, + success_message='CS2MFT_REQ_SET_CREDENTIALS request added to network ' + 'queue') + if outcome != "queued": + error = Error("{0},2,{1},Queue {2}: {3}".format( + OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value, + outcome, detail)) + error_handler.enqueue_error(error=error) except Exception as e: error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value,