"""Handler of commands sent by CloudSync app to the Diality Cloud System"""
import base64
import json
import os
import urllib.parse
from datetime import datetime, timezone
from time import monotonic, sleep
from typing import List, Optional, Tuple, Union

import requests

# NOTE: the explicit stdlib imports above come BEFORE the star imports on
# purpose: if a cloudsync module re-exports one of these names, its binding
# still wins exactly as before; otherwise the explicit import prevents a
# NameError.
from cloudsync.utils.globals import *
from cloudsync.common.enums import *
from cloudsync.utils.helpers import *
from cloudsync.handlers.error_handler import ErrorHandler
from cloudsync.handlers.error import Error


# Placeholder written to logs in place of secrets / tokens.
_REDACTED = "<redacted>"
# Keycloak response keys whose values must never reach a log file.
_SECRET_RESPONSE_KEYS = ("access_token", "refresh_token", "id_token")


def _redact_for_log(data, keys):
    """Return a shallow copy of ``data`` with the values under ``keys`` masked.

    Only used to build log lines; the real payload/response is untouched.
    Non-dict inputs are returned unchanged so logging never raises here.
    """
    if not isinstance(data, dict):
        return data
    return {k: (_REDACTED if k in keys and v is not None else v) for k, v in data.items()}


def _upload_rejection(filename: str, reason_code: int) -> dict:
    """Build a terminal-failure rejection dict for an upload call.

    Returned instead of ``None`` whenever the upload cannot be completed.
    The caller (cs_mft_dcs_request_handler upload dispatch) translates this
    into a protocol-shaped 2010 rejection message on the UI output bus so
    UI-Brain's existing rejection handlers engage automatically:

    * code 1 Disconnected -> back up & retry
    * code 2 Credential   -> back up & no upload
    * code 3 Duplicate    -> back up & rename

    NOTE(review): the exact 2010 field layout is built by the dispatcher,
    not here — confirm it there.

    :param filename: device-log file name (empty string when unknown)
    :param reason_code: a ``LogUploadReasonCode`` value
    :return: ``{"accepted": False, "filename": filename, "reason_code": reason_code}``
    """
    return {"accepted": False, "filename": filename, "reason_code": reason_code}


def format_upload_error_body(filename: str, body: str) -> str:
    """Prefix the device-log filename onto an Error 930 message body.

    This lets a single ``grep`` on a CS log correlate every upload-pipeline
    error with its file. An empty/missing filename collapses to
    ``<unknown>`` so the prefix shape is stable across all callers.

    Every upload-pipeline emit site (CS_DEVICE_LOG_ERROR / CS_LOG_ERROR)
    MUST go through this helper; an AST-walk audit test in the test suite
    enforces this contract.

    :param filename: device-log file name (may be empty)
    :param body: error text
    :return: ``"<filename> body"``
    """
    fn = filename if filename else "unknown"
    return f"<{fn}> {body}"


def _classify_http_reason(status_code: int) -> int:
    """Map an HTTP status to a LogUploadReasonCode.

    401 / 403 → CREDENTIAL (reason 2). Genuine auth failures require
    operator attention; UI-Brain backs up without retry.

    Everything else (timeouts, 5xx, connection resets, DNS, TLS) →
    DISCONNECTED (reason 1). Safest default; UI-Brain backs up and
    retries later.
    """
    if status_code in (401, 403):
        return LogUploadReasonCode.CREDENTIAL.value
    return LogUploadReasonCode.DISCONNECTED.value


def _should_retry_with_refresh(resp) -> bool:
    """Retry predicate for reactive cert-auth refresh.

    Returns True iff the response is an HTTP 401 or 403 — the only status
    codes that indicate a stale/invalid token. 409 (duplicate), 5xx (server
    side), and network errors (resp is None) all propagate unchanged; no
    token refresh helps those.
    """
    return resp is not None and resp.status_code in (401, 403)


def _inject_tracing_headers(headers: dict, correlation_id: str = "", device_sn: str = "") -> dict:
    """Add cross-system tracing headers.

    Non-functional: unknown headers are ignored by DCS. Mutates and returns
    ``headers``; empty values are skipped.
    """
    if correlation_id:
        headers["X-Correlation-ID"] = correlation_id
    if device_sn:
        headers["X-Device-Serial"] = device_sn
    return headers


def _build_dcs_headers(access_token: str, correlation_id: str = "", device_sn: str = "") -> dict:
    """Standard header dict for any DCS request carrying a Bearer token.

    Returns a fresh dict (safe to mutate per-call) with tracing headers
    already injected. Used by every ``cmd_outgoing_*`` that hits DCS.
    """
    headers = {
        'Authorization': BEARER_HOLDER.format(access_token),
        'Content-Type': CONTENT_TYPE,
        'User-Agent': USER_AGENT,
        "X-Api-Version": API_VERSION,
    }
    _inject_tracing_headers(headers, correlation_id, device_sn)
    return headers


def _call_with_retry(build_request: callable, access_token: str, token_refresher: callable,
                     error_handler: ErrorHandler, error_id: int):
    """Single-retry-on-401/403 wrapper shared by every DCS call.

    ``build_request(access_token)`` is a caller-supplied closure that
    performs the HTTP call and returns a ``requests.Response``. The wrapper
    invokes it up to twice: on the first 401/403 it calls
    ``token_refresher()`` and retries with the refreshed token. 409, 5xx,
    and network errors propagate unchanged.

    Timeouts, redirect loops, and unexpected exceptions are translated into
    the caller-provided ``error_id`` and enqueue a standard Error before
    returning None.

    :return: the last ``requests.Response`` (possibly still a 401/403 when
        the refresher returned None), or None after an enqueued error.
    """
    resp = None
    for attempt in (1, 2):
        try:
            resp = build_request(access_token)
        except requests.exceptions.Timeout:
            error_handler.enqueue_error(
                error=Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, error_id, "DCS request timeout"))
            return None
        except requests.exceptions.TooManyRedirects:
            error_handler.enqueue_error(
                error=Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, error_id))
            return None
        except Exception as e:
            error_handler.enqueue_error(
                error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, error_id, str(e)))
            return None
        if _should_retry_with_refresh(resp) and attempt == 1 and token_refresher is not None:
            refreshed = token_refresher()
            if refreshed is None:
                # Refresh failed: hand the auth-class response back to the caller.
                break
            access_token = refreshed
            continue
        break
    return resp


def _log_dcs_trace_id(response, correlation_id: str = ""):
    """Log DCS Jaeger traceId from response headers on non-2xx responses."""
    if response is None:
        return
    if response.ok:
        return
    trace_id = response.headers.get("uber-trace-id", "") or response.headers.get("X-Correlation-ID", "")
    if trace_id:
        g_utils.logger.warning("[%s] DCS error %s — traceId=%s", correlation_id,
                               response.status_code, trace_id)


@log_func
def cmd_outgoing_get_new_token_with_cert(path_certificate: str,
                                         path_private_key: str,
                                         save: bool,
                                         url: str,
                                         client_secret: str,
                                         error_handler: ErrorHandler,
                                         correlation_id: str = "",
                                         device_sn: str = "") -> Optional[str]:
    """
    Obtains authentication token with device certificate & private key

    Sends a ``grant_type=password`` form POST to the identity provider using
    the device certificate/key pair as the TLS client certificate. Every
    failure path enqueues an Error and returns None.

    :param path_certificate: The path to the certificate
    :param path_private_key: The path to the private key
    :param save: If True, save the full token response JSON to
        DEVICE_KEBORMED_ACCESS_TOKEN_PATH
    :param url: Identity Provider URL used to request a new token
    :param client_secret: Identity Provider client secret for CS app
    :param error_handler: global error handler
    :param correlation_id: optional X-Correlation-ID tracing header value
    :param device_sn: optional X-Device-Serial tracing header value
    :return: The new access token, or None on any failure
    """
    try:
        payload = {
            "grant_type": "password",
            "scope": "openid profile",
            "client_id": "device-client",
            "client_secret": client_secret,
        }
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'User-Agent': USER_AGENT,
            "X-Api-Version": API_VERSION
        }
        _inject_tracing_headers(headers, correlation_id, device_sn)
        cert_paths = (path_certificate, path_private_key)
        # The client secret must never be written to the log.
        g_utils.logger.debug("Making request: {0}, {1}, {2}, {3}".format(
            url, headers, _redact_for_log(payload, ("client_secret",)), cert_paths))
        # Split connect/read timeout. The 5s default was too tight for
        # the read phase under field conditions; keep connect tight so
        # we fail fast on an unreachable identity server.
        response = requests.post(url=url,
                                 headers=headers,
                                 data=payload,
                                 cert=cert_paths,
                                 timeout=(5, 30))

        # Surface Keycloak failures. A non-2xx response (or a 2xx response
        # that lacks "access_token") would otherwise return None without
        # enqueuing an error, producing thousands of downstream "Missing
        # access token" errors with no diagnostic trail.
        if response.status_code != 200:
            body_preview = (response.text or "")[:200]
            error = Error.general(
                OutboundMessageIDs.CS2UI_ERROR.value,
                ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value,
                "Keycloak non-2xx; status={0}, body={1!r}".format(
                    response.status_code, body_preview))
            error_handler.enqueue_error(error=error)
            return None

        try:
            data = response.json()
        except ValueError:
            body_preview = (response.text or "")[:200]
            error = Error.general(
                OutboundMessageIDs.CS2UI_ERROR.value,
                ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value,
                "Keycloak 200 but non-JSON response; body={0!r}".format(
                    body_preview))
            error_handler.enqueue_error(error=error)
            return None

        # Tokens are credentials: log the response shape, not their values.
        g_utils.logger.debug("Keycloak response: {0}".format(
            _redact_for_log(data, _SECRET_RESPONSE_KEYS)))
        token = data.get("access_token", None)
        if token is None:
            body_preview = (response.text or "")[:200]
            error = Error.general(
                OutboundMessageIDs.CS2UI_ERROR.value,
                ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value,
                "Keycloak 200 but no access_token key; body={0!r}".format(
                    body_preview))
            error_handler.enqueue_error(error=error)
            return None

        if save:
            os.makedirs(TOKEN_CACHING_PATH, exist_ok=True)
            with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'w') as f:
                f.write(json.dumps(data, indent=4))

        return token
    except requests.exceptions.Timeout:
        error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value,
                              ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value,
                              "Obtain token request timeout")
        error_handler.enqueue_error(error=error)
        return None
    except requests.exceptions.TooManyRedirects:
        error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value,
                               ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value)
        error_handler.enqueue_error(error=error)
        return None
    except Exception as e:
        error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                              ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value,
                              str(e))
        error_handler.enqueue_error(error=error)
        return None


@log_func
def cmd_outgoing_validate_device(access_token: str,
                                 hd_serial_number: str,
                                 dg_serial_number: str,
                                 sw_version: str,
                                 url: str,
                                 error_handler: ErrorHandler,
                                 token_refresher: callable = None,
                                 correlation_id: str = "",
                                 device_sn: str = "") -> Optional[dict]:
    """
    Step 8. Validate device
    Step 9. Validate device response (list of invalid fields)

    The decoded JSON body is returned regardless of HTTP status.

    :return: The json response, or None after an enqueued error
    """
    payload = json.dumps({
        "hdSerialNumber": hd_serial_number,
        "dgSerialNumber": dg_serial_number,
        "softwareVersion": sw_version
    })

    def _do_call(token):
        return requests.post(url=url,
                             headers=_build_dcs_headers(token, correlation_id, device_sn),
                             data=payload,
                             timeout=(30, 60))

    response = _call_with_retry(_do_call, access_token, token_refresher, error_handler,
                                ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value)
    if response is None:
        return None
    try:
        return response.json()
    except json.decoder.JSONDecodeError as e:
        error_handler.enqueue_error(
            error=Error.validation(OutboundMessageIDs.CS2UI_ERROR.value,
                                   ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value,
                                   response.status_code, response.reason, str(e)))
        return None
    except Exception as e:
        error_handler.enqueue_error(
            error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, str(e)))
        return None


# Runtime commands

@log_func
def cmd_outgoing_set_device_state(url: str,
                                  access_token: str,
                                  device_state_json: dict,
                                  error_handler: ErrorHandler,
                                  token_refresher: callable = None,
                                  correlation_id: str = "",
                                  device_sn: str = "") -> Optional[requests.Response]:
    """
    Updates the backend with the current device state

    :param url: set device state URL
    :param device_state_json: device state payload (sent as the PUT body)
    :param access_token: access token
    :param error_handler: global error handler
    :param token_refresher: Optional callable returning a fresh access
        token (or None on failure) when a 401/403 is received. Set to
        None by legacy callers; bounded to a single retry per call.
    :return: The response, or None after an enqueued error
    """
    def _do_call(token):
        return requests.put(url=url,
                            headers=_build_dcs_headers(token, correlation_id, device_sn),
                            data=device_state_json,
                            timeout=(30, 60))

    return _call_with_retry(_do_call, access_token, token_refresher, error_handler,
                            ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value)


@log_func
def cmd_outgoing_check_if_patient_with_emr_id_exists(access_token: str,
                                                     url: str,
                                                     error_handler: ErrorHandler,
                                                     token_refresher: callable = None,
                                                     correlation_id: str = "",
                                                     device_sn: str = "") -> Optional[requests.Response]:
    """
    GET ``url`` to check whether a patient with the given EMR id exists.

    Interpretation of the status code is left to the caller.

    :return: The response, or None after an enqueued error
    """
    def _do_call(token):
        return requests.get(url=url,
                            headers=_build_dcs_headers(token, correlation_id, device_sn),
                            timeout=(30, 60))

    return _call_with_retry(_do_call, access_token, token_refresher, error_handler,
                            ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value)


@log_func
def cmd_outgoing_create_temporary_patient(access_token: str,
                                          url: str,
                                          error_handler: ErrorHandler,
                                          token_refresher: callable = None,
                                          correlation_id: str = "",
                                          device_sn: str = ""):
    """
    POST an empty body to ``url`` to create a temporary patient.

    :return: The decoded JSON response, or None after an enqueued error
    """
    def _do_call(token):
        return requests.post(url=url,
                             headers=_build_dcs_headers(token, correlation_id, device_sn),
                             data={},
                             timeout=(30, 60))

    response = _call_with_retry(_do_call, access_token, token_refresher, error_handler,
                                ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value)
    if response is None:
        return None
    try:
        return response.json()
    except json.decoder.JSONDecodeError as e:
        error_handler.enqueue_error(
            error=Error.validation(OutboundMessageIDs.CS2UI_ERROR.value,
                                   ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value,
                                   response.status_code, response.reason, str(e)))
        return None
    except Exception as e:
        error_handler.enqueue_error(
            error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, str(e)))
        return None


@log_func
def cmd_outgoing_set_patient_device_association(url: str,
                                                access_token: str,
                                                associate: bool,
                                                error_handler: ErrorHandler,
                                                token_refresher: callable = None,
                                                correlation_id: str = "",
                                                device_sn: str = "") -> Optional[requests.Response]:
    """
    Sets the status of the device & patient association

    When ``associate`` is True, HEADs ``<url>/exists`` and PUTs ``url`` only
    if that probe answers NOT_FOUND (otherwise the probe's response is
    returned). When False, DELETEs ``url``.

    :param url: set device state URL
    :param access_token: access token
    :param associate: status of the device & patient association
    :param error_handler: global error handler
    :param token_refresher: Optional callable returning a fresh access token
        (or None) on 401/403; wraps the whole HEAD+PUT or DELETE branch
        so a single refresh re-runs the full sub-request chain.
    :return: The response, or None after an enqueued error
    """
    # ``urllib.parse.urljoin(url, "/exists")`` would discard the whole path of
    # ``url`` (the leading "/" makes the reference absolute) and probe
    # ``<host>/exists``. Append the segment explicitly, matching the
    # rstrip("/") + path convention used elsewhere in this module.
    exists_url = url.rstrip("/") + "/exists"

    def _do_call(token):
        headers = _build_dcs_headers(token, correlation_id, device_sn)
        if associate:
            r = requests.head(url=exists_url, headers=headers, data={}, timeout=(30, 60))
            if r.status_code == NOT_FOUND:
                r = requests.put(url=url, headers=headers, data={}, timeout=(30, 60))
            return r
        return requests.delete(url=url, headers=headers, data={}, timeout=(30, 60))

    return _call_with_retry(_do_call, access_token, token_refresher, error_handler,
                            ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value)


@log_func
def cmd_outgoing_send_treatment_report(url: str,
                                       access_token: str,
                                       treatment_log: str,
                                       error_handler: ErrorHandler,
                                       token_refresher: callable = None,
                                       correlation_id: str = "",
                                       device_sn: str = "") -> Optional[requests.Response]:
    """
    Sends a treatment report to DCS

    :param url: set device state URL
    :param access_token: access token
    :param treatment_log: treatment report sent to DCS
    :param error_handler: global error handler
    :param token_refresher: Optional callable returning a fresh access token
        (or None) on 401/403; bounded to a single retry.
    :return: The response, or None after an enqueued error
    """
    def _do_call(token):
        return requests.post(url=url,
                             headers=_build_dcs_headers(token, correlation_id, device_sn),
                             data=treatment_log,
                             timeout=(30, 60))

    return _call_with_retry(_do_call, access_token, token_refresher, error_handler,
                            ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value)


@log_func
def cmd_outgoing_check_data_log_exists(base_url: str,
                                       access_token: str,
                                       serial_number: str,
                                       device_file_name: str,
                                       error_handler: ErrorHandler,
                                       correlation_id: str = "",
                                       device_sn: str = "") -> bool:
    """
    Checks if a device log file already exists in DCS before uploading.

    Returns True if exists (duplicate), False if not or on error (fail-open).
    A 204 answer is treated as "exists" and 404 as "absent"; any other
    status or exception is logged and treated as absent. ``error_handler``
    is accepted for signature parity with the other commands but unused.
    """
    url = base_url.rstrip("/") + "/api/device/data/log/exists"
    params = {"fileName": device_file_name, "serialNumber": serial_number}
    # HEAD carries no body, so no Content-Type header (unlike _build_dcs_headers).
    headers = {
        'Authorization': BEARER_HOLDER.format(access_token),
        'User-Agent': USER_AGENT,
        "X-Api-Version": API_VERSION
    }
    _inject_tracing_headers(headers, correlation_id, device_sn)
    try:
        response = requests.head(url=url, headers=headers, params=params, timeout=(30, 60))
        if response.status_code == 204:
            g_utils.logger.info(f"File {device_file_name} already exists in DCS (duplicate).")
            return True
        elif response.status_code == 404:
            return False
        else:
            g_utils.logger.warning(f"Unexpected status from log-exists check: {response.status_code}")
            return False
    except Exception as e:
        g_utils.logger.warning(f"Data log exists check failed, proceeding with upload: {e}")
        return False


@log_func
def cmd_outgoing_upload_file_in_chunks(
        base_url: str,
        access_token: str,
        file_json: dict,
        error_handler: ErrorHandler,
        log_file_origin: str,
        chunk_size: int = 2 * 1024 * 1024,
        retries: int = 3,
        token_refresher: callable = None,
        correlation_id: str = "",
        device_sn: str = ""
) -> Union[str, dict, None]:
    """
    Uploads a large file in chunks using sessions and retries.

    Flow: start-session -> one upload-chunk request per raw block -> end-session.

    Args:
        base_url (str): The base URL of the API.
        access_token (str): The access token used for Authorization.
        file_json (dict): The populated/constructed `log_upload_template.json` file for a specific file.
            Its ``upload_chunk`` and ``end_session`` sub-dicts are updated in place
            (sessionId, chunkType, completedAt).
        error_handler (ErrorHandler): Current `ErrorHandler` instance.
        log_file_origin (str): The origin of the log file to be uploaded. Accepted values `device` or `cs`
        chunk_size (int, optional): Target size in bytes of each chunk's base64 payload.
            The file is read in blocks of ``(chunk_size // 4) * 3`` raw bytes. Defaults to 2 MiB.
        retries (int, optional): Number of attempts per chunk before the upload is aborted
            (a token-refresh retry does not consume an attempt). Defaults to 3.
        token_refresher (callable, optional): Returns a fresh access token (or None) on 401/403.
        correlation_id (str, optional): X-Correlation-ID tracing header value.
        device_sn (str, optional): X-Device-Serial tracing header value.

    Returns:
        str | dict | None: The uploaded file name if succeeded; a rejection dict
        (see ``_upload_rejection``) for any terminal upload failure; None only
        when ``log_file_origin`` is invalid.
    """
    origins = ("cs", "device")
    if log_file_origin not in origins:
        g_utils.logger.error(f"Wrong log file origin provided.")
        # Programmer-error path; no filename available — safe to return None.
        return None

    ERROR_ID = ErrorIDs.CS_DEVICE_LOG_ERROR.value if log_file_origin == 'device' else ErrorIDs.CS_LOG_ERROR.value

    # Extract filename early so every rejection path can surface it to
    # UI-Brain via the protocol-shaped 2010 message.
    try:
        _filename = str(file_json['start_session']['metadata']['deviceFileName'])
    except (KeyError, TypeError):
        _filename = ""

    # Single structured upload-outcome line at every function exit so an
    # operator can grep one CS-log line per upload (success or failure)
    # instead of correlating across many INFO/ERROR lines. State dict is
    # populated incrementally as the upload progresses; the closures
    # below capture it by reference and emit on every return path.
    _summary = {
        "file_size": None,
        "num_chunks": 0,
        "chunks_succeeded": 0,
        "start_time": monotonic(),
        "session_id": None,
        "sha256": "",
    }

    def _emit_failed(reason: str) -> None:
        duration_s = int(monotonic() - _summary["start_time"])
        g_utils.logger.error(
            f"upload_failed file={_filename} size={_summary['file_size']} "
            f"chunks={_summary['chunks_succeeded']}/{_summary['num_chunks']} "
            f"duration_s={duration_s} session={_summary['session_id'] or 'none'} "
            f"reason='{reason}'"
        )

    def _emit_complete() -> None:
        duration_s = int(monotonic() - _summary["start_time"])
        g_utils.logger.info(
            f"upload_complete file={_filename} size={_summary['file_size']} "
            f"chunks={_summary['num_chunks']}/{_summary['num_chunks']} "
            f"duration_s={duration_s} session={_summary['session_id']} "
            f"sha256={_summary['sha256']}"
        )

    #
    # Start upload session
    #
    # Reactive retry on 401/403 (server authority): DCS's JwtBearer
    # middleware decides when a refresh is needed; the client only
    # refreshes in response to an explicit auth-class status code.
    start_session_url = base_url.rstrip("/") + "/api/device/data/start-session"
    start_session_payload = json.dumps(file_json['start_session'])

    g_utils.logger.info(f"Starting upload session for {log_file_origin} log")
    g_utils.logger.debug(f"File upload payload (start-session): {start_session_payload}")

    # Inline retry (not _call_with_retry): the chunk loop and end-session
    # below both use the post-retry access_token, so the local must be
    # rebound here; the wrapper returns only the response.
    response = None
    for attempt in (1, 2):
        try:
            response = requests.post(
                url=start_session_url,
                headers=_build_dcs_headers(access_token, correlation_id, device_sn),
                data=start_session_payload,
                timeout=(30, 60))
            g_utils.logger.info(f"Start-session response: {response.status_code}")
        except Exception as e:
            error_handler.enqueue_error(
                error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID,
                                    format_upload_error_body(_filename, f"Start-session failed: {e}")))
            _emit_failed(f"start-session connection error: {e}")
            return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value)
        if _should_retry_with_refresh(response) and attempt == 1 and token_refresher is not None:
            refreshed = token_refresher()
            if refreshed is None:
                break
            access_token = refreshed
            continue
        break

    if response.status_code == CONFLICT:
        g_utils.logger.info(f"File {_filename} rejected as duplicate at start-session (409 Conflict).")
        _emit_failed("start-session 409 duplicate")
        return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value)

    if response.status_code != 200:
        _log_dcs_trace_id(response, correlation_id)
        g_utils.logger.error(f"Start-session failed: {response.status_code} - {response.text[:500]}")
        # Map auth-class status codes to CREDENTIAL; everything
        # else terminal → DISCONNECTED.
        reason = _classify_http_reason(response.status_code)
        error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID,
                              format_upload_error_body(
                                  _filename,
                                  f"Start-session failed: {response.status_code} - {response.text[:500]}"))
        error_handler.enqueue_error(error=error)
        _emit_failed(f"start-session {response.status_code}")
        return _upload_rejection(_filename, reason)

    # A 200 whose body is not JSON (ValueError) or not a JSON object
    # (AttributeError on .get) is handled like a missing sessionId instead
    # of letting the exception escape the documented rejection contract.
    try:
        session_id = response.json().get("sessionId")
    except (ValueError, AttributeError):
        session_id = None
    if not session_id:
        error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID,
                              format_upload_error_body(_filename, "Start-session response missing sessionId"))
        error_handler.enqueue_error(error=error)
        _emit_failed("missing sessionId")
        return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value)

    _summary["session_id"] = session_id

    #
    # Send file in chunks
    #
    try:
        target_file = file_json['general']["file_path"]
        file_size = file_json['general']["file_size"]
        upload_chunk_url = base_url.rstrip("/") + "/api/device/data/chunk"
        upload_chunk_payload_template = file_json['upload_chunk']
        upload_chunk_payload_template['sessionId'] = session_id
        upload_chunk_payload_template['chunkType'] = "device-data"

        # chunkNo is derived from a 1-indexed loop counter (chunk_idx),
        # NOT from a state variable that tracks success. Defensive
        # against any future refactor that routes "advance to next
        # chunk after failure" through this code, which would silently
        # send the same chunkNo with different bytes.
        chunk_idx = 0
        headers = _build_dcs_headers(access_token, correlation_id, device_sn)

        # Stream the file in raw chunks aligned to a 3-byte boundary and
        # base64-encode each chunk independently. Peak RAM per upload:
        # one chunk (~2 MB) instead of 7/3 × file_size (a non-streaming
        # path keeps both the full file bytes and the full base64 string
        # resident, which on ~300 MB CS logs pushed 2 GB devices into
        # OOM-kill). Raw-chunk size MUST be a multiple of 3 so each
        # chunk's base64 output concatenates to the same string as
        # base64(whole file) — preserving server-side reassembly
        # semantics unchanged.
        raw_chunk_size = (chunk_size // 4) * 3
        if raw_chunk_size < 3:
            raw_chunk_size = 3  # degenerate safeguard

        if file_size is None:
            file_size = os.path.getsize(target_file)
        # num_chunks is derived arithmetically from file_size so progress
        # ("chunk K of N") can be reported without a pre-scan. One request
        # is sent per raw block, so count raw blocks directly; deriving it
        # from the base64 size divided by chunk_size under-counts whenever
        # chunk_size is not a multiple of 4 (identical for the default).
        num_chunks = -(-file_size // raw_chunk_size)
        if num_chunks == 0:
            # An empty file is reported as one chunk, although the loop
            # below sends no chunk request for it.
            num_chunks = 1
        _summary["file_size"] = file_size
        _summary["num_chunks"] = num_chunks

        all_chunks_ok = True
        with open(target_file, "rb") as f:
            while True:
                raw_block = f.read(raw_chunk_size)
                if not raw_block:
                    break
                chunk_idx += 1
                chunk = base64.b64encode(raw_block).decode("utf8")

                # Retry logic with counter and backoff time.
                # auth_refresh_attempted is per-chunk: token refresh fires at
                # most once per chunk across all retries. A still-stale token
                # after refresh falls through to regular retry/abort instead
                # of looping refresh forever.
                chunk_uploaded = False
                retry_count = 0
                auth_refresh_attempted = False
                while retry_count < retries:
                    try:
                        # Fresh dict per attempt so we don't carry the
                        # prior chunk's state forward across retries.
                        upload_chunk_payload = dict(upload_chunk_payload_template)
                        upload_chunk_payload['chunkNo'] = chunk_idx
                        upload_chunk_payload['data'] = chunk
                        upload_chunk_body = json.dumps(upload_chunk_payload)
                        g_utils.logger.debug(
                            f"File upload payload (upload-chunk) - chunk No {chunk_idx}: "
                            f"bytes={len(upload_chunk_body)}")

                        response = requests.post(upload_chunk_url,
                                                 headers=headers,
                                                 data=upload_chunk_body,
                                                 timeout=(30, 60))

                        if response.status_code == 200:
                            g_utils.logger.info(f"Uploaded chunk {chunk_idx} of {num_chunks}")
                            chunk_uploaded = True
                            _summary["chunks_succeeded"] += 1
                            break  # Successful upload, break retry loop

                        # Reactive auth-class refresh. On 401/403 ask
                        # token_refresher for a fresh token, rebind the
                        # Authorization header, and retry this chunk
                        # WITHOUT consuming a backoff slot. Token validity
                        # is server-authoritative; never decode
                        # access_token.exp or compare TTL to a wall-clock
                        # value -- device test protocols change the system
                        # clock to simulate midnight cycles, which corrupts
                        # any wall-clock-based expiry check.
                        if (_should_retry_with_refresh(response)
                                and not auth_refresh_attempted
                                and token_refresher is not None):
                            auth_refresh_attempted = True
                            refreshed = token_refresher()
                            if refreshed is not None:
                                access_token = refreshed
                                headers = _build_dcs_headers(
                                    access_token, correlation_id, device_sn)
                                g_utils.logger.info(
                                    f"Chunk {chunk_idx}/{num_chunks} got "
                                    f"{response.status_code}; refreshed token "
                                    f"and retrying immediately")
                                continue  # auth correction, not a backoff retry
                            g_utils.logger.warning(
                                f"Chunk {chunk_idx}/{num_chunks} got "
                                f"{response.status_code} but token refresh "
                                f"failed; falling through to retry/abort")
                            error_handler.enqueue_error(
                                error=Error.general(
                                    OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID,
                                    format_upload_error_body(
                                        _filename,
                                        f"Token refresh failed during chunk "
                                        f"{chunk_idx}/{num_chunks} upload")))

                        g_utils.logger.warning(
                            f"Chunk {chunk_idx}/{num_chunks} upload failed: "
                            f"{response.status_code} - {response.text[:500]}")
                        retry_count += 1
                        if retry_count < retries:
                            g_utils.logger.info(
                                f"Retrying chunk upload in 5 seconds "
                                f"(attempt {retry_count}/{retries})...")
                            sleep(5)
                    except Exception as e:
                        retry_count += 1
                        g_utils.logger.error(
                            f"Chunk {chunk_idx}/{num_chunks} exception "
                            f"(attempt {retry_count}/{retries}): {e}")
                        if retry_count < retries:
                            sleep(5)

                if not chunk_uploaded:
                    g_utils.logger.error(
                        f"Chunk {chunk_idx} failed after {retries} retries, aborting upload")
                    all_chunks_ok = False
                    break

        if not all_chunks_ok:
            chunks_done = chunk_idx - 1 if chunk_idx > 0 else 0
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID,
                                  format_upload_error_body(
                                      _filename,
                                      f"Chunked upload incomplete: {chunks_done}/{num_chunks} "
                                      f"chunks succeeded; failed at chunk {chunk_idx} "
                                      f"after {retries} retries"))
            error_handler.enqueue_error(error=error)
            _emit_failed(f"chunk {chunk_idx} retries exhausted")
            # Chunk retries exhausted → Disconnected
            return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value)

    except Exception as e:
        error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID,
                              format_upload_error_body(
                                  _filename, f"Unhandled exception in chunk loop: {e}"))
        error_handler.enqueue_error(error=error)
        _emit_failed(f"chunk-loop exception: {e}")
        # Unhandled exception in chunk loop → Disconnected (safest default)
        return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value)

    #
    # End upload session
    #
    # Reactive retry on 401/403 (server authority): refresh only when
    # the server explicitly rejects, not on every end-session call.
    end_session_url = base_url.rstrip("/") + "/api/device/data/end-session"
    end_session_payload = file_json['end_session']
    end_session_payload['sessionId'] = session_id
    end_session_payload['completedAt'] = int(datetime.now(timezone.utc).timestamp() * 1000)
    end_session_payload = json.dumps(end_session_payload)

    g_utils.logger.info(f"Ending upload session (sessionId={session_id})")
    g_utils.logger.debug(f"Device log upload payload (end-session): {end_session_payload}")

    response = None
    for attempt in (1, 2):
        try:
            response = requests.post(
                end_session_url,
                headers=_build_dcs_headers(access_token, correlation_id, device_sn),
                data=end_session_payload,
                timeout=(30, 60))
            g_utils.logger.info(f"End-session response: {response.status_code}")
        except Exception as e:
            error_handler.enqueue_error(
                error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID,
                                    format_upload_error_body(_filename, f"End-session failed: {e}")))
            _emit_failed(f"end-session connection error: {e}")
            return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value)
        if _should_retry_with_refresh(response) and attempt == 1 and token_refresher is not None:
            refreshed = token_refresher()
            if refreshed is None:
                break
            access_token = refreshed
            continue
        break

    if response.status_code == CONFLICT:
        g_utils.logger.info(f"File {_filename} rejected as duplicate (409 Conflict).")
        _emit_failed("end-session 409 duplicate")
        return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value)

    if response.status_code != 200:
        _log_dcs_trace_id(response, correlation_id)
        g_utils.logger.error(f"End-session failed: {response.status_code} - {response.text[:500]}")
        reason = _classify_http_reason(response.status_code)
        # Include response body (truncated) so the operator can tell a
        # 500-from-DCS-bug apart from a 500-from-bad-payload from a
        # single Error 930 line.
        error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID,
                              format_upload_error_body(
                                  _filename,
                                  f"End-session failed: {response.status_code} - {response.text[:500]}"))
        error_handler.enqueue_error(error=error)
        _emit_failed(f"end-session {response.status_code}")
        return _upload_rejection(_filename, reason)

    g_utils.logger.info(f"File {file_json['start_session']['metadata']['deviceFileName']} uploaded.")
    try:
        _summary["sha256"] = str(file_json['end_session'].get('checksum', ''))[:16]
    except (AttributeError, TypeError):
        _summary["sha256"] = ""
    _emit_complete()
    return str(file_json['start_session']['metadata']['deviceFileName'])