Index: cloudsync/handlers/auth_failure_tracker.py =================================================================== diff -u -rb2f7afce365e3044714e0e18505fed5ddefe7764 -rf26f09af1e10683fcbeb7fac051976ee0cd504f0 --- cloudsync/handlers/auth_failure_tracker.py (.../auth_failure_tracker.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) +++ cloudsync/handlers/auth_failure_tracker.py (.../auth_failure_tracker.py) (revision f26f09af1e10683fcbeb7fac051976ee0cd504f0) @@ -10,11 +10,12 @@ after boot, which is the desired operator signal (the issue survives reboot). -Wiring: ``NetworkRequestHandler.get_valid_token`` calls +Wiring (F15 — ST-15): ``NetworkRequestHandler._refresh_via_cert`` calls :meth:`PersistentAuthFailureTracker.record_failure` after every ``cmd_outgoing_get_new_token_with_cert`` call that returns ``None``, then checks :meth:`PersistentAuthFailureTracker.should_escalate` to decide -whether to enqueue the 934 error. +whether to enqueue the 934 error. (Pre-F15 the hook lived in +``get_valid_token``; the tracker contract is unchanged.) """ from collections import deque from threading import Lock Index: cloudsync/handlers/cs_mft_dcs_request_handler.py =================================================================== diff -u -rb2f7afce365e3044714e0e18505fed5ddefe7764 -rf26f09af1e10683fcbeb7fac051976ee0cd504f0 --- cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) +++ cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision f26f09af1e10683fcbeb7fac051976ee0cd504f0) @@ -40,9 +40,9 @@ # Default 0 signals "no work yet"; the watchdog tolerates this until # the first tick. self.last_progress_ts = 0 - # F14c — persistent auth-failure tracker, used by get_valid_token to - # aggregate repeated Keycloak fetch failures into a single - # operator-visible CS_AUTH_PERSISTENT_FAIL escalation. + # F14c — persistent auth-failure tracker, invoked from + # _refresh_via_cert to aggregate repeated Keycloak fetch failures + # into a single operator-visible CS_AUTH_PERSISTENT_FAIL escalation. from cloudsync.handlers.auth_failure_tracker import PersistentAuthFailureTracker self.auth_failure_tracker = PersistentAuthFailureTracker() self.thread = Thread(target=self.scheduler, daemon=True) @@ -172,13 +172,10 @@ base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] - token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION) device_state_url = urllib.parse.urljoin(base_url, "/api/device") - access_token = self.get_valid_token(identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn) + access_token = self._acquire_token(identity_url, client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is not None: @@ -191,10 +188,13 @@ device_state_json = json.dumps(device_state_json) + refresher = lambda: self._refresh_via_cert( + identity_url, client_secret, _cid, _dsn) response = cmd_outgoing_set_device_state(url=device_state_url, access_token=access_token, device_state_json=device_state_json, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: @@ -245,14 +245,11 @@ base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] - token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION) validate_url = urllib.parse.urljoin(base_url, "api/device/validate") data_submission_url = urllib.parse.urljoin(base_url, "/api/device/data") - access_token = self.get_valid_token(identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn) + access_token = self._acquire_token(identity_url, client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is None: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, @@ -261,6 +258,9 @@ self.error_handler.enqueue_error(error=error) return + refresher = lambda: self._refresh_via_cert( + identity_url, client_secret, _cid, _dsn) + # Step #1 - get organization id for current device response = cmd_outgoing_validate_device(access_token=access_token, @@ -271,6 +271,7 @@ sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], url=validate_url, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: @@ -305,6 +306,7 @@ response = cmd_outgoing_check_if_patient_with_emr_id_exists(access_token=access_token, url=patient_with_emr_id_exists_url, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) # Step #2b - If patient with emr_id doesn't exist, create temporary patient @@ -318,6 +320,7 @@ response = cmd_outgoing_create_temporary_patient(access_token=access_token, url=create_temporary_patient_url, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: return @@ -335,6 +338,7 @@ access_token=access_token, associate=True, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) # Step #4 - Send treatment report @@ -363,6 +367,7 @@ access_token=access_token, treatment_log=treatment_log_json, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: @@ -391,6 +396,7 @@ access_token=access_token, associate=False, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) except Exception as e: @@ -404,20 +410,19 @@ base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] - token_verification_url = urllib.parse.urljoin( - base_url, DEVICE_TOKEN_VALIDATION) validate_url = urllib.parse.urljoin( base_url, "api/device/validate") # Step #1 - get access token - access_token = self.get_valid_token(identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn) + access_token = self._acquire_token(identity_url, client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is not None: + refresher = lambda: self._refresh_via_cert( + identity_url, client_secret, _cid, _dsn) + # Step #2 - get organization id for current device response = cmd_outgoing_validate_device(access_token=access_token, @@ -428,6 +433,7 @@ sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], url=validate_url, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: @@ -466,11 +472,7 @@ file_json=device_log_json, error_handler=self.error_handler, log_file_origin='device', - token_refresher=lambda: self.get_valid_token( - identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn), + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if isinstance(upload_result, dict) and not upload_result.get("accepted"): @@ -507,20 +509,19 @@ base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] - token_verification_url = urllib.parse.urljoin( - base_url, DEVICE_TOKEN_VALIDATION) validate_url = urllib.parse.urljoin( base_url, "api/device/validate") # Step #1 - get access token - access_token = self.get_valid_token(identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn) + access_token = self._acquire_token(identity_url, client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is not None: + refresher = lambda: self._refresh_via_cert( + identity_url, client_secret, _cid, _dsn) + # Step #2 - get organization id for current device response = cmd_outgoing_validate_device(access_token=access_token, @@ -531,6 +532,7 @@ sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], url=validate_url, error_handler=self.error_handler, + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if response is None: @@ -564,11 +566,7 @@ file_json=cs_log_json, error_handler=self.error_handler, log_file_origin='cs', - token_refresher=lambda: self.get_valid_token( - identity_url=identity_url, - token_verification_url=token_verification_url, - client_secret=client_secret, - correlation_id=_cid, device_sn=_dsn), + token_refresher=refresher, correlation_id=_cid, device_sn=_dsn) if isinstance(cs_log_filename, dict) and not cs_log_filename.get("accepted"): @@ -600,49 +598,34 @@ else: g_utils.logger.warning("Request type {0} not supported".format(req.request_type)) - def get_valid_token(self, identity_url, token_verification_url, client_secret, - correlation_id="", device_sn=""): - access_token = helpers_get_stored_token() + def _refresh_via_cert(self, identity_url, client_secret, + correlation_id="", device_sn=""): + """F15d — shared cert-auth refresh path with F14c tracker wiring. - fetched_fresh = False - if access_token is None: - self.logger.info("No stored token found, requesting new token") - access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509, - path_private_key=CREDENTIALS_PRIVATE_KEY, - save=True, - url=identity_url, - client_secret=client_secret, - error_handler=self.error_handler, - correlation_id=correlation_id, - device_sn=device_sn) - fetched_fresh = True - else: - response = cmd_outgoing_verify_token(url=token_verification_url, - access_token=access_token, - error_handler=self.error_handler, - correlation_id=correlation_id, - device_sn=device_sn) - if response is None or response.status_code != OK: - self.logger.warning(f"Token verification failed (status={response.status_code if response else 'None'}), refreshing token") - access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509, - path_private_key=CREDENTIALS_PRIVATE_KEY, - save=True, - url=identity_url, - client_secret=client_secret, - error_handler=self.error_handler, - correlation_id=correlation_id, - device_sn=device_sn) - fetched_fresh = True - else: - self.logger.info("Token verification succeeded") + Fetches a fresh access token from Keycloak via device cert auth. + On failure (None return), increments the persistent-auth-failure + tracker and emits ``CS_AUTH_PERSISTENT_FAIL`` (934) when the + threshold trips (20 failures in 5 min). - # F14c — if the fresh cert-auth fetch returned None, track the - # failure; emit an aggregate CS_AUTH_PERSISTENT_FAIL escalation - # when the threshold trips. Does NOT replace the per-failure 922 - # already enqueued by cmd_outgoing_get_new_token_with_cert; this - # is an ADDITIONAL operator-actionable signal when the pattern - # is persistent. - if fetched_fresh and access_token is None: + Used by ``_acquire_token`` (first-time token fetch) and by the + per-site retry loops in ``cmd_outgoing_*`` (reactive refresh on + 401/403). Hook point for F14c moves here from the legacy + ``get_valid_token`` when F15c lands. + + :return: Fresh access token on success, or None on cert-auth failure. + """ + access_token = cmd_outgoing_get_new_token_with_cert( + path_certificate=CREDENTIALS_CERTIFICATE_X509, + path_private_key=CREDENTIALS_PRIVATE_KEY, + save=True, + url=identity_url, + client_secret=client_secret, + error_handler=self.error_handler, + correlation_id=correlation_id, + device_sn=device_sn, + ) + + if access_token is None: self.auth_failure_tracker.record_failure() if self.auth_failure_tracker.should_escalate(): escalation = Error.general( @@ -661,6 +644,23 @@ return access_token + def _acquire_token(self, identity_url, client_secret, + correlation_id="", device_sn=""): + """F15c — get an access token for a DCS request. + + Returns the stored token if present (server decides validity via + JwtBearer). If the stored token is missing/corrupt, fetches a + fresh one via cert auth through ``_refresh_via_cert`` (F14c + tracked). May return None on persistent cert-auth failure; + callers must treat None as 'cannot proceed with this request'. + """ + tok = helpers_get_stored_token() + if tok is not None: + return tok + self.logger.info("No stored token found, requesting new token") + return self._refresh_via_cert(identity_url, client_secret, + correlation_id, device_sn) + def wait_for_network(self, wait_time): counter = 0 while not self.reachability_provider.reachability and counter < wait_time: Index: cloudsync/handlers/outgoing/handler_cs_to_dcs.py =================================================================== diff -u -rb2f7afce365e3044714e0e18505fed5ddefe7764 -rf26f09af1e10683fcbeb7fac051976ee0cd504f0 --- cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) +++ cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision f26f09af1e10683fcbeb7fac051976ee0cd504f0) @@ -40,6 +40,17 @@ return LogUploadReasonCode.DISCONNECTED.value +def _should_retry_with_refresh(resp) -> bool: + """F15b — retry predicate for reactive cert-auth refresh. + + Returns True iff the response is an HTTP 401 or 403 — the only + status codes that indicate a stale/invalid token. 409 (duplicate), + 5xx (server side), and network errors (resp is None) all propagate + unchanged; no token refresh helps those. + """ + return resp is not None and resp.status_code in (401, 403) + + def _inject_tracing_headers(headers: dict, correlation_id: str = "", device_sn: str = "") -> dict: """Add cross-system tracing headers. Non-functional: unknown headers are ignored by DCS.""" if correlation_id: @@ -49,6 +60,69 @@ return headers +def _build_dcs_headers(access_token: str, + correlation_id: str = "", + device_sn: str = "") -> dict: + """Standard header dict for any DCS request carrying a Bearer token. + + Returns a fresh dict (safe to mutate per-call) with tracing headers + already injected. Used by every ``cmd_outgoing_*`` that hits DCS. + """ + headers = { + 'Authorization': BEARER_HOLDER.format(access_token), + 'Content-Type': CONTENT_TYPE, + 'User-Agent': USER_AGENT, + "X-Api-Version": API_VERSION, + } + _inject_tracing_headers(headers, correlation_id, device_sn) + return headers + + +def _call_with_retry(build_request: callable, + access_token: str, + token_refresher: callable, + error_handler: ErrorHandler, + error_id: int): + """F15b — single-retry-on-401/403 wrapper shared by every DCS call. + + ``build_request(access_token)`` is a caller-supplied closure that + performs the HTTP call and returns a ``requests.Response``. The + wrapper invokes it up to twice: on the first 401/403 it calls + ``token_refresher()`` and retries with the refreshed token. 409, + 5xx, and network errors propagate unchanged. Timeouts, redirect + loops, and unexpected exceptions are translated into the + caller-provided ``error_id`` and enqueue a standard Error before + returning None. + """ + resp = None + for attempt in (1, 2): + try: + resp = build_request(access_token) + except requests.exceptions.Timeout: + error_handler.enqueue_error( + error=Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + error_id, "DCS request timeout")) + return None + except requests.exceptions.TooManyRedirects: + error_handler.enqueue_error( + error=Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, error_id)) + return None + except Exception as e: + error_handler.enqueue_error( + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, error_id, str(e))) + return None + + if _should_retry_with_refresh(resp) and attempt == 1 and token_refresher is not None: + refreshed = token_refresher() + if refreshed is None: + break + access_token = refreshed + continue + break + + return resp + + def _log_dcs_trace_id(response, correlation_id: str = ""): """Log DCS Jaeger traceId from response headers on non-2xx responses.""" if response is None: @@ -174,114 +248,50 @@ @log_func -def cmd_outgoing_verify_token(url: str, - access_token: str, - error_handler: ErrorHandler, - correlation_id: str = "", - device_sn: str = "") -> requests.Response: - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - "X-OrganizationId": '1', - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) - - data = { - "hdSerialNumber": "token-validation", - "dgSerialNumber": "token-validation", - "softwareVersion": "token-validation" - } - - resp = requests.post(url=url, - data=json.dumps(data), - headers=headers, - timeout=(30, 60)) - g_utils.logger.info(f"Token verification response: {resp.status_code}") - return resp - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VERIFY_TOKEN_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VERIFY_TOKEN_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VERIFY_TOKEN_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None - - -@log_func def cmd_outgoing_validate_device(access_token: str, hd_serial_number: str, dg_serial_number: str, sw_version: str, url: str, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", device_sn: str = "") -> dict: """ Step 8. Validate device Step 9. Validate device response (list of invalid fields) :return: The json response """ - try: - payload = json.dumps({ - "hdSerialNumber": hd_serial_number, - "dgSerialNumber": dg_serial_number, - "softwareVersion": sw_version - }) + payload = json.dumps({ + "hdSerialNumber": hd_serial_number, + "dgSerialNumber": dg_serial_number, + "softwareVersion": sw_version + }) - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) + def _do_call(token): + return requests.post(url=url, + headers=_build_dcs_headers(token, correlation_id, device_sn), + data=payload, + timeout=(30, 60)) - response = requests.post(url=url, - headers=headers, - data=payload, - timeout=(30, 60)) - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value) - error_handler.enqueue_error(error=error) + response = _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value) + if response is None: return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None try: return response.json() except json.decoder.JSONDecodeError as e: - error = Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, - response.status_code, response.reason, str(e)) - error_handler.enqueue_error(error=error) + error_handler.enqueue_error( + error=Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, + response.status_code, response.reason, str(e))) return None except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) + error_handler.enqueue_error( + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, str(e))) return None @@ -292,6 +302,7 @@ access_token: str, device_state_json: dict, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", device_sn: str = "") -> requests.Response: """ @@ -300,142 +311,70 @@ :param device_state_json: device state payload :param access_token: access token :param error_handler: global error handler + :param token_refresher: F15b — callable returning a fresh access + token (or None on failure) when a 401/403 is received. Set to + None by legacy callers; bounded to a single retry per call. :return: The response """ - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) - payload = device_state_json - resp = requests.put(url=url, - headers=headers, - data=payload, + def _do_call(token): + return requests.put(url=url, + headers=_build_dcs_headers(token, correlation_id, device_sn), + data=device_state_json, timeout=(30, 60)) - return resp - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None + return _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value) + @log_func def cmd_outgoing_check_if_patient_with_emr_id_exists(access_token: str, url: str, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", device_sn: str = "") -> requests.Response: - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) + def _do_call(token): + return requests.get(url=url, + headers=_build_dcs_headers(token, correlation_id, device_sn), + timeout=(30, 60)) - response = requests.get(url=url, - headers=headers, - timeout=(30, 60)) - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None + return _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value) - try: - return response - except json.decoder.JSONDecodeError as e: - error = Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, - response.status_code, response.reason, str(e)) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None - @log_func def cmd_outgoing_create_temporary_patient(access_token: str, url: str, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", device_sn: str = ""): - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) + def _do_call(token): + return requests.post(url=url, + headers=_build_dcs_headers(token, correlation_id, device_sn), + data={}, + timeout=(30, 60)) - payload = {} - - response = requests.post(url=url, - headers=headers, - data=payload, - timeout=(30, 60)) - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value) - error_handler.enqueue_error(error=error) + response = _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value) + if response is None: return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None try: return response.json() except json.decoder.JSONDecodeError as e: - error = Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, - response.status_code, response.reason, str(e)) - error_handler.enqueue_error(error=error) + error_handler.enqueue_error( + error=Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, + response.status_code, response.reason, str(e))) return None except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) + error_handler.enqueue_error( + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, str(e))) return None @@ -444,6 +383,7 @@ access_token: str, associate: bool, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", device_sn: str = "") -> requests.Response: """ @@ -452,58 +392,34 @@ :param access_token: access token :param associate: status of the device & patient association :param error_handler: global error handler + :param token_refresher: F15b — callable returning a fresh access + token (or None) on 401/403; wraps the whole HEAD+PUT or DELETE + branch so a single refresh re-runs the full sub-request chain. :return: The response """ - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) - payload = {} - + def _do_call(token): + headers = _build_dcs_headers(token, correlation_id, device_sn) if associate: - resp = requests.head(url=urllib.parse.urljoin(url, "/exists"), - headers=headers, - data=payload, + r = requests.head(url=urllib.parse.urljoin(url, "/exists"), + headers=headers, data={}, timeout=(30, 60)) + if r.status_code == NOT_FOUND: + r = requests.put(url=url, headers=headers, data={}, timeout=(30, 60)) + return r + return requests.delete(url=url, headers=headers, data={}, + timeout=(30, 60)) - if resp.status_code == NOT_FOUND: - resp = requests.put(url=url, - headers=headers, - data=payload, - timeout=(30, 60)) - else: - resp = requests.delete(url=url, - headers=headers, - data=payload, - timeout=(30, 60)) - return resp - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None + return _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value) @log_func def cmd_outgoing_send_treatment_report(url: str, access_token: str, treatment_log: str, error_handler: ErrorHandler, + token_refresher: callable = None, correlation_id: str = "", device_sn: str = "") -> requests.Response: """ @@ -512,41 +428,20 @@ :param access_token: access token :param treatment_log: treatment report sent to DCS :param error_handler: global error handler + :param token_refresher: F15b — callable returning a fresh access + token (or None) on 401/403; bounded to a single retry. :return: The response """ - try: - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) - - payload = treatment_log - - resp = requests.post(url=url, - headers=headers, - data=payload, + def _do_call(token): + return requests.post(url=url, + headers=_build_dcs_headers(token, correlation_id, device_sn), + data=treatment_log, timeout=(30, 60)) - return resp - except requests.exceptions.Timeout: - error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except requests.exceptions.TooManyRedirects: - error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value) - error_handler.enqueue_error(error=error) - return None - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, - str(e)) - error_handler.enqueue_error(error=error) - return None + return _call_with_retry(_do_call, access_token, token_refresher, + error_handler, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value) + @log_func def cmd_outgoing_check_data_log_exists(base_url: str, access_token: str, @@ -627,57 +522,56 @@ # # Start upload session # + # F15b — reactive retry on 401/403 (server authority). Replaces the + # pre-F15 proactive `token_refresher()` call that refreshed every + # upload's start-session unconditionally; DCS's JwtBearer middleware + # now decides when a refresh is needed. - # Refresh token before start-session to prevent expiry at midnight rollover - if token_refresher is not None: - refreshed_token = token_refresher() - if refreshed_token is not None: - access_token = refreshed_token - start_session_url = base_url.rstrip("/") + "/api/device/data/start-session" - start_session_payload = file_json['start_session'] - start_session_payload = json.dumps(start_session_payload) - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) + start_session_payload = json.dumps(file_json['start_session']) g_utils.logger.info(f"Starting upload session for {log_file_origin} log") g_utils.logger.debug(f"File upload payload (start-session): {start_session_payload}") - try: - response = requests.post( - url=start_session_url, - headers=headers, - data=start_session_payload, - timeout=(30, 60)) + # Inline retry (not _call_with_retry): the chunk loop and end-session + # below both use the post-retry access_token, so the wrapper needs to + # rebind the local. Helper returns only the response. + response = None + for attempt in (1, 2): + try: + response = requests.post( + url=start_session_url, + headers=_build_dcs_headers(access_token, correlation_id, device_sn), + data=start_session_payload, + timeout=(30, 60)) + g_utils.logger.info(f"Start-session response: {response.status_code}") + except Exception as e: + error_handler.enqueue_error( + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e))) + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) - g_utils.logger.info(f"Start-session response: {response.status_code}") + if _should_retry_with_refresh(response) and attempt == 1 and token_refresher is not None: + refreshed = token_refresher() + if refreshed is None: + break + access_token = refreshed + continue + break - if response.status_code == CONFLICT: - g_utils.logger.info(f"File {_filename} rejected as duplicate at start-session (409 Conflict).") - return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value) + if response.status_code == CONFLICT: + g_utils.logger.info(f"File {_filename} rejected as duplicate at start-session (409 Conflict).") + return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value) - if response.status_code != 200: - _log_dcs_trace_id(response, correlation_id) - g_utils.logger.error(f"Start-session failed: {response.status_code} - {response.text[:500]}") - # F2 — map auth-class status codes to CREDENTIAL; everything - # else terminal → DISCONNECTED. - reason = _classify_http_reason(response.status_code) - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, - f"start-session {response.status_code}") - error_handler.enqueue_error(error=error) - return _upload_rejection(_filename, reason) - - except Exception as e: - # Network exceptions (timeout, DNS, TLS, connection reset) are all - # Disconnected — safest default per F2. - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) + if response.status_code != 200: + _log_dcs_trace_id(response, correlation_id) + g_utils.logger.error(f"Start-session failed: {response.status_code} - {response.text[:500]}") + # F2 — map auth-class status codes to CREDENTIAL; everything + # else terminal → DISCONNECTED. + reason = _classify_http_reason(response.status_code) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + f"start-session {response.status_code}") error_handler.enqueue_error(error=error) - return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) + return _upload_rejection(_filename, reason) session_id = response.json().get("sessionId") if not session_id: @@ -697,13 +591,7 @@ upload_chunk_payload_template['sessionId'] = session_id upload_chunk_payload_template['chunkType'] = "device-data" chunk_number = 1 - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) + headers = _build_dcs_headers(access_token, correlation_id, device_sn) # F11 (0.5.5): stream the file in raw chunks aligned to a 3-byte # boundary and base64-encode each chunk independently. Peak RAM @@ -802,55 +690,53 @@ # # End upload session # + # F15b — reactive retry on 401/403 (server authority). Replaces the + # pre-F15 proactive `token_refresher()` call that refreshed after + # every chunked upload regardless of whether the token was actually + # stale. - # Refresh token before end-session to prevent expiry after long chunk uploads - if token_refresher is not None: - g_utils.logger.info("Refreshing token before end-session phase") - refreshed_token = token_refresher() - if refreshed_token is not None: - access_token = refreshed_token - g_utils.logger.info("Token refreshed successfully before end-session") - else: - g_utils.logger.warning("Token refresh returned None, using existing token for end-session") - end_session_url = base_url.rstrip("/") + "/api/device/data/end-session" end_session_payload = file_json['end_session'] end_session_payload['sessionId'] = session_id end_session_payload['completedAt'] = int(datetime.now(timezone.utc).timestamp()*1000) - headers = { - 'Authorization': BEARER_HOLDER.format(access_token), - 'Content-Type': CONTENT_TYPE, - 'User-Agent': USER_AGENT, - "X-Api-Version": API_VERSION - } - _inject_tracing_headers(headers, correlation_id, device_sn) - try: - end_session_payload = json.dumps(end_session_payload) - g_utils.logger.info(f"Ending upload session (sessionId={session_id})") - g_utils.logger.debug(f"Device log upload payload (end-session): {end_session_payload}") - response = requests.post(end_session_url, - headers=headers, - data=end_session_payload, - timeout=(30, 60)) + end_session_payload = json.dumps(end_session_payload) + g_utils.logger.info(f"Ending upload session (sessionId={session_id})") + g_utils.logger.debug(f"Device log upload payload (end-session): {end_session_payload}") - if response.status_code == CONFLICT: - g_utils.logger.info(f"File {_filename} rejected as duplicate (409 Conflict).") - return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value) + response = None + for attempt in (1, 2): + try: + response = requests.post( + end_session_url, + headers=_build_dcs_headers(access_token, correlation_id, device_sn), + data=end_session_payload, + timeout=(30, 60)) + g_utils.logger.info(f"End-session response: {response.status_code}") + except Exception as e: + error_handler.enqueue_error( + error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e))) + return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) - g_utils.logger.info(f"End-session response: {response.status_code}") - if response.status_code != 200: - _log_dcs_trace_id(response, correlation_id) - g_utils.logger.error(f"End-session failed: {response.status_code} - {response.text[:500]}") - reason = _classify_http_reason(response.status_code) - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, - f"end-session {response.status_code}") - error_handler.enqueue_error(error=error) - return _upload_rejection(_filename, reason) + if _should_retry_with_refresh(response) and attempt == 1 and token_refresher is not None: + refreshed = token_refresher() + if refreshed is None: + break + access_token = refreshed + continue + break - except Exception as e: - error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) + if response.status_code == CONFLICT: + g_utils.logger.info(f"File {_filename} rejected as duplicate (409 Conflict).") + return _upload_rejection(_filename, LogUploadReasonCode.DUPLICATE.value) + + if response.status_code != 200: + _log_dcs_trace_id(response, correlation_id) + g_utils.logger.error(f"End-session failed: {response.status_code} - {response.text[:500]}") + reason = _classify_http_reason(response.status_code) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, + f"end-session {response.status_code}") error_handler.enqueue_error(error=error) - return _upload_rejection(_filename, LogUploadReasonCode.DISCONNECTED.value) + return _upload_rejection(_filename, reason) g_utils.logger.info(f"File {file_json['start_session']['metadata']['deviceFileName']} uploaded.") return str(file_json['start_session']['metadata']['deviceFileName']) Index: cloudsync/utils/helpers.py =================================================================== diff -u -rb2f7afce365e3044714e0e18505fed5ddefe7764 -rf26f09af1e10683fcbeb7fac051976ee0cd504f0 --- cloudsync/utils/helpers.py (.../helpers.py) (revision b2f7afce365e3044714e0e18505fed5ddefe7764) +++ cloudsync/utils/helpers.py (.../helpers.py) (revision f26f09af1e10683fcbeb7fac051976ee0cd504f0) @@ -183,19 +183,20 @@ def helpers_get_stored_token() -> Union[str, None]: - """ - Returns the stored token if it exists and has not expired. - Decodes the JWT payload to check the 'exp' claim against the current time. - Returns None if the token is expired or within 60 seconds of expiry, - forcing the caller to request a fresh token. + """Return the stored access token string, or None. - F10: a corrupt / truncated / unreadable token file MUST return None - rather than crash CS — token corruption is a recoverable state; CS - must stay alive to refresh the token. + F15 (server authority) — no client-side `exp` check. DCS's JwtBearer + middleware is the sole authority on token lifetime (default 5 min + ClockSkew tolerance). On 401/403 from a DCS call, callers refresh + via cert auth and retry once. This eliminates the wall-clock + dependency that caused refresh storms on devices with operator-set + or jumped system time (DVT-9 / multi-day jump test protocol). - :return: The token if found and valid, otherwise returns None + F10 corruption guard preserved: a corrupt / truncated / unreadable + token file MUST return None rather than crash CS. + + :return: The access token string if present, otherwise None. """ - data = None if not os.path.exists(DEVICE_KEBORMED_ACCESS_TOKEN_PATH): return None try: @@ -210,42 +211,9 @@ if data is None: return None - access_token = data.get("access_token", None) - if access_token is None: - return None + return data.get("access_token", None) - # Check JWT expiry before returning the token - try: - # JWT is header.payload.signature — decode the payload (no verification needed, - # we just need the exp claim; signature is verified by DCS on each request) - payload_b64 = access_token.split('.')[1] - # Add padding if needed - padding = 4 - len(payload_b64) % 4 - if padding != 4: - payload_b64 += '=' * padding - payload = json.loads(base64.urlsafe_b64decode(payload_b64)) - exp = payload.get('exp', 0) - now = int(time()) - # F14b — clock-skew tolerance. A forward wall-clock step (operator - # `date` command, ntpd step) can make a freshly-issued token look - # "expired" per the device clock even though Keycloak considers it - # valid — causing a refresh storm where every request hits Keycloak - # anew. Give the stored token a 2h grace past its `exp` claim. If - # the token is genuinely expired, DCS will reject it on the next - # call with 401/403 and the refresh path kicks in; a slightly-late - # retry is cheap, a Keycloak refresh storm is not. See - # st14_phase2_fix_spec.md §F14b for the DVT-9 rationale. - TOKEN_EXPIRY_GRACE_S = 7200 - if now >= (exp + TOKEN_EXPIRY_GRACE_S): - g_utils.logger.info(f"Stored token expired beyond grace window (exp={exp}, now={now}, grace={TOKEN_EXPIRY_GRACE_S}s), will refresh") - return None - except Exception: - # If we can't decode, let the caller proceed and DCS will reject if invalid - pass - return access_token - - def helpers_read_config(path: str) -> dict: """ Read the configuration