"""Implementation of helper methods"""

import os
import shutil
import json
import hashlib
import socket
import re
import base64
import uuid
import subprocess
import math
import threading
import functools
from datetime import *
from time import time, sleep
from typing import Union, Any
from logging import Logger

from cloudsync.utils.singleton import SingletonMeta
from cloudsync.common.enums import *
from cloudsync.handlers.network_request import NetworkRequest
from cloudsync.utils.reachability import *
from cloudsync.utils.globals import *


class GUtils(metaclass=SingletonMeta):
    """Shared holder for the process logger and the reachability provider.

    Both attributes start as ``None`` and are injected later through the
    ``add_*`` methods; helpers in this module read them via ``g_utils``.
    """

    def __init__(self):
        self.logger = None
        self.reachability_provider = None

    def add_logger(self, logger: Logger):
        """Attach the logger used by every helper in this module."""
        self.logger = logger

    def add_reachability_provider(self, reachability_provider: ReachabilityProvider):
        """Attach the provider whose ``reachability`` flag gates network enqueues."""
        self.reachability_provider = reachability_provider


g_utils = GUtils()


def _helpers_log(level: str, msg: str, *args) -> None:
    """Log ``msg`` at ``level`` ("debug", "info", "warning", "error") if a logger is attached.

    Helpers in this module can run before ``g_utils.add_logger`` has been
    called (the token readers already guard for this). Without the guard they
    would raise ``AttributeError`` on ``None.<level>``.
    """
    logger = g_utils.logger
    if logger is not None:
        getattr(logger, level)(msg, *args)


class TreatmentLogParseError(Exception):
    """Raised by helpers_read_treatment_log_file for labelled failure modes.

    The first positional arg is the category, used by the 1007 handler to
    select a curated, comma-free, customer-readable wire-frame string.
    Internal cause (if any) is chained via `raise ... from e` and reaches
    cloudsync.log via the handler's `log.exception(...)` — never the wire.

    Categories:
      - "missing template"   — treatment_report_template.json absent
      - "missing input file" — caller-supplied .txr path absent
      - "schema mismatch"    — parse-loop failure on present input
    """


# 256-entry lookup table consumed by helpers_crc8 (entry 1 is 49 == 0x31).
CRC_LIST = [
    0, 49, 98, 83, 196, 245, 166, 151, 185, 136, 219, 234, 125, 76, 31, 46,
    67, 114, 33, 16, 135, 182, 229, 212, 250, 203, 152, 169, 62, 15, 92, 109,
    134, 183, 228, 213, 66, 115, 32, 17, 63, 14, 93, 108, 251, 202, 153, 168,
    197, 244, 167, 150, 1, 48, 99, 82, 124, 77, 30, 47, 184, 137, 218, 235,
    61, 12, 95, 110, 249, 200, 155, 170, 132, 181, 230, 215, 64, 113, 34, 19,
    126, 79, 28, 45, 186, 139, 216, 233, 199, 246, 165, 148, 3, 50, 97, 80,
    187, 138, 217, 232, 127, 78, 29, 44, 2, 51, 96, 81, 198, 247, 164, 149,
    248, 201, 154, 171, 60, 13, 94, 111, 65, 112, 35, 18, 133, 180, 231, 214,
    122, 75, 24, 41, 190, 143, 220, 237, 195, 242, 161, 144, 7, 54, 101, 84,
    57, 8, 91, 106, 253, 204, 159, 174, 128, 177, 226, 211, 68, 117, 38, 23,
    252, 205, 158, 175, 56, 9, 90, 107, 69, 116, 39, 22, 129, 176, 227, 210,
    191, 142, 221, 236, 123, 74, 25, 40, 6, 55, 100, 85, 194, 243, 160, 145,
    71, 118, 37, 20, 131, 178, 225, 208, 254, 207, 156, 173, 58, 11, 88, 105,
    4, 53, 102, 87, 192, 241, 162, 147, 189, 140, 223, 238, 121, 72, 27, 42,
    193, 240, 163, 146, 5, 52, 103, 86, 120, 73, 26, 43, 188, 141, 222, 239,
    130, 179, 224, 209, 70, 119, 36, 21, 59, 10, 89, 104, 255, 206, 157, 172
]


def helpers_is_int(val: str) -> bool:
    """
    Determines if the value can be converted to an int
    :param val: the value in string form
    :return: True if can be converted to an int, false otherwise
    """
    try:
        int(val)
        return True
    except (ValueError, TypeError):
        # TypeError covers non-string/non-numeric inputs such as None, matching
        # helpers_is_float so helpers_try_numeric(None) returns None instead of raising.
        return False


def helpers_is_float(val: str) -> bool:
    """
    Determines if the value can be converted to a float
    :param val: the value in string form
    :return: True if can be converted to a float, false otherwise
    """
    try:
        float(val)
    except (ValueError, TypeError):
        return False
    return True


def helpers_try_numeric(val: str) -> Union[int, float, str]:
    """
    Tries to convert the value to numeric. If it's not possible leaves it as a string
    :param val: the value to convert
    :return: int if it parses as an int, otherwise a finite float if it parses
             as a float (non-finite values become the -0.0001 sentinel),
             otherwise the value unchanged
    """
    if helpers_is_int(val):
        return int(val)
    if not helpers_is_float(val):
        return val
    try:
        f = float(val)
    except Exception:
        _helpers_log("warning", f"Unexpected conversion failure for: '{val}' — replacing with 0")
        return 0
    if math.isinf(f) or math.isnan(f):
        # DCS recognizes -0.0001 as the non-finite sentinel across clinical
        # telemetry fields (all physically non-negative on-device, so any
        # negative value is unambiguously a sentinel). Single value covers
        # -inf / +inf / NaN.
        _helpers_log("warning", f"Non-finite float value encountered: '{val}' — replacing with -0.0001")
        return -0.0001
    return f


def helpers_parse_treatment_log(path: str) -> dict:
    """
    Converts a treatment.log file to a python dictionary
    The treatment log needs to be formatted better. Until then, this is a (non-ideal) way to read it.
    :param path: the path to the treatment log file
    :return: the parsed treatment log file ({} when the path does not exist)
    """
    if not os.path.exists(path):
        _helpers_log("warning", "Path does not exist: %s", path)
        return {}

    result = {}
    with open(path, 'r') as f:
        lines = f.readlines()

    group = "NoGroup"
    for line_ in lines:
        line = line_.replace("\n", "")
        if "[" in line and "]" in line:
            # "[Group Name]" header: switch the active group.
            group = line.split("[")[1].split("]")[0]
            if group not in result:
                result[group] = {}
        elif group in ("Title", "Treatment Parameters", "Post-Treatment Data", "Extra"):
            # key,value[,units] lines.
            tokens = line.split(",")
            if len(tokens) > 1:
                subgroup = tokens[0]
                result[group][subgroup] = {}
                result[group][subgroup]["value"] = helpers_try_numeric(tokens[1])
                if len(tokens) > 2:
                    result[group][subgroup]["units"] = None if tokens[2] == "" else tokens[2]
        elif group in ("Treatment Data", "Treatment Alarms", "Treatment Events"):
            # Append in place; the previous `data = data + [row]` copied the
            # whole list on every line (quadratic in the number of rows).
            tokens_converted = [helpers_try_numeric(token) for token in line.split(",")]
            result[group].setdefault("data", []).append(tokens_converted)
    return result


def helpers_device_state_to_cloud_state(hd_mode: HDOpModes, hd_sub_mode: HDOpSubModes) -> DeviceStates:
    """
    Inactive Not OK - N/A - HD f/w will not know active vs. inactive - UI or cloud will have to maintain
        assigned tenant, active/inactive and Ok/Not OK while inactive
    Inactive OK - N/A
    Active OK - N/A

    Active Ready - mode is 3 (standby) and sub-mode is 1 (wait for treatment)
    Active In Treatment - mode between 4 and 7 (treatment params .. post treatment)
    Active Not Ready - mode/sub-mode is anything other than ready, in-treatment, or not OK
    Active Not OK - mode is 0 (fault)

    Decommissioned - N/A - HD f/w will not know if system is decommissioned

    NOTE(review): only hd_mode is consulted; hd_sub_mode is currently ignored,
    so any standby sub-mode maps to ACTIVE_READY. Confirm this is intended.
    """
    if hd_mode == HDOpModes.MODE_STAN:
        return DeviceStates.ACTIVE_READY
    if hd_mode in (HDOpModes.MODE_TPAR, HDOpModes.MODE_PRET, HDOpModes.MODE_TREA, HDOpModes.MODE_POST):
        return DeviceStates.ACTIVE_IN_TREATMENT
    if hd_mode == HDOpModes.MODE_FAUL:
        return DeviceStates.ACTIVE_NOT_OK
    return DeviceStates.ACTIVE_NOT_READY


def helpers_get_stored_token() -> Union[str, None]:
    """Return the stored access token string, or None.

    No client-side `exp` check. DCS's JwtBearer middleware is the sole
    authority on token lifetime (default 5 min ClockSkew tolerance). On
    401/403 from a DCS call, callers refresh via cert auth and retry once.
    This eliminates the wall-clock dependency that causes refresh storms
    on devices with operator-set or jumped system time.

    Corruption guard: a corrupt / truncated / unreadable token file — or one
    that holds valid JSON that is not an object — MUST return None rather
    than crash CS.

    :return: The access token string if present, otherwise None.
    """
    if not os.path.exists(DEVICE_KEBORMED_ACCESS_TOKEN_PATH):
        return None
    try:
        with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'r') as f:
            data = json.load(f)
    except (json.JSONDecodeError, IOError, OSError, UnicodeDecodeError) as e:
        _helpers_log("warning",
                     "Stored access-token file is corrupt or unreadable; forcing refresh: %s", e)
        return None
    # Covers `null` as before, and also lists/strings/numbers, which would
    # otherwise raise AttributeError on `.get`.
    if not isinstance(data, dict):
        return None
    return data.get("access_token", None)


def helpers_read_config(path: str) -> dict:
    """
    Read the configuration
    :param path: the path to the configuration
    :return: the loaded configuration
    :raises FileNotFoundError: if the configuration file does not exist
    """
    if os.path.exists(path):
        with open(path, 'r') as f:
            config = json.load(f)
        return config
    _helpers_log("error", "Operation configuration file does not exist: {0}".format(path))
    raise FileNotFoundError(f"Operation configuration file does not exist: {path}")


def helpers_write_config(folder_path: str, file_path: str, config: dict) -> None:
    """
    Writes the config to the provided path
    If folder_path is provided (non-empty), the folder is created if it doesn't exist
    :param folder_path: the path for the config folder
    :param file_path: the path where the config json will be written
    :param config: the config dictionary
    :return: None
    """
    if folder_path:
        # exist_ok avoids the check-then-create race and tolerates an existing folder.
        os.makedirs(folder_path, exist_ok=True)
    with open(file_path, 'w') as f:
        json.dump(config, f, indent=4)


def helpers_read_access_token(path: str) -> dict:
    """
    Reads the access token json file, returns it as a dict.

    On any read failure (corrupt JSON, IO error, invalid encoding), or when
    the file holds JSON that is not an object, return an empty dict instead
    of raising. An unreadable token file must not crash CS — the caller will
    treat {} the same as "no cached token" and obtain a fresh one from
    Keycloak.

    :param path: The path to the access token json file
    :return: parsed dict, or {} if file missing / unreadable / corrupt
    """
    if not os.path.exists(path):
        return {}
    try:
        with open(path, 'r', encoding="utf-8-sig") as f:
            data = json.load(f)
    except (json.JSONDecodeError, IOError, OSError, UnicodeDecodeError) as e:
        _helpers_log("warning", "Access-token file at %s is corrupt or unreadable: %s", path, e)
        return {}
    if not isinstance(data, dict):
        _helpers_log("warning", "Access-token file at %s is corrupt or unreadable: %s", path,
                     "top-level JSON value is not an object")
        return {}
    return data


def _read_json_template(path: str) -> dict:
    """Load a JSON template from ``path``; log an error and return {} if it is missing."""
    if os.path.exists(path):
        with open(path, 'r') as f:
            return json.load(f)
    _helpers_log("error", "Configuration file does not exist: {0}".format(path))
    return {}


def helpers_read_treatment_report_template(path: str) -> dict:
    """
    Read the treatment report template
    :param path: the path to the template
    :return: the loaded template, or {} if the file does not exist
    """
    return _read_json_template(path)


def log_func(func):
    """
    Log the function and the parameters passed to it
    @param func: The decorated function
    @return: The wrapper function (keeps func's name/docstring via functools.wraps)
    """
    @functools.wraps(func)
    def _wrapper(*args, **kwargs):
        # Lazy %-formatting: the args/kwargs repr is only built when DEBUG is enabled.
        _helpers_log("debug", "Calling %s args: %s kwargs: %s", func.__name__, args, kwargs)
        return func(*args, **kwargs)

    return _wrapper


def helpers_file_to_byte_array(file_path: str) -> Union[str, None]:
    """
    Read a file and return its contents base64-encoded as a UTF-8 string.
    :param file_path: path of the file to read
    :return: base64 text of the file bytes, or None on any read error (logged)
    """
    try:
        with open(file_path, "rb") as f:
            file_bytes = f.read()
        return base64.b64encode(file_bytes).decode('utf-8')
    except FileNotFoundError as e:
        _helpers_log("error", f"Device log file not found: {e}")
        return None
    except Exception as e:
        _helpers_log("error", f"Error reading device log file: {e}")
        return None


def helpers_get_file_size(file_path: str) -> Union[int, None]:
    """
    Return the size of file_path in bytes, or None (logged) if it cannot be stat'ed.
    """
    try:
        return os.path.getsize(file_path)
    except OSError as e:
        _helpers_log("error", f'Error getting file size: {e}')
        return None


def _txr_is_section_header(line: str) -> bool:
    """True when ``line`` is wrapped in SECTION_START_CHARACTER ... SECTION_STOP_CHARACTER."""
    return line.startswith(SECTION_START_CHARACTER) and line.endswith(SECTION_STOP_CHARACTER)


def _txr_data_record(data_line: str) -> dict:
    """Convert one TREATMENT_DATA line (time followed by 8 numeric columns) to a record."""
    components = data_line.split(',')
    return {
        "time": int(components[0]) * S_MS_CONVERSION_FACTOR,
        "bloodFlowRate": helpers_try_numeric(components[1]),
        "dialysateFlowRate": helpers_try_numeric(components[2]),
        "ultrafiltrationRate": helpers_try_numeric(components[3]),
        "arterialPressure": helpers_try_numeric(components[4]),
        "venousPressure": helpers_try_numeric(components[5]),
        "systolic": helpers_try_numeric(components[6]),
        "diastolic": helpers_try_numeric(components[7]),
        "heartRate": helpers_try_numeric(components[8])
    }


def _txr_timed_record(data_line: str) -> dict:
    """Convert an alarm/event line ``time,title[,param...]`` to a record."""
    components = data_line.split(',')
    return {
        "time": int(components[0]) * S_MS_CONVERSION_FACTOR,
        "title": components[1],
        # Empty list when the line has no parameter columns.
        "parameters": components[2:]
    }


def _txr_apply_section(treatment_data: dict, section: str, section_lines: list) -> None:
    """Append the parsed records of a data/alarms/events section; ignore other sections."""
    if not section_lines:
        return
    if section == TREATMENT_DATA:
        treatment_data['data']['treatment']['data'].extend(
            _txr_data_record(data_line) for data_line in section_lines)
    elif section == TREATMENT_ALARMS:
        treatment_data['data']['treatment']['alarms'].extend(
            _txr_timed_record(data_line) for data_line in section_lines)
    elif section == TREATMENT_EVENTS:
        treatment_data['data']['treatment']['events'].extend(
            _txr_timed_record(data_line) for data_line in section_lines)


def _txr_apply_key_line(treatment_data: dict, line: str) -> None:
    """Copy one ``KEY,value`` line outside any section into the report.

    Branch order is significant (prefix matching via startswith) and is kept
    exactly as in the original parser. Lines matching no key are ignored.
    """
    if line.startswith(TREATMENT_CODE):
        treatment_data['data']['treatment']['treatmentCode'] = line.split(',')[1]
        treatment_data['reference'] = line.split(',')[1]
    elif line.startswith(PATIENT_ID):
        treatment_data['data']['patient']['id'] = line.split(',')[1]
    elif line.startswith(TREATMENT_DURATION):
        treatment_data['data']['treatment']['parameters']['treatmentDuration'] = int(line.split(',')[1])
    elif line.startswith(BLOOD_FLOW_RATE):
        treatment_data['data']['treatment']['parameters']['bloodFlowRate'] = int(line.split(',')[1])
    elif line.startswith(DIALYSATE_FLOW_RATE):
        treatment_data['data']['treatment']['parameters']['dialysateFlowRate'] = int(line.split(',')[1])
    elif line.startswith(ACID_CONCENTRATE_TYPE):
        treatment_data['data']['treatment']['parameters']['dialysate']['acidCode'] = line.split(',')[1]
    elif line.startswith(BICARBONATE_CONCENTRATE_TYPE):
        treatment_data['data']['treatment']['parameters']['dialysate']['bicarbCode'] = line.split(',')[1]
    elif line.startswith(POTASSIUM_CONCENTRATION):
        treatment_data['data']['treatment']['parameters']['dialysate']['K'] = float(line.split(',')[1])
    elif line.startswith(CALCIUM_CONCENTRATION):
        treatment_data['data']['treatment']['parameters']['dialysate']['Ca'] = float(line.split(',')[1])
    elif line.startswith(BICARBONATE_CONCENTRATION):
        treatment_data['data']['treatment']['parameters']['dialysate']['HCO3'] = float(line.split(',')[1])
    elif line.startswith(SODIUM_CONCENTRATION):
        treatment_data['data']['treatment']['parameters']['dialysate']['Na'] = float(line.split(',')[1])
    elif line.startswith(DIALYSATE_TEMPERATURE):
        treatment_data['data']['treatment']['parameters']['dialysateTemp'] = float(line.split(',')[1])
    elif line.startswith(DIALYZER_TYPE):
        treatment_data['data']['treatment']['parameters']['dialyzerModel'] = line.split(',')[1]
    elif line.startswith(HEPARIN_TYPE):
        treatment_data['data']['treatment']['parameters']['heparinType'] = line.split(',')[1]
    elif line.startswith(HEPARIN_CONCENTRATION):
        treatment_data['data']['treatment']['parameters']['heparinConcentration'] = line.split(',')[1]
    elif line.startswith(HEPARIN_BOLUS_VOLUME):
        treatment_data['data']['treatment']['parameters']['heparinBolus'] = line.split(',')[1]
    elif line.startswith(HEPARIN_DISPENSE_RATE):
        treatment_data['data']['treatment']['parameters']['heparinRate'] = line.split(',')[1]
    elif line.startswith(HEPARIN_STOP):
        treatment_data['data']['treatment']['parameters']['heparinStopBeforeTreatmentEnd'] = line.split(',')[1]
    elif line.startswith(TREATMENT_START_DATE_TIME):
        element = datetime.strptime(line.split(',')[1], "%Y/%m/%d %H:%M")
        timestamp = datetime.timestamp(element)
        treatment_data['data']['treatment']['time']['start'] = int(timestamp) * S_MS_CONVERSION_FACTOR
    elif line.startswith(TREATMENT_END_DATE_TIME):
        element = datetime.strptime(line.split(',')[1], "%Y/%m/%d %H:%M")
        timestamp = datetime.timestamp(element)
        treatment_data['data']['treatment']['time']['end'] = int(timestamp) * S_MS_CONVERSION_FACTOR
    elif line.startswith(ACTUAL_TREATMENT_DURATION):
        treatment_data['data']['treatment']['time']['treatmentDuration'] = int(line.split(',')[1])
    elif line.startswith(DIALYSATE_VOLUME_USED):
        treatment_data['data']['deviceTreatmentData']['dialysateVolumeUsed'] = float(line.split(',')[1])
    elif line.startswith(PRESCRIBED_UF_VOLUME):
        treatment_data['data']['deviceTreatmentData']['prescribedUltrafiltrationVolume'] = float(
            line.split(',')[1])
    elif line.startswith(TARGET_UF_VOLUME):
        treatment_data['data']['deviceTreatmentData']['finalTargetUltrafiltrationVolume'] = float(
            line.split(',')[1])
    elif line.startswith(ACTUAL_UF_VOLUME):
        treatment_data['data']['deviceTreatmentData']['actualUltrafiltrationVolume'] = float(
            line.split(',')[1])
    elif line.startswith(PRESCRIBED_UF_RATE):
        treatment_data['data']['deviceTreatmentData']['prescribedUltrafiltrationRate'] = float(
            line.split(',')[1])
    elif line.startswith(TARGET_UF_RATE):
        treatment_data['data']['deviceTreatmentData']['finalTargetUltrafiltrationRate'] = float(
            line.split(',')[1])
    elif line.startswith(ACTUAL_UF_RATE):
        treatment_data['data']['deviceTreatmentData']['actualUltrafiltrationRate'] = float(
            line.split(',')[1])
    elif line.startswith(SALINE_BOLUS_VOLUME):
        treatment_data['data']['deviceTreatmentData']['salineBolusVolumeGiven'] = float(line.split(',')[1])
    elif line.startswith(HEPARIN_DELIVERED_VOLUME):
        treatment_data['data']['deviceTreatmentData']['heparinDelivered'] = line.split(',')[1]
    elif line.startswith(WATER_SAMPLE_TEST_RESULT):
        treatment_data['data']['treatment']['treatmentPreparation']['totalChlorine'] = line.split(',')[1]
    # new treatment fields
    elif line.startswith(TARGET_WEIGHT):
        treatment_data['data']['treatment']['time']['targetWeight'] = ""
    elif line.startswith(TOTAL_CHLORINE):
        treatment_data['data']['treatment']['treatmentPreparation']['totalChlorine'] = ""
    elif line.startswith(PH):
        treatment_data['data']['treatment']['treatmentPreparation']['ph'] = ""
    elif line.startswith(CONDUCTIVITY):
        treatment_data['data']['treatment']['treatmentPreparation']['conductivity'] = ""
    elif line.startswith(MACHINE_WIPED_DOWN):
        treatment_data['data']['diagnostics']['machineWipedDown'] = ""
    elif line.startswith(FILTER_LIFE):
        treatment_data['data']['diagnostics']['filterLife'] = ""
    elif line.startswith(LAST_CHEMICAL_DISINFECTION):
        treatment_data['data']['diagnostics']['lastChemicalDisinfection'] = ""
    elif line.startswith(LAST_HEAT_DISINFECTION):
        treatment_data['data']['diagnostics']['lastHeatDisinfection'] = ""


def helpers_read_treatment_log_file(path: str):
    """
    Parse a .txr treatment log into the treatment report template structure.

    Lines outside sections are ``KEY,value`` pairs; a section header line is
    followed by data rows until the next header or end of file. Every data
    row is kept, including the last line of the file (the previous loop
    bound dropped it). Blank rows inside a section are skipped.

    :param path: path to the .txr treatment log
    :return: the populated treatment report dict
    :raises TreatmentLogParseError: "missing template", "missing input file"
             or "schema mismatch" (cause chained via ``from``)
    """
    # Preflight: both the template and the caller-supplied .txr must exist
    # before we touch the parser. Without this, missing inputs produce a
    # KeyError / IndexError / TypeError from deep in the parse loop that
    # the 1007 handler cannot translate into a customer-readable error.
    if not os.path.exists(TREATMENT_REPORT_TEMPLATE_PATH):
        raise TreatmentLogParseError("missing template")
    if not os.path.exists(path):
        raise TreatmentLogParseError("missing input file")

    treatment_data = helpers_read_treatment_report_template(TREATMENT_REPORT_TEMPLATE_PATH)
    try:
        with open(path) as f:
            lines = [raw_line.strip() for raw_line in f]

        total = len(lines)
        counter = 0
        while counter < total:
            line = lines[counter]
            counter += 1
            if not _txr_is_section_header(line):
                _txr_apply_key_line(treatment_data, line)
                continue
            # Collect this section's rows up to the next header or EOF.
            section_lines = []
            while counter < total and not _txr_is_section_header(lines[counter]):
                if lines[counter]:
                    section_lines.append(lines[counter])
                counter += 1
            _txr_apply_section(treatment_data, line, section_lines)
        return treatment_data
    except IOError as er:
        # File present at preflight but unreadable at open (permissions,
        # mid-read deletion, etc.). Map to the same wire-frame category
        # the caller dispatches for missing-input.
        raise TreatmentLogParseError("missing input file") from er
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # Parse loop encountered structurally-invalid content. The 1007
        # handler maps "schema mismatch" to a customer-readable string;
        # full Python traceback reaches cloudsync.log via log.exception.
        raise TreatmentLogParseError("schema mismatch") from e


# Three-state backpressure.
# These helpers USED to busy-wait 30 s and silently swallow the failure on
# queue saturation. Now they return a typed outcome so every caller can
# decide the right remediation (CS2UI_ERROR, 2010 rejection, retry-later…).
#
# Outcome contract:
#   ("queued", None)      — successfully enqueued
#   ("full", "<reason>")  — queue saturated after max_wait_s
#   ("failed", "<reason>") — pre-condition failed (no network,
#                            unexpected exception, etc.)
#
# Default max_wait_s is 5 s (was 30 s implicitly via 300 × 0.1 s). The
# handler thread is released sooner so upstream back-pressure propagates.
_F4_MAX_WAIT_S_DEFAULT = 5
_F4_POLL_INTERVAL_S = 0.1

# Rolling counter of "full" events used by the watchdog to escalate
# to sentinel when queue saturation is chronic rather than transient.
_F4_QUEUE_FULL_EVENTS = []
_F4_QUEUE_FULL_WINDOW_S = 300
_F4_QUEUE_FULL_LOCK = threading.Lock()


def _f4_record_queue_full():
    """Record a 'queue saturated' event for watchdog consumption."""
    now = time()
    with _F4_QUEUE_FULL_LOCK:
        _F4_QUEUE_FULL_EVENTS.append(now)
        # Trim events older than the window.
        cutoff = now - _F4_QUEUE_FULL_WINDOW_S
        while _F4_QUEUE_FULL_EVENTS and _F4_QUEUE_FULL_EVENTS[0] < cutoff:
            _F4_QUEUE_FULL_EVENTS.pop(0)


def helpers_queue_full_event_count(window_s=_F4_QUEUE_FULL_WINDOW_S):
    """Return the number of queue-saturation events in the last *window_s* seconds.

    Consumed by the watchdog to escalate chronic saturation to sentinel
    independently of any individual thread appearing dead.

    Only events older than the retention window (the larger of *window_s*
    and ``_F4_QUEUE_FULL_WINDOW_S``) are discarded. A short-window query
    therefore no longer deletes events that a later default-window query
    still needs to count.
    """
    now = time()
    cutoff = now - window_s
    retention_cutoff = now - max(window_s, _F4_QUEUE_FULL_WINDOW_S)
    with _F4_QUEUE_FULL_LOCK:
        while _F4_QUEUE_FULL_EVENTS and _F4_QUEUE_FULL_EVENTS[0] < retention_cutoff:
            _F4_QUEUE_FULL_EVENTS.pop(0)
        return sum(1 for event_ts in _F4_QUEUE_FULL_EVENTS if event_ts >= cutoff)


def helpers_add_to_network_queue(network_request_handler, request_type, url, payload, method, g_config,
                                 success_message, correlation_id="",
                                 max_wait_s=_F4_MAX_WAIT_S_DEFAULT):
    """Enqueue a network request with explicit backpressure reporting.

    :return: ``("queued", None)`` on success, ``("full", reason)`` when
             the queue is saturated for the full ``max_wait_s`` window,
             or ``("failed", reason)`` when a pre-condition blocks
             enqueue (no reachability provider, no internet, unexpected
             exception).

    Callers MUST capture the return value and act on non-``queued``
    outcomes — surface a CS2UI_ERROR (or upstream-visible signal) on
    "full"/"failed" so the caller can observe and retry.
    """
    # No provider injected yet: report a typed failure instead of raising
    # AttributeError on None.reachability.
    if g_utils.reachability_provider is None:
        reason = "Reachability provider not configured: Network request {0} couldn't be processed".format(
            request_type.name)
        _helpers_log("error", reason)
        return ("failed", reason)

    # Reachability pre-check. Retain the same cycle model (fast ramp-up
    # during early boot when reachability probe hasn't fired yet) but
    # surface the definitive "no internet" outcome to the caller.
    cycle_duration = REACHABILITY_CYCLE_PAUSE
    total_cycles = REACHABILITY_CYCLES
    cycle = 0
    while (not g_utils.reachability_provider.reachability) and (cycle < total_cycles):
        sleep(cycle_duration)
        cycle += 1
    if not g_utils.reachability_provider.reachability:
        reason = "Internet DOWN: Network request {0} couldn't be processed".format(request_type.name)
        _helpers_log("warning", reason)
        return ("failed", reason)

    try:
        r = NetworkRequest(request_type=request_type,
                           url=url,
                           payload=payload,
                           method=method,
                           g_config=g_config,
                           correlation_id=correlation_id)
    except Exception as exc:
        reason = "NetworkRequest construction failed for {0}: {1}".format(
            request_type.name, exc)
        _helpers_log("error", reason)
        return ("failed", reason)

    max_attempts = max(1, int(max_wait_s / _F4_POLL_INTERVAL_S))
    attempt = 0
    request_added_to_queue = False
    while not request_added_to_queue and attempt < max_attempts:
        try:
            request_added_to_queue = network_request_handler.enqueue_request(r)
        except Exception as exc:
            reason = "enqueue_request raised for {0}: {1}".format(
                request_type.name, exc)
            _helpers_log("error", reason)
            return ("failed", reason)
        if not request_added_to_queue:
            sleep(_F4_POLL_INTERVAL_S)
            attempt += 1

    if request_added_to_queue:
        _helpers_log("info", success_message)
        return ("queued", None)

    reason = "Network queue saturated for {0}s; request {1} not accepted".format(
        max_wait_s, request_type.name)
    _helpers_log("error", reason)
    _f4_record_queue_full()
    return ("full", reason)


def helpers_add_to_output_channel(output_channel, message_body, success_message,
                                  max_wait_s=_F4_MAX_WAIT_S_DEFAULT):
    """Enqueue an outbound UI-bus message with explicit backpressure reporting.

    :return: ``("queued", None)`` on success, ``("full", reason)`` when
             the bus is saturated, or ``("failed", reason)`` on exception.

    Callers MUST capture the return value. For heartbeat/liveness
    messages, ``"full"`` is typically acceptable (drop-oldest semantics —
    recent state is more useful than a stale queued copy). For
    treatment / error messages, callers SHOULD surface the failure.
    """
    max_attempts = max(1, int(max_wait_s / _F4_POLL_INTERVAL_S))
    attempt = 0
    message_added = False
    while not message_added and attempt < max_attempts:
        try:
            message_added = output_channel.enqueue_message(message_body)
        except Exception as exc:
            reason = "output_channel.enqueue_message raised: {0}".format(exc)
            _helpers_log("error", reason)
            return ("failed", reason)
        if not message_added:
            sleep(_F4_POLL_INTERVAL_S)
            attempt += 1

    if message_added:
        _helpers_log("info", success_message)
        return ("queued", None)

    reason = "Output channel saturated for {0}s; message not accepted".format(max_wait_s)
    _helpers_log("error", reason)
    _f4_record_queue_full()
    return ("full", reason)


def helpers_sha256_checksum(file_path: str) -> str:
    """
    Returns the SHA256 of the file's raw bytes.

    Strict path-only contract. Callers that need to hash a literal string
    must use ``helpers_sha256_string`` explicitly.

    @param file_path: path to an existing regular file
    @return: hex-digest SHA256 of file bytes
    @raises TypeError: if file_path is not a str
    @raises FileNotFoundError: if file_path does not resolve to a file
        (operationally distinguishes file-missing from SHA mismatch in
        the upload pipeline)
    """
    if not isinstance(file_path, str):
        raise TypeError(
            "helpers_sha256_checksum expects str, got {0}".format(type(file_path).__name__)
        )
    if not os.path.isfile(file_path):
        raise FileNotFoundError(file_path)
    checksum = hashlib.sha256()
    with open(file_path, "rb") as f:
        # Read and update hash in blocks of 4K
        for byte_block in iter(lambda: f.read(4096), b""):
            checksum.update(byte_block)
    return checksum.hexdigest()


def helpers_sha256_string(data: str) -> str:
    """
    Returns the SHA256 of a literal string (UTF-8-encoded).

    Companion to ``helpers_sha256_checksum``: that function is strict
    path-only. Use this one for protocol-payload integrity (e.g.
    treatment-report JSON) where the input is intentionally a string,
    not a file path.

    @param data: the string to hash
    @return: hex-digest SHA256 of data.encode('utf-8')
    @raises TypeError: if data is not a str
    """
    if not isinstance(data, str):
        raise TypeError(
            "helpers_sha256_string expects str, got {0}".format(type(data).__name__)
        )
    return hashlib.sha256(data.encode("utf-8")).hexdigest()


def helpers_crc8(message_list):
    """
    Returns the calculated crc from a message list
    @param message_list: is a list of integer numbers containing the message
    @return: integer containing an unsigned byte
    """
    crc = 0
    for byte in message_list:
        crc = CRC_LIST[byte ^ crc]
    return crc


def helpers_get_ip_address():
    """Return the IPv4 address that the local hostname resolves to."""
    hostname = socket.gethostname()
    return socket.gethostbyname(hostname)


def helpers_decommission_device():
    """
    Delete every DECOMMISSION_FOLDERS subfolder of DECOMMISSION_CS_PATH.
    :raises FileNotFoundError: on the first subfolder that is missing or not a
        directory (folders earlier in the list have already been removed)
    """
    parent_folder = DECOMMISSION_CS_PATH
    subfolders_to_delete = DECOMMISSION_FOLDERS

    for subfolder in subfolders_to_delete:
        path_to_delete = os.path.join(parent_folder, subfolder)
        if os.path.exists(path_to_delete) and os.path.isdir(path_to_delete):
            shutil.rmtree(path_to_delete)
        else:
            raise FileNotFoundError(
                f"{path_to_delete} not found or not a directory!")


# LOGGING SPECIFIC HELPERS
def helpers_read_device_log_template(path: str) -> dict:
    """
    Read the device log template
    :param path: the path to the template
    :return: the loaded template, or {} if the file does not exist
    """
    return _read_json_template(path)


def helpers_read_cs_log_template(path: str) -> dict:
    """
    Read the cs log template
    :param path: the path to the template
    :return: the loaded template, or {} if the file does not exist
    """
    return _read_json_template(path)


def helpers_extract_device_log_metadata(log_file_name: str) -> Union[dict, None]:
    """
    Extract metadata from device log filenames by parsing right-to-left.

    Device log naming convention:
        {serial}[_{LABEL}_]{YYYYMMDD}.{xxx}.{yyy}.{zzz}

    Where:
        - {serial}: variable format (NOT extracted — use CS-known value)
        - [_{LABEL}_]: optional label like BOOTPOST
        - {YYYYMMDD}: date (8 digits)
        - {xxx}: arbitrary identifier (u, u1, u2, anything)
        - {yyy}: file type (log, err, etc.)
        - {zzz}: compression (gz)

    Also supports Format 1: YYYYMMDD_HHMMSS_SERIAL_SUBTYPE.{xxx}.{yyy}.{zzz}

    Parsing strategy: right-to-left from the triple-dot extension,
    avoiding ambiguity from _ appearing in both serials and labels.
    Any field that cannot be determined is reported as 'unknown'.
    """
    # Step 1: Strip extension from the right
    # Try triple extension .{xxx}.{yyy}.{zzz} first (e.g., .u1.log.gz)
    # Fall back to double extension .{xxx}.{yyy} (e.g., .u.DeviceLog)
    triple_ext_match = re.search(r'\.([^.]+)\.([^.]+)\.([^.]+)$', log_file_name)
    double_ext_match = None if triple_ext_match else re.search(r'\.([^.]+)\.([^.]+)$', log_file_name)

    if triple_ext_match:
        device_log_type = triple_ext_match.group(2)  # {yyy} = log, err, etc.
        stem = log_file_name[:triple_ext_match.start()]
    elif double_ext_match:
        device_log_type = double_ext_match.group(2)  # {yyy} = DeviceLog, etc.
        stem = log_file_name[:double_ext_match.start()]
    else:
        return {
            "local_date": "unknown",
            "local_time": "unknown",
            "serial_number": "unknown",
            "device_sub_type": "unknown",
            "device_log_type": "unknown"
        }

    # Step 2: Extract date from the stem
    # Format 1: starts with YYYYMMDD_HHMMSS (e.g., 20260320_120000_...)
    format1_match = re.match(r'^(\d{8})_(\d{6})_', stem)
    if format1_match:
        # Format 1 also has serial and subtype after the time
        remainder = stem[format1_match.end():]
        serial_number, _, device_sub_type = remainder.partition('_')
        return {
            "local_date": format1_match.group(1),
            "local_time": format1_match.group(2),
            # Empty pieces (e.g. a bare "SERIAL" or "SERIAL_") fall back to 'unknown'.
            "serial_number": serial_number or 'unknown',
            "device_sub_type": device_sub_type or 'unknown',
            "device_log_type": device_log_type
        }

    # Format 2: date is the last 8 digits before the extension
    # e.g., DVT-BN_20260320 or DVT-BN_BOOTPOST_20260320
    date_match = re.search(r'(\d{8})$', stem)
    return {
        "local_date": date_match.group(1) if date_match else 'unknown',
        "local_time": "unknown",
        "serial_number": "unknown",
        "device_sub_type": "unknown",
        "device_log_type": device_log_type
    }


def helpers_construct_cs_log_json(cs_log_data: dict):
    """
    Constructs the payload for cs log file uploading
    :param cs_log_data: dict with 'path', 'serialNumber' and 'organizationId'
                        of the CS log file to upload
    :returns: the json payload to be uploaded
    """
    cs_log_json = helpers_read_cs_log_template(LOG_UPLOAD_TEMPLATE_PATH)

    # Stream the checksum from disk to avoid loading the whole file +
    # base64-encoding it just to produce a byte array to hash. On ~300 MB
    # CS logs the previous double-buffer pattern peaked at 4/3 × file size
    # and combined with the same pattern inside
    # cmd_outgoing_upload_file_in_chunks drove OOM-kill events on the
    # 2 GB i.MX8MMini device.
    checksum = helpers_sha256_checksum(cs_log_data['path'])

    # Get file size
    file_size = helpers_get_file_size(cs_log_data['path'])

    # Get the filename
    file_name = os.path.basename(cs_log_data['path'])

    # Completion and generation timestamp from CS (in milliseconds)
    local_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)

    # Start and End timestamps calculation
    # Parse the date from the rotated log filename (suffix format: MM-DD-YYYY)
    # to ensure timestamps match the actual log date, not the upload date.
    # This prevents mismatched timestamps when uploading backlogged files.
    log_date = None
    parts = file_name.rsplit('.', 1)
    if len(parts) == 2:
        try:
            log_date = datetime.strptime(parts[1], "%m-%d-%Y").replace(tzinfo=timezone.utc)
        except ValueError:
            pass
    if log_date is None:
        log_date = datetime.now(timezone.utc)
    start_of_day = int(log_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000)
    end_of_day = int(log_date.replace(hour=23, minute=59, second=59, microsecond=999999).timestamp() * 1000)

    # Populate JSON object
    cs_log_json['start_session']['reference'] = str(uuid.uuid4())
    cs_log_json['start_session']['generatedAt'] = local_timestamp
    cs_log_json['start_session']['dataType'] = 'device-data'
    cs_log_json['start_session']['serialNumber'] = cs_log_data['serialNumber']
    cs_log_json['start_session']['macAddress'] = 'device-data'
    cs_log_json['start_session']['organizationId'] = cs_log_data['organizationId']
    cs_log_json['start_session']['metadata']['dataType'] = 'cloud-sync-log-category'
    cs_log_json['start_session']['metadata']['deviceLogType'] = 'cloudsync'
    cs_log_json['start_session']['metadata']['deviceSubType'] = 'log'
    # Prepend serial to CS log filename for DCS dedup uniqueness per device.
    # Without this, two devices uploading cloudsync.log.03-27-2026 on the same day
    # would collide in the filename-only dedup check (DataLogExistsByFileName).
    serial = cs_log_data['serialNumber']
    if not file_name.startswith(serial):
        file_name = f"{serial}_{file_name}"
    cs_log_json['start_session']['metadata']['deviceFileName'] = file_name
    cs_log_json['start_session']['metadata']['startTimestamp'] = start_of_day
    cs_log_json['start_session']['metadata']['endTimestamp'] = end_of_day
    cs_log_json['end_session']['checksum'] = checksum
    cs_log_json['general']['file_size'] = file_size
    cs_log_json['general']['file_path'] = cs_log_data['path']

    return cs_log_json


def helpers_construct_device_log_json(device_log_data: dict):
    """
    Constructs the payload for device log file uploading
    :param device_log_data: a dictionary with the info related to the current file
    :returns: a json payload to be used for uploading using sessions.
    """
    device_log_json = helpers_read_device_log_template(LOG_UPLOAD_TEMPLATE_PATH)

    # Stream the checksum from disk rather than reading the whole file +
    # base64-encoding it to produce a byte array. See
    # helpers_construct_cs_log_json for the rationale.
    checksum = helpers_sha256_checksum(device_log_data['path'])

    # Get file size
    file_size = helpers_get_file_size(device_log_data['path'])

    # Extract metadata from the filename
    file_name = os.path.basename(device_log_data['path'])
    extracted_metadata = helpers_extract_device_log_metadata(file_name)

    # Timestamp for GeneratedAt (in milliseconds)
    local_timestamp = int(datetime.now(timezone.utc).timestamp()*1000)

    # Derive deterministic timestamps from filename date (required for DCS dedup composite key).
    # When date+time are both available, use the exact datetime for both start and end.
    # When only date is available, use 00:00:00 for start and 23:59:59 for end.
    # Fallback to upload time only if no date can be extracted at all.
if extracted_metadata['local_date'] != 'unknown' and extracted_metadata['local_time'] != 'unknown': datetime_obj = datetime.strptime(f"{extracted_metadata['local_date']}{extracted_metadata['local_time']}", "%Y%m%d%H%M%S") start_timestamp = int(datetime_obj.replace(tzinfo=timezone.utc).timestamp() * 1000) end_timestamp = start_timestamp elif extracted_metadata['local_date'] != 'unknown': date_obj = datetime.strptime(extracted_metadata['local_date'], "%Y%m%d").replace(tzinfo=timezone.utc) start_timestamp = int(date_obj.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000) end_timestamp = int(date_obj.replace(hour=23, minute=59, second=59, microsecond=0).timestamp() * 1000) else: start_timestamp = local_timestamp end_timestamp = local_timestamp # Populate JSON object device_log_json['start_session']['reference'] = str(uuid.uuid4()) device_log_json['start_session']['generatedAt'] = local_timestamp device_log_json['start_session']['dataType'] = 'device-data' device_log_json['start_session']['serialNumber'] = device_log_data['serialNumber'] device_log_json['start_session']['macAddress'] = 'device-data' device_log_json['start_session']['organizationId'] = device_log_data['organizationId'] device_log_json['start_session']['metadata']['dataType'] = 'device-log-category' device_log_json['start_session']['metadata']['deviceLogType'] = extracted_metadata['device_log_type'] device_log_json['start_session']['metadata']['deviceSubType'] = extracted_metadata['device_sub_type'] # Ensure deviceFileName includes serial for DCS dedup uniqueness per device. # Real device logs already have serial in the name (e.g., DVT-BN_20260320.u.log.gz). 
serial = device_log_data['serialNumber'] if not file_name.startswith(serial): file_name = f"{serial}_{file_name}" device_log_json['start_session']['metadata']['deviceFileName'] = file_name device_log_json['start_session']['metadata']['startTimestamp'] = start_timestamp device_log_json['start_session']['metadata']['endTimestamp'] = end_timestamp device_log_json['end_session']['checksum'] = checksum device_log_json['general']['file_size'] = file_size device_log_json['general']['file_path'] = device_log_data['path'] return device_log_json def helpers_should_update_dcs_log_level(g_config: dict) -> bool: """ Returns True if the state of the log level should be communicated to the DCS else False. It is controlled by the update_dcs_flag in the config.json. * 1 --> True * 0 --> False """ update_dcs_flag = g_config[CONFIG_LOGS][CONFIG_LOGS_UPDATE_DCS_FLAG] if int(update_dcs_flag) == 1: return True return False def helpers_should_update_cs_log_level(response: dict, g_config: dict) -> bool: current_log_level:str = g_config[CONFIG_LOGS][CONFIG_LOGS_CURRENT_LOG_LEVEL].upper() requested_log_level = response.get('logLevel') if requested_log_level is None: return False requested_log_level = requested_log_level.upper() if requested_log_level != current_log_level: g_utils.logger.debug(f"DCS requested to change the log level from {current_log_level} to {requested_log_level}") return True return False def helpers_calculate_log_level_duration(response): """ Returns a tuple with duration_in_seconds, start_timestamp, end_timestamp if a valid log level duration was given. 
Otherwise None """ duration = int(response.get('logLevelDuration', 0) or 0) start_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000) end_timestamp = start_timestamp + duration if duration > 0: duration_in_seconds = int(duration/1000) return duration_in_seconds, start_timestamp, end_timestamp return None def helpers_log_retention(retention_pct): cs_log_pct = retention_pct / 100 num_files_deleted = 0 total_deleted_size = 0 # Total sd stats — use the log path's filesystem (same as channels on device, # but may be a separate tmpfs in the mock stack for realistic simulation) sd_total_bytes, sd_used_bytes, sd_free_bytes = shutil.disk_usage(UI2CS_FILE_LOG_PATH) if sd_total_bytes == 0: sd_total_bytes = 1 # Cloudsync log stats used_bytes_proc_call = subprocess.check_output(['du', '-bsx', UI2CS_FILE_LOG_PATH]).split() if len(used_bytes_proc_call) > 0: cs_used_bytes = int(used_bytes_proc_call[0].decode('utf-8')) total_deleted_size = cs_used_bytes # Retrieve file list f = [] for subdir, dir, files in os.walk(UI2CS_FILE_LOG_PATH): for file in files: f.append(os.path.join(UI2CS_FILE_LOG_PATH, file)) # Sorted with oldest modified first. f.sort(key=lambda x: os.path.getmtime(x)) if len(f) > 1: while (cs_used_bytes / sd_total_bytes) >= cs_log_pct and len(f) > 1: file_to_remove = f[0] os.remove(file_to_remove) num_files_deleted += 1 f.pop(0) used_bytes_proc_call = subprocess.check_output(['du', '-bsx', UI2CS_FILE_LOG_PATH]).split() if len(used_bytes_proc_call) > 0: cs_used_bytes = int(used_bytes_proc_call[0].decode('utf-8')) else: g_utils.logger.error("Error in disk usage call.") break total_deleted_size = total_deleted_size - cs_used_bytes else: total_deleted_size = 0 g_utils.logger.info("No files Removed.") else: g_utils.logger.error("Error in disk usage call.") return num_files_deleted, total_deleted_size