Index: cloud_sync.py =================================================================== diff -u -r99d35f3d5898d50372e1745b96b393db40cd8394 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloud_sync.py (.../cloud_sync.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) +++ cloud_sync.py (.../cloud_sync.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -24,7 +24,7 @@ import threading -VERSION = "0.5.2" +VERSION = "0.5.3" arguments = sys.argv Index: cloudsync/busses/file_input_bus.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -8,6 +8,7 @@ from cloudsync.handlers.ui_cs_request_handler import UICSMessageHandler from cloudsync.handlers.uics_message import UICSMessage from cloudsync.utils.filesystem import check_readable +from cloudsync.utils.globals import CONFIG_DEVICE, CONFIG_DEVICE_HD_SERIAL class FileInputBus: @@ -59,14 +60,15 @@ continue try: - f = open(input_file_path) + with open(input_file_path) as f: + lines = f.readlines() except IOError as er: - self.logger.error('Opening input file error: {0}'.format(' '.join(str(er)))) + self.logger.error('Opening input file error: {0}'.format(str(er))) continue new_input_messages = [] - for line in f.readlines(): + for line in lines: message_parameters = line.strip().split(',') try: sequence_id = int(message_parameters[1]) @@ -75,6 +77,10 @@ continue if sequence_id > self.last_input_message_id: new_message = UICSMessage(line.strip(), self.g_config) + device_sn = self.g_config.get(CONFIG_DEVICE, {}).get(CONFIG_DEVICE_HD_SERIAL, "unregistered") + new_message.correlation_id = "cs-{}-{}-{}-{}".format( + device_sn, new_message.ID, new_message.sequence, + str(round(time() 
* 1000))) message_added_to_queue = self.message_handler.enqueue_message(new_message) if message_added_to_queue: new_input_messages.append((sequence_id, line.strip())) Index: cloudsync/busses/file_output_bus.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -84,4 +84,4 @@ f.write("{0}\n".format(message_body)) self.last_output_message_id += 1 except IOError as er: - self.logger.error('Opening and/or writing to output file error: {0}'.format(' '.join(str(er)))) + self.logger.error('Opening and/or writing to output file error: {0}'.format(str(er))) Index: cloudsync/handlers/cs_mft_dcs_request_handler.py =================================================================== diff -u -r2738f6e0ccd52b3b7b8e217f361d23ec69b3adfb -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision 2738f6e0ccd52b3b7b8e217f361d23ec69b3adfb) +++ cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -11,7 +11,7 @@ from cloudsync.handlers.error_handler import ErrorHandler from cloudsync.handlers.error import Error from cloudsync.utils.helpers import log_func -from cloudsync.utils.logging import LoggingConfig +from cloudsync.utils.logging import LoggingConfig, set_correlation_id, clear_correlation_id from cloudsync.utils.globals import * from cloudsync.common.enums import * from cloudsync.handlers.outgoing.handler_cs_to_mft import * @@ -39,10 +39,13 @@ while True: flag = self.event.wait() if flag: - req = self.queue.popleft() - self.handle_request(req) - if len(self.queue) == 0: - 
self.event.clear() + while len(self.queue) > 0: + req = self.queue.popleft() + try: + self.handle_request(req) + finally: + clear_correlation_id() + self.event.clear() def enqueue_request(self, req: NetworkRequest) -> bool: """ @@ -61,6 +64,10 @@ Called on each timer event. Logs the data currently in deque :return: None """ + _cid = req.correlation_id + _dsn = req.headers.get("device_sn", "") + set_correlation_id(_cid) + self.logger.info('[%s] Processing network request: %s', _cid, getattr(req.request_type, 'name', req.request_type)) # REGISTRATION MODE if req.request_type == NetworkRequestType.CS2MFT_REQ_REGISTRATION: @@ -74,7 +81,9 @@ data_source_id=req.g_config[CONFIG_KEBORMED][ CONFIG_KEBORMED_DIA_ORG_ID], headers=req.headers, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=_cid, + device_sn=_dsn) self.logger.debug("DRT Request registration resp: {0}".format(response)) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, @@ -142,7 +151,8 @@ access_token = self.get_valid_token(identity_url=identity_url, token_verification_url=token_verification_url, - client_secret=client_secret) + client_secret=client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is not None: @@ -158,7 +168,8 @@ response = cmd_outgoing_set_device_state(url=device_state_url, access_token=access_token, device_state_json=device_state_json, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=_cid, device_sn=_dsn) if response is None: return @@ -173,7 +184,7 @@ self.logconf.set_dcs_flag(1) # Calculate the duration of the new log level log_level_duration = helpers_calculate_log_level_duration(response.json()) - log_level_duration = log_level_duration if log_level_duration else req.g_config[CONFIG_DEVICE][CONFIG_LOGS_LOG_LEVEL_DURATION] + log_level_duration = log_level_duration if log_level_duration else req.g_config[CONFIG_LOGS][CONFIG_LOGS_LOG_LEVEL_DURATION] 
self.logconf.set_log_duration(log_level_duration) # Update the log level self.logconf.set_log_level(new_log_level) @@ -214,7 +225,8 @@ access_token = self.get_valid_token(identity_url=identity_url, token_verification_url=token_verification_url, - client_secret=client_secret) + client_secret=client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is None: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, @@ -232,12 +244,13 @@ CONFIG_DEVICE_DG_SERIAL], sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], url=validate_url, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=_cid, device_sn=_dsn) if response is None: return - invalid_attributes = response.get("invalidAttributes", None) + invalid_attributes = response.get("invalidAttributes") or [] if len(invalid_attributes) > 0: self.logger.error( @@ -265,7 +278,8 @@ response = cmd_outgoing_check_if_patient_with_emr_id_exists(access_token=access_token, url=patient_with_emr_id_exists_url, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=_cid, device_sn=_dsn) # Step #2b - If patient with emr_id doesn't exist, create temporary patient @@ -277,7 +291,8 @@ elif response.status_code == NOT_FOUND: response = cmd_outgoing_create_temporary_patient(access_token=access_token, url=create_temporary_patient_url, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=_cid, device_sn=_dsn) if response is None: return patient_id = response.get("id", None) @@ -293,7 +308,8 @@ response = cmd_outgoing_set_patient_device_association(url=patient_device_association_url, access_token=access_token, associate=True, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=_cid, device_sn=_dsn) # Step #4 - Send treatment report association_time = int(round(time() * S_MS_CONVERSION_FACTOR)) @@ -320,7 +336,8 @@ response = 
cmd_outgoing_send_treatment_report(url=data_submission_url, access_token=access_token, treatment_log=treatment_log_json, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=_cid, device_sn=_dsn) if response is None: return @@ -347,7 +364,8 @@ response = cmd_outgoing_set_patient_device_association(url=patient_device_association_url, access_token=access_token, associate=False, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=_cid, device_sn=_dsn) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, @@ -369,7 +387,8 @@ access_token = self.get_valid_token(identity_url=identity_url, token_verification_url=token_verification_url, - client_secret=client_secret) + client_secret=client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is not None: @@ -382,12 +401,13 @@ CONFIG_DEVICE_DG_SERIAL], sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], url=validate_url, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=_cid, device_sn=_dsn) if response is None: return - invalid_attributes = response.get("invalidAttributes", None) + invalid_attributes = response.get("invalidAttributes") or [] if len(invalid_attributes) > 0: self.logger.error( @@ -403,6 +423,7 @@ device_log_data['organizationId'] = organization_id # Step #3 - upload the device log file in chunks + # Duplicate detection relies on 409 at start-session (immediate, no wasted chunks). device_log_json = helpers_construct_device_log_json(device_log_data) @@ -414,7 +435,9 @@ token_refresher=lambda: self.get_valid_token( identity_url=identity_url, token_verification_url=token_verification_url, - client_secret=client_secret)) + client_secret=client_secret, + correlation_id=_cid, device_sn=_dsn), + correlation_id=_cid, device_sn=_dsn) if isinstance(upload_result, dict) and not upload_result.get("accepted"): # Rejection (e.g. 
409 duplicate) — send 2010 with 3 params: filename, 0, reason_code @@ -425,7 +448,7 @@ self.output_channel.enqueue_message(message_body) elif isinstance(upload_result, str): # Success — send 2010 with 3 params: filename, 1 (success), 0 (no error) - self.logger.debug("Device log file uploaded: {upload_result}") + self.logger.debug(f"Device log file uploaded: {upload_result}") message_body = "{0},3,{1},1,0".format( OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, upload_result) @@ -456,7 +479,8 @@ access_token = self.get_valid_token(identity_url=identity_url, token_verification_url=token_verification_url, - client_secret=client_secret) + client_secret=client_secret, + correlation_id=_cid, device_sn=_dsn) if access_token is not None: @@ -469,12 +493,13 @@ CONFIG_DEVICE_DG_SERIAL], sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], url=validate_url, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=_cid, device_sn=_dsn) if response is None: return - invalid_attributes = response.get("invalidAttributes", None) + invalid_attributes = response.get("invalidAttributes") or [] if len(invalid_attributes) > 0: self.logger.error( @@ -501,10 +526,14 @@ token_refresher=lambda: self.get_valid_token( identity_url=identity_url, token_verification_url=token_verification_url, - client_secret=client_secret)) + client_secret=client_secret, + correlation_id=_cid, device_sn=_dsn), + correlation_id=_cid, device_sn=_dsn) - if cs_log_filename is not None: - self.logger.debug("CS log file uploaded: {cs_log_filename}") + if isinstance(cs_log_filename, dict) and not cs_log_filename.get("accepted"): + self.logger.info(f"CS log file rejected as duplicate: {cs_log_filename.get('filename')}") + elif isinstance(cs_log_filename, str): + self.logger.debug(f"CS log file uploaded: {cs_log_filename}") else: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, @@ -520,7 +549,8 @@ else: g_utils.logger.warning("Request type {0} not 
supported".format(req.request_type)) - def get_valid_token(self, identity_url, token_verification_url, client_secret): + def get_valid_token(self, identity_url, token_verification_url, client_secret, + correlation_id="", device_sn=""): access_token = helpers_get_stored_token() if access_token is None: @@ -530,19 +560,25 @@ save=True, url=identity_url, client_secret=client_secret, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=correlation_id, + device_sn=device_sn) else: response = cmd_outgoing_verify_token(url=token_verification_url, access_token=access_token, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=correlation_id, + device_sn=device_sn) if response is None or response.status_code != OK: self.logger.warning(f"Token verification failed (status={response.status_code if response else 'None'}), refreshing token") access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509, path_private_key=CREDENTIALS_PRIVATE_KEY, save=True, url=identity_url, client_secret=client_secret, - error_handler=self.error_handler) + error_handler=self.error_handler, + correlation_id=correlation_id, + device_sn=device_sn) else: self.logger.info("Token verification succeeded") Index: cloudsync/handlers/error.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/handlers/error.py (.../error.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/handlers/error.py (.../error.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -4,6 +4,7 @@ class Error: def __init__(self, error_body: str): + self.correlation_id = "" error_components = error_body.split(',') if len(error_components) >= 3: @@ -28,7 +29,8 @@ "ID": self.ID, "size": self.size, "code": self.code, - "parameters": self.parameters + "parameters": self.parameters, + 
"correlation_id": self.correlation_id, } return str(data) Index: cloudsync/handlers/error_handler.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/handlers/error_handler.py (.../error_handler.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/handlers/error_handler.py (.../error_handler.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -47,13 +47,13 @@ def handle_error(self, error: Error): if InboundMessageIDs.mapped_str_value(error.ID) == InboundMessageIDs.UI2CS_ERROR: - self.logger.error('UI App Error {0}: {1}'.format(error.code, error.parameters)) + self.logger.error('[%s] UI App Error %s: %s', error.correlation_id, error.code, error.parameters) return # TODO Add specific UI error message handling when necessary if OutboundMessageIDs.mapped_str_value(error.ID) == OutboundMessageIDs.CS2UI_ERROR: - self.logger.error('CS App Error {0} - {1}: {2}'.format(error.code, - ErrorIDs.mapped_str_value(error.code), - error.parameters)) + self.logger.error('[%s] CS App Error %s - %s: %s', error.correlation_id, error.code, + ErrorIDs.mapped_str_value(error.code), + error.parameters) parameter_string = "" for parameter in error.parameters: parameter_string += "," Index: cloudsync/handlers/incoming/handler_mft_to_cs.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/handlers/incoming/handler_mft_to_cs.py (.../handler_mft_to_cs.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/handlers/incoming/handler_mft_to_cs.py (.../handler_mft_to_cs.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -6,6 +6,7 @@ from cloudsync.utils.helpers import * from cloudsync.utils.globals import * from cloudsync.utils.filesystem import check_writable, check_disk_space_mb +from 
cloudsync.utils.logging import get_correlation_id from cloudsync.common.enums import * from cloudsync.handlers.outgoing.handler_cs_to_mft import * from cloudsync.handlers.outgoing.handler_cs_to_dcs import * @@ -80,27 +81,34 @@ """ try: client_secret = g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] + _cid = get_correlation_id() + _dsn = hd_serial access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509, path_private_key=CREDENTIALS_PRIVATE_KEY, save=True, url=url_token, client_secret=client_secret, - error_handler=error_handler) + error_handler=error_handler, + correlation_id=_cid, + device_sn=_dsn) if access_token is not None: g_utils.logger.debug("Access token: {0}".format(access_token)) response = cmd_outgoing_validate_device(access_token=access_token, hd_serial_number=hd_serial, dg_serial_number=dg_serial, sw_version=sw_version, url=url_validate, - error_handler=error_handler) + error_handler=error_handler, + correlation_id=_cid, + device_sn=_dsn) if response is not None: g_utils.logger.debug("Response: {0}".format(response)) invalid_attributes = response.get("invalidAttributes", None) g_utils.logger.info("Invalid fields: {0}".format(invalid_attributes)) if invalid_attributes is not None: - json_resp = cmd_outgoing_validation_result(invalid_attributes, hd_serial, g_config, error_handler) + json_resp = cmd_outgoing_validation_result(invalid_attributes, hd_serial, g_config, error_handler, + correlation_id=_cid, device_sn=_dsn) g_utils.logger.debug("Validation result request response (DRT --> CS): {0}".format(json_resp)) else: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, Index: cloudsync/handlers/logs_handler.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/handlers/logs_handler.py (.../logs_handler.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ 
cloudsync/handlers/logs_handler.py (.../logs_handler.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -56,7 +56,7 @@ "checksum": helpers_sha256_checksum(log_file_path) } - g_utils.logger.debug("CS log data: {cs_log_data}") + g_utils.logger.debug(f"CS log data: {cs_log_data}") try: Index: cloudsync/handlers/network_request.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/handlers/network_request.py (.../network_request.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/handlers/network_request.py (.../network_request.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -6,12 +6,14 @@ class NetworkRequest: - def __init__(self, request_type: NetworkRequestType, url: str, payload: dict, method: str, g_config: dict): + def __init__(self, request_type: NetworkRequestType, url: str, payload: dict, method: str, g_config: dict, + correlation_id: str = ""): self.request_type = request_type self.url = url self.payload = payload self.method = method self.g_config = g_config + self.correlation_id = correlation_id self.headers = { "device_ip": self.g_config[CONFIG_DEVICE][CONFIG_DEVICE_IP], @@ -25,5 +27,6 @@ "headers": self.headers, "payload": self.payload, "method": self.method, + "correlation_id": self.correlation_id, } return str(data) Index: cloudsync/handlers/outgoing/handler_cs_to_dcs.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -12,13 +12,35 @@ from cloudsync.handlers.error import Error +def _inject_tracing_headers(headers: dict, 
correlation_id: str = "", device_sn: str = "") -> dict: + """Add cross-system tracing headers. Non-functional: unknown headers are ignored by DCS.""" + if correlation_id: + headers["X-Correlation-ID"] = correlation_id + if device_sn: + headers["X-Device-Serial"] = device_sn + return headers + + +def _log_dcs_trace_id(response, correlation_id: str = ""): + """Log DCS Jaeger traceId from response headers on non-2xx responses.""" + if response is None: + return + if response.ok: + return + trace_id = response.headers.get("uber-trace-id", "") or response.headers.get("X-Correlation-ID", "") + if trace_id: + g_utils.logger.warning("[%s] DCS error %s — traceId=%s", correlation_id, response.status_code, trace_id) + + @log_func def cmd_outgoing_get_new_token_with_cert(path_certificate: str, path_private_key: str, save: bool, url: str, client_secret: str, - error_handler: ErrorHandler) -> str: + error_handler: ErrorHandler, + correlation_id: str = "", + device_sn: str = "") -> str: """ Obtains authentication token with device certificate & private key @@ -43,6 +65,7 @@ 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } + _inject_tracing_headers(headers, correlation_id, device_sn) cert_paths = (path_certificate, path_private_key) g_utils.logger.debug("Making request: {0}, {1}, {2}, {3}".format(url, headers, payload, cert_paths)) @@ -86,7 +109,9 @@ @log_func def cmd_outgoing_verify_token(url: str, access_token: str, - error_handler: ErrorHandler) -> requests.Response: + error_handler: ErrorHandler, + correlation_id: str = "", + device_sn: str = "") -> requests.Response: try: headers = { 'Authorization': BEARER_HOLDER.format(access_token), @@ -95,6 +120,7 @@ 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } + _inject_tracing_headers(headers, correlation_id, device_sn) data = { "hdSerialNumber": "token-validation", @@ -132,7 +158,9 @@ dg_serial_number: str, sw_version: str, url: str, - error_handler: ErrorHandler) -> dict: + error_handler: ErrorHandler, + 
correlation_id: str = "", + device_sn: str = "") -> dict: """ Step 8. Validate device Step 9. Validate device response (list of invalid fields) @@ -151,6 +179,7 @@ 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } + _inject_tracing_headers(headers, correlation_id, device_sn) response = requests.post(url=url, headers=headers, @@ -195,7 +224,9 @@ def cmd_outgoing_set_device_state(url: str, access_token: str, device_state_json: dict, - error_handler: ErrorHandler) -> requests.Response: + error_handler: ErrorHandler, + correlation_id: str = "", + device_sn: str = "") -> requests.Response: """ Updates the backend with the current device state :param url: set device state URL @@ -211,6 +242,7 @@ 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } + _inject_tracing_headers(headers, correlation_id, device_sn) payload = device_state_json resp = requests.put(url=url, headers=headers, @@ -238,14 +270,17 @@ @log_func def cmd_outgoing_check_if_patient_with_emr_id_exists(access_token: str, url: str, - error_handler: ErrorHandler) -> requests.Response: + error_handler: ErrorHandler, + correlation_id: str = "", + device_sn: str = "") -> requests.Response: try: headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } + _inject_tracing_headers(headers, correlation_id, device_sn) response = requests.get(url=url, headers=headers, @@ -286,14 +321,17 @@ @log_func def cmd_outgoing_create_temporary_patient(access_token: str, url: str, - error_handler: ErrorHandler): + error_handler: ErrorHandler, + correlation_id: str = "", + device_sn: str = ""): try: headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } + _inject_tracing_headers(headers, correlation_id, device_sn) payload = {} @@ -338,7 +376,9 @@ def cmd_outgoing_set_patient_device_association(url: str, access_token: str, associate: 
bool, - error_handler: ErrorHandler) -> requests.Response: + error_handler: ErrorHandler, + correlation_id: str = "", + device_sn: str = "") -> requests.Response: """ Sets the status of the device & patient association :param url: set device state URL @@ -354,6 +394,7 @@ 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } + _inject_tracing_headers(headers, correlation_id, device_sn) payload = {} if associate: @@ -395,7 +436,9 @@ def cmd_outgoing_send_treatment_report(url: str, access_token: str, treatment_log: str, - error_handler: ErrorHandler) -> requests.Response: + error_handler: ErrorHandler, + correlation_id: str = "", + device_sn: str = "") -> requests.Response: """ Sends a treatment report to DCS :param url: set device state URL @@ -411,6 +454,7 @@ 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } + _inject_tracing_headers(headers, correlation_id, device_sn) payload = treatment_log @@ -437,14 +481,51 @@ return None @log_func +def cmd_outgoing_check_data_log_exists(base_url: str, + access_token: str, + serial_number: str, + device_file_name: str, + error_handler: ErrorHandler, + correlation_id: str = "", + device_sn: str = "") -> bool: + """ + Checks if a device log file already exists in DCS before uploading. + Returns True if exists (duplicate), False if not or on error (fail-open). 
+ """ + url = base_url.rstrip("/") + "/api/device/data/log/exists" + params = {"fileName": device_file_name, "serialNumber": serial_number} + headers = { + 'Authorization': BEARER_HOLDER.format(access_token), + 'User-Agent': USER_AGENT, + "X-Api-Version": API_VERSION + } + _inject_tracing_headers(headers, correlation_id, device_sn) + try: + response = requests.head(url=url, headers=headers, params=params, timeout=(30, 60)) + if response.status_code == 204: + g_utils.logger.info(f"File {device_file_name} already exists in DCS (duplicate).") + return True + elif response.status_code == 404: + return False + else: + g_utils.logger.warning(f"Unexpected status from log-exists check: {response.status_code}") + return False + except Exception as e: + g_utils.logger.warning(f"Data log exists check failed, proceeding with upload: {e}") + return False + + +@log_func def cmd_outgoing_upload_file_in_chunks( base_url: str, access_token: str, file_json: dict, error_handler: ErrorHandler, log_file_origin: str, chunk_size: int=2 * 1024 * 1024, retries: int=3, - token_refresher: callable=None ) -> Union[str, None]: + token_refresher: callable=None, + correlation_id: str = "", + device_sn: str = "" ) -> Union[str, None]: """ Uploads a large file in chunks using sessions and retries. 
@@ -481,6 +562,7 @@ 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } + _inject_tracing_headers(headers, correlation_id, device_sn) g_utils.logger.info(f"Starting upload session for {log_file_origin} log") g_utils.logger.debug(f"File upload payload (start-session): {start_session_payload}") @@ -493,7 +575,14 @@ timeout=(30, 60)) g_utils.logger.info(f"Start-session response: {response.status_code}") + + if response.status_code == CONFLICT: + device_file_name = str(file_json['start_session']['metadata']['deviceFileName']) + g_utils.logger.info(f"File {device_file_name} rejected as duplicate at start-session (409 Conflict).") + return {"accepted": False, "filename": device_file_name, "reason_code": LogUploadReasonCode.DUPLICATE.value} + if response.status_code != 200: + _log_dcs_trace_id(response, correlation_id) g_utils.logger.error(f"Start-session failed: {response.status_code} - {response.text[:500]}") raise Exception(f"Error while starting upload session: {response.status_code} - {response.text}") @@ -526,6 +615,7 @@ 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } + _inject_tracing_headers(headers, correlation_id, device_sn) with open(target_file, "rb") as f: file_content = f.read() @@ -539,12 +629,14 @@ # Calculate the number of chunks num_chunks = total_size // chunk_size + (total_size % chunk_size > 0) + all_chunks_ok = True for i in range(num_chunks): start_index = i * chunk_size end_index = min(start_index + chunk_size, total_size) chunk = base64_string[start_index:end_index] # Retry logic with counter and backoff time + chunk_uploaded = False retry_count = 0 while retry_count < retries: try: @@ -555,7 +647,6 @@ upload_chunk_payload['data'] = chunk upload_chunk_payload = json.dumps(upload_chunk_payload) - g_utils.logger.debug(f"File upload payload (upload-chunk) - chunk No {chunk_number}: {upload_chunk_payload}") response = requests.post(upload_chunk_url, @@ -564,20 +655,34 @@ timeout=(30, 60)) if response.status_code == 200: - chunk_number += 
1
+                        chunk_number += 1
                         g_utils.logger.info(f"Uploaded chunk {chunk_number} of {num_chunks}")
+                        chunk_uploaded = True
                         break  # Successful upload, break retry loop
                     g_utils.logger.warning(f"Chunk {chunk_number}/{num_chunks} upload failed: {response.status_code} - {response.text[:500]}")
+                    retry_count += 1
                     if retry_count < retries:
-                        g_utils.logger.info(f"Retrying chunk upload in 5 seconds (attempt {retry_count + 1}/{retries})...")
-                        sleep(5)  # Implement backoff time between retries
-                        retry_count += 1
+                        g_utils.logger.info(f"Retrying chunk upload in 5 seconds (attempt {retry_count}/{retries})...")
+                        sleep(5)
                 except Exception as e:
-                    error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e))
-                    error_handler.enqueue_error(error=error)
+                    retry_count += 1
+                    g_utils.logger.error(f"Chunk {chunk_number}/{num_chunks} exception (attempt {retry_count}/{retries}): {e}")
+                    if retry_count < retries:
+                        sleep(5)
+                if not chunk_uploaded:
+                    g_utils.logger.error(f"Chunk {chunk_number} failed after {retries} retries, aborting upload")
+                    all_chunks_ok = False
+                    break
+
+            if not all_chunks_ok:
+                error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID,
+                                      f"Upload aborted: chunk {chunk_number} failed after {retries} retries")
+                error_handler.enqueue_error(error=error)
+                return None
+
         except Exception as e:
             error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e))
             error_handler.enqueue_error(error=error)
@@ -607,6 +712,7 @@
         'User-Agent': USER_AGENT,
         "X-Api-Version": API_VERSION
     }
+    _inject_tracing_headers(headers, correlation_id, device_sn)
     try:
         end_session_payload = json.dumps(end_session_payload)
         g_utils.logger.info(f"Ending upload session (sessionId={session_id})")
@@ -623,6 +729,7 @@
         g_utils.logger.info(f"End-session response: {response.status_code}")
 
         if response.status_code != 200:
+            _log_dcs_trace_id(response, correlation_id)
             g_utils.logger.error(f"End-session failed: {response.status_code} - {response.text[:500]}")
             raise 
Exception(f"Error while ending upload session: {response.status_code} - {response.text}") Index: cloudsync/handlers/outgoing/handler_cs_to_mft.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/handlers/outgoing/handler_cs_to_mft.py (.../handler_cs_to_mft.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/handlers/outgoing/handler_cs_to_mft.py (.../handler_cs_to_mft.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -9,6 +9,7 @@ from cloudsync.handlers.error_handler import ErrorHandler from cloudsync.handlers.error import Error +from cloudsync.handlers.outgoing.handler_cs_to_dcs import _inject_tracing_headers from cloudsync.utils.globals import * from cloudsync.common.enums import * from cloudsync.utils.helpers import * @@ -22,7 +23,9 @@ manf_tool_base_url: str, data_source_id: int, headers: dict, - error_handler: ErrorHandler) -> requests.Response: + error_handler: ErrorHandler, + correlation_id: str = "", + device_sn: str = "") -> requests.Response: """ Initiates device registration with the device registration tool Device -> Manufacturing Tool @@ -46,6 +49,7 @@ "dgSerialNumber": dg_serial, "softwareVersion": sw_version, } + _inject_tracing_headers(headers, correlation_id, device_sn) resp = requests.post(url=url, data=data, headers=headers, @@ -73,7 +77,9 @@ def cmd_outgoing_validation_result(invalid_attributes: List[str], hd_serial: str, g_config: dict, - error_handler: ErrorHandler): + error_handler: ErrorHandler, + correlation_id: str = "", + device_sn: str = ""): """ Sends the validation POST to the manf tool :param invalid_attributes: The list of invalid attributes @@ -105,6 +111,7 @@ data = { "invalidAttributes": invalid_attributes, } + _inject_tracing_headers(headers, correlation_id, device_sn) g_utils.logger.debug("headers: {0}".format(headers)) g_utils.logger.debug("data: {0}".format(data)) resp = 
requests.post(url=url, Index: cloudsync/handlers/ui_cs_request_handler.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -8,6 +8,7 @@ from cloudsync.common.enums import * from cloudsync.utils.globals import * from cloudsync.utils.helpers import * +from cloudsync.utils.logging import set_correlation_id, clear_correlation_id import urllib.parse import os @@ -41,7 +42,10 @@ if flag: while len(self.queue) > 0: message = self.queue.popleft() - self.handle_message(message) + try: + self.handle_message(message) + finally: + clear_correlation_id() self.event.clear() def enqueue_message(self, message: UICSMessage) -> bool: @@ -56,8 +60,13 @@ else: return False + def _enqueue_error(self, error: Error, correlation_id: str = "") -> bool: + error.correlation_id = correlation_id + return self.error_handler.enqueue_error(error=error) + def handle_message(self, message: UICSMessage) -> None: - self.logger.info('UI2CS Message: {0}'.format(message)) + set_correlation_id(message.correlation_id) + self.logger.info('[%s] UI2CS Message: %s', message.correlation_id, message) message_data = message.timestamp + message.sequence + message.ID + message.size if len(message.parameters) > 0: @@ -74,7 +83,7 @@ message.sequence, message.CRC, str(message_calculated_crc8)) - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) else: # REGISTRATION MODE @@ -88,7 +97,7 @@ error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, "invalid # of parameters for registration") - self.error_handler.enqueue_error(error=error) + 
self._enqueue_error(error, message.correlation_id) else: try: message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] = message.parameters[0] @@ -113,17 +122,18 @@ method='', g_config=message.g_config, success_message='CS2MFT_REQ_REGISTRATION request added to network ' - 'queue') + 'queue', + correlation_id=message.correlation_id) except IOError: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SAVE_CONFIG_ERROR.value, "Error updating device config file") - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, str(e)) - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) # OPERATION MODE @@ -143,7 +153,7 @@ error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_UNKNOWN_DEVICE_STATE_ERROR.value, "Unknown device state received from UI") - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_STATE] = device_state.value self.logger.info("Device state: {0}".format(device_state.name)) @@ -157,12 +167,13 @@ method='', g_config=message.g_config, success_message='CS2DCS_REQ_SET_DEVICE_STATE request added to network ' - 'queue') + 'queue', + correlation_id=message.correlation_id) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, str(e)) - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) # SEND TREATMENT REPORT REQUEST elif InboundMessageIDs.mapped_str_value(message.ID) == InboundMessageIDs.UI2CS_SEND_TREATMENT_REPORT and \ @@ -190,13 +201,19 @@ method='', g_config=message.g_config, success_message='CS2DCS_REQ_SEND_TREATMENT_REPORT request added to network' - 'queue') + 'queue', + 
correlation_id=message.correlation_id) + else: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, + f"Failed to read treatment log file: {message.parameters[0]}") + self._enqueue_error(error, message.correlation_id) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, str(e)) - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) # DECOMMISSIONING REQUEST elif InboundMessageIDs.mapped_str_value(message.ID) == InboundMessageIDs.UI2CS_REQ_DECOMMISSION and \ @@ -212,7 +229,7 @@ error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_REQ_DECOMMISSION_ERROR.value, str(e)) - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) # CHECK-IN REQUEST elif InboundMessageIDs.mapped_str_value(message.ID) == InboundMessageIDs.UI2CS_SEND_CHECKIN: @@ -227,7 +244,7 @@ error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_REQ_CHECKIN_ERROR.value, str(e)) - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) # CS LOG RETENTION elif InboundMessageIDs.mapped_str_value(message.ID) == InboundMessageIDs.UI2CS_REQ_LOG_RETENTION: @@ -243,12 +260,12 @@ error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_LOG_RETENTION_ERROR.value, str(e)) - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) else: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_LOG_RETENTION_ERROR.value, "invalid # of parameters for log retention") - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) # ERROR MESSAGE RECEIVED FROM UI elif InboundMessageIDs.mapped_str_value(message.ID) == InboundMessageIDs.UI2CS_ERROR: @@ -258,7 +275,7 @@ for parameter in message.parameters[1:]: 
error_body += ",{0}".format(parameter) error = Error(error_body=error_body) - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) # UPLOAD DEVICE LOG TO CLOUD REQUEST elif InboundMessageIDs.mapped_str_value(message.ID) == InboundMessageIDs.UI2CS_UPLOAD_DEVICE_LOG and \ @@ -269,7 +286,7 @@ error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_DEVICE_LOG_ERROR.value, "invalid # of parameters for file upload request") - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) else: try: @@ -297,10 +314,11 @@ payload=device_log_data, method='', g_config=message.g_config, - success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to network queue') + success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to network queue', + correlation_id=message.correlation_id) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_DEVICE_LOG_ERROR.value, str(e)) - self.error_handler.enqueue_error(error=error) + self._enqueue_error(error, message.correlation_id) Index: cloudsync/handlers/uics_message.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/handlers/uics_message.py (.../uics_message.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/handlers/uics_message.py (.../uics_message.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -4,6 +4,7 @@ def __init__(self, message_body: str, g_config: dict): self.g_config = g_config + self.correlation_id = "" message_components = message_body.split(',') if len(message_components) >= 5: @@ -15,14 +16,14 @@ if int(message_components[4]) > 0: self.parameters = message_components[5:] else: - self.parameters = {} + self.parameters = [] else: self.timestamp = "0" self.sequence = "0" self.CRC = "0" self.ID = "0" self.size = "0" - self.parameters = 
{} + self.parameters = [] def __str__(self): data = { @@ -32,5 +33,6 @@ "ID": self.ID, "number_of_parameters": self.size, "parameters": self.parameters, + "correlation_id": self.correlation_id, } return str(data) Index: cloudsync/utils/helpers.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/utils/helpers.py (.../helpers.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/utils/helpers.py (.../helpers.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -305,10 +305,10 @@ treatment_data = helpers_read_treatment_report_template(TREATMENT_REPORT_TEMPLATE_PATH) try: - f = open(path) + with open(path) as f: + treatment_log_lines = f.readlines() counter = 0 - treatment_log_lines = f.readlines() while counter < len(treatment_log_lines): line = treatment_log_lines[counter].strip() @@ -471,7 +471,7 @@ def helpers_add_to_network_queue(network_request_handler, request_type, url, payload, method, g_config, - success_message): + success_message, correlation_id=""): cycle_duration = REACHABILITY_CYCLE_PAUSE total_cycles = REACHABILITY_CYCLES @@ -486,7 +486,8 @@ url=url, payload=payload, method=method, - g_config=g_config) + g_config=g_config, + correlation_id=correlation_id) request_added_to_queue = False max_attempts = 300 attempt = 0 @@ -608,10 +609,15 @@ def helpers_extract_device_log_metadata(log_file_name:str) -> Union[dict,None]: + # Format 1: YYYYMMDD_HHMMSS_SERIAL_SUBTYPE.u.LOGTYPE.gz (original expected) local_date_pattern = r"^\d{8}(?=_)" local_time_pattern = r"^(?:^.*?)_(\d{6})_" serial_number_pattern = r"^[a-zA-Z0-9]+_[a-zA-Z0-9]+_([a-zA-Z0-9]+)(?=_)" device_subtype_pattern = r"^[a-zA-Z0-9]+_[a-zA-Z0-9]+_[a-zA-Z0-9]+_([a-zA-Z0-9]+)(?=)" + + # Format 2: SERIAL_YYYYMMDD.u.LOGTYPE.gz (actual device format, e.g. DVT-BN_20260320.u.log.gz) + alt_date_pattern = r"_(\d{8})\.u\." 
+ log_type_pattern = r"[a-zA-Z0-9]+\.u\.(\w+)(?=)" local_date_match = re.search(local_date_pattern, log_file_name) @@ -620,8 +626,15 @@ device_subtype_match = re.search(device_subtype_pattern, log_file_name) log_type_match = re.search(log_type_pattern, log_file_name) + # Fallback to Format 2 date if Format 1 didn't match + if not local_date_match: + alt_date_match = re.search(alt_date_pattern, log_file_name) + if alt_date_match: + local_date_match = alt_date_match + return { - "local_date": local_date_match.group(0) if local_date_match else 'unknown', + "local_date": local_date_match.group(1) if local_date_match and local_date_match.lastindex else + (local_date_match.group(0) if local_date_match else 'unknown'), "local_time": local_time_match.group(1) if local_time_match else 'unknown', "serial_number": serial_number_match.group(1) if serial_number_match else 'unknown', "device_sub_type": device_subtype_match.group(1) if device_subtype_match else 'unknown', @@ -674,6 +687,12 @@ cs_log_json['start_session']['macAddress'] = 'device-data' cs_log_json['start_session']['organizationId'] = cs_log_data['organizationId'] cs_log_json['start_session']['metadata']['dataType'] = 'cloud-sync-log-category' + # Prepend serial to CS log filename for DCS dedup uniqueness per device. + # Without this, two devices uploading cloudsync.log.03-27-2026 on the same day + # would collide in the filename-only dedup check (DataLogExistsByFileName). 
+ serial = cs_log_data['serialNumber'] + if not file_name.startswith(serial): + file_name = f"{serial}_{file_name}" cs_log_json['start_session']['metadata']['deviceFileName'] = file_name cs_log_json['start_session']['metadata']['startTimestamp'] = start_of_day cs_log_json['start_session']['metadata']['endTimestamp'] = end_of_day @@ -706,39 +725,44 @@ # Timestamp for GeneratedAt (in miliseconds) local_timestamp = int(datetime.now(timezone.utc).timestamp()*1000) - # Calculate the UTC timestamp based on the local date and time the UI provided (in milliseconds) (NEED BETTER SOLUTION HERE) + # Derive deterministic timestamps from filename date (required for DCS dedup composite key). + # When date+time are both available, use the exact datetime. + # When only date is available (Format 2: SERIAL_YYYYMMDD.u.log.gz), use start-of-day. + # Fallback to upload time only if no date can be extracted at all. if extracted_metadata['local_date'] != 'unknown' and extracted_metadata['local_time'] != 'unknown': datetime_obj = datetime.strptime(f"{extracted_metadata['local_date']}{extracted_metadata['local_time']}", "%Y%m%d%H%M%S") - # Convert to UTC timestamp - ui_utc_timestamp = int(datetime_obj.timestamp()*1000) + ui_utc_timestamp = int(datetime_obj.replace(tzinfo=timezone.utc).timestamp() * 1000) + elif extracted_metadata['local_date'] != 'unknown': + date_obj = datetime.strptime(extracted_metadata['local_date'], "%Y%m%d") + ui_utc_timestamp = int(date_obj.replace(tzinfo=timezone.utc).timestamp() * 1000) else: ui_utc_timestamp = local_timestamp # Populate JSON object - if extracted_metadata is not None: - device_log_json['start_session']['reference'] = str(uuid.uuid4()) - device_log_json['start_session']['generatedAt'] = local_timestamp - device_log_json['start_session']['dataType'] = 'device-data' - device_log_json['start_session']['serialNumber'] = device_log_data['serialNumber'] - device_log_json['start_session']['macAddress'] = 'device-data' - 
device_log_json['start_session']['organizationId'] = device_log_data['organizationId'] - device_log_json['start_session']['metadata']['dataType'] = 'device-log-category' - device_log_json['start_session']['metadata']['deviceLogType'] = extracted_metadata['device_log_type'] - device_log_json['start_session']['metadata']['deviceSubType'] = extracted_metadata['device_sub_type'] - device_log_json['start_session']['metadata']['deviceFileName'] = file_name - device_log_json['start_session']['metadata']['startTimestamp'] = ui_utc_timestamp - device_log_json['start_session']['metadata']['endTimestamp'] = ui_utc_timestamp - device_log_json['end_session']['checksum'] = checksum - device_log_json['general']['file_size'] = file_size - device_log_json['general']['file_path'] = device_log_data['path'] + device_log_json['start_session']['reference'] = str(uuid.uuid4()) + device_log_json['start_session']['generatedAt'] = local_timestamp + device_log_json['start_session']['dataType'] = 'device-data' + device_log_json['start_session']['serialNumber'] = device_log_data['serialNumber'] + device_log_json['start_session']['macAddress'] = 'device-data' + device_log_json['start_session']['organizationId'] = device_log_data['organizationId'] + device_log_json['start_session']['metadata']['dataType'] = 'device-log-category' + device_log_json['start_session']['metadata']['deviceLogType'] = extracted_metadata['device_log_type'] + device_log_json['start_session']['metadata']['deviceSubType'] = extracted_metadata['device_sub_type'] + # Ensure deviceFileName includes serial for DCS dedup uniqueness per device. + # Real device logs already have serial in the name (e.g., DVT-BN_20260320.u.log.gz). 
+ serial = device_log_data['serialNumber'] + if not file_name.startswith(serial): + file_name = f"{serial}_{file_name}" + device_log_json['start_session']['metadata']['deviceFileName'] = file_name + device_log_json['start_session']['metadata']['startTimestamp'] = ui_utc_timestamp + device_log_json['start_session']['metadata']['endTimestamp'] = ui_utc_timestamp + device_log_json['end_session']['checksum'] = checksum + device_log_json['general']['file_size'] = file_size + device_log_json['general']['file_path'] = device_log_data['path'] - return device_log_json + return device_log_json - else: - g_utils.logger.error('Device log file name does not match the pattern') - return None - def helpers_should_update_dcs_log_level(g_config: dict) -> bool: """ Returns True if the state of the log level should be @@ -755,7 +779,10 @@ def helpers_should_update_cs_log_level(response: dict, g_config: dict) -> bool: current_log_level:str = g_config[CONFIG_LOGS][CONFIG_LOGS_CURRENT_LOG_LEVEL].upper() - requested_log_level:str = response['logLevel'].upper() + requested_log_level = response.get('logLevel') + if requested_log_level is None: + return False + requested_log_level = requested_log_level.upper() if requested_log_level != current_log_level: g_utils.logger.debug(f"DCS requested to change the log level from {current_log_level} to {requested_log_level}") return True @@ -767,7 +794,7 @@ Returns a tuple with duration_in_seconds, start_timestamp, end_timestamp if a valid log level duration was given. 
Otherwise None """ - duration = int(response['logLevelDuration']) if response['logLevelDuration'] else 0 + duration = int(response.get('logLevelDuration', 0) or 0) start_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000) end_timestamp = start_timestamp + duration Index: cloudsync/utils/logging.py =================================================================== diff -u -r898cc02a49599b59d062b8f939d5587bdf0b8bd6 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/utils/logging.py (.../logging.py) (revision 898cc02a49599b59d062b8f939d5587bdf0b8bd6) +++ cloudsync/utils/logging.py (.../logging.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -8,6 +8,47 @@ from cloudsync.common.enums import * +# Thread-local storage for the active correlation ID. +# Set by request handlers before processing; read by CorrelationFilter +# so that every log line emitted during that request carries the ID. +_correlation_context = threading.local() + + +def set_correlation_id(correlation_id: str): + """Set the active correlation ID for the current thread.""" + _correlation_context.correlation_id = correlation_id + + +def get_correlation_id() -> str: + """Get the active correlation ID for the current thread, or empty string.""" + return getattr(_correlation_context, 'correlation_id', "") + + +def clear_correlation_id(): + """Clear the active correlation ID for the current thread.""" + _correlation_context.correlation_id = "" + + +class CorrelationFilter(logging.Filter): + """Injects correlation_id and device_sn into every log record. + + Reads the active correlation ID from thread-local storage so that + ALL log lines emitted during a correlated request carry the ID — + not just the lines that explicitly include it in the message text. 
+ """ + + def __init__(self, get_device_sn=None): + super().__init__() + self._get_device_sn = get_device_sn or (lambda: "unregistered") + + def filter(self, record): + if not hasattr(record, 'correlation_id') or not record.correlation_id: + record.correlation_id = get_correlation_id() + if not hasattr(record, 'device_sn'): + record.device_sn = self._get_device_sn() + return True + + class LoggingConfig(metaclass=SingletonMeta): """ Encapsulates logging configuration and setup. @@ -207,11 +248,19 @@ ) cls.log_handler.suffix = "%m-%d-%Y" - # Add a formatter + # Add a formatter (includes correlation_id and device_sn for cross-system tracing) default_formatter = logging.Formatter( - '[%(asctime)s] %(levelname)s in %(module)s: %(message)s | {%(pathname)s:%(lineno)d}') + '[%(asctime)s] %(levelname)s [%(correlation_id)s] [%(device_sn)s] in %(module)s: %(message)s | {%(pathname)s:%(lineno)d}') cls.log_handler.setFormatter(default_formatter) + # Install correlation filter on root logger to provide defaults for correlation_id/device_sn + def _get_device_sn(): + if cls.g_config is not None: + return cls.g_config.get(CONFIG_DEVICE, {}).get(CONFIG_DEVICE_HD_SERIAL, "unregistered") + return "unregistered" + cls._correlation_filter = CorrelationFilter(get_device_sn=_get_device_sn) + cls.log_handler.addFilter(cls._correlation_filter) + # Set the custom handler as global handler cls.set_log_handler() # Set the g_utils logger Index: cloudsync/utils/watchdog.py =================================================================== diff -u -radec5be657a174b63971987fcabd492e58720712 -r3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f --- cloudsync/utils/watchdog.py (.../watchdog.py) (revision adec5be657a174b63971987fcabd492e58720712) +++ cloudsync/utils/watchdog.py (.../watchdog.py) (revision 3ecfe17da0e7fb115cc97ed14014d9c13aa87a8f) @@ -12,7 +12,6 @@ # Default sentinel file path — cs.py monitors this to trigger process restart -# On-device: /media/sd-card/cloudsync (stable across registration & 
operation modes) SENTINEL_PATH = "/media/sd-card/cloudsync/cloudsync_restart_sentinel"