"""Implementation of helper methods""" import os import shutil import json import hashlib import socket import re import base64 import uuid import subprocess import math from datetime import * from time import time, sleep from typing import Union, Any from logging import Logger from cloudsync.utils.singleton import SingletonMeta from cloudsync.common.enums import * from cloudsync.handlers.network_request import NetworkRequest from cloudsync.utils.reachability import * from cloudsync.utils.globals import * class GUtils(metaclass=SingletonMeta): def __init__(self): self.logger = None self.reachability_provider = None def add_logger(self, logger: Logger): self.logger = logger def add_reachability_provider(self, reachability_provider: ReachabilityProvider): self.reachability_provider = reachability_provider g_utils = GUtils() CRC_LIST = [ 0, 49, 98, 83, 196, 245, 166, 151, 185, 136, 219, 234, 125, 76, 31, 46, 67, 114, 33, 16, 135, 182, 229, 212, 250, 203, 152, 169, 62, 15, 92, 109, 134, 183, 228, 213, 66, 115, 32, 17, 63, 14, 93, 108, 251, 202, 153, 168, 197, 244, 167, 150, 1, 48, 99, 82, 124, 77, 30, 47, 184, 137, 218, 235, 61, 12, 95, 110, 249, 200, 155, 170, 132, 181, 230, 215, 64, 113, 34, 19, 126, 79, 28, 45, 186, 139, 216, 233, 199, 246, 165, 148, 3, 50, 97, 80, 187, 138, 217, 232, 127, 78, 29, 44, 2, 51, 96, 81, 198, 247, 164, 149, 248, 201, 154, 171, 60, 13, 94, 111, 65, 112, 35, 18, 133, 180, 231, 214, 122, 75, 24, 41, 190, 143, 220, 237, 195, 242, 161, 144, 7, 54, 101, 84, 57, 8, 91, 106, 253, 204, 159, 174, 128, 177, 226, 211, 68, 117, 38, 23, 252, 205, 158, 175, 56, 9, 90, 107, 69, 116, 39, 22, 129, 176, 227, 210, 191, 142, 221, 236, 123, 74, 25, 40, 6, 55, 100, 85, 194, 243, 160, 145, 71, 118, 37, 20, 131, 178, 225, 208, 254, 207, 156, 173, 58, 11, 88, 105, 4, 53, 102, 87, 192, 241, 162, 147, 189, 140, 223, 238, 121, 72, 27, 42, 193, 240, 163, 146, 5, 52, 103, 86, 120, 73, 26, 43, 188, 141, 222, 239, 130, 179, 224, 209, 70, 119, 36, 21, 59, 10, 89, 104, 255, 206, 157, 172 ] def helpers_is_int(val: str) -> bool: """ Determines if the value can be converted to an int :param val: the value in string form :return: True if can be converted to an int, false otherwise """ try: int(val) return True except ValueError: return False def helpers_is_float(val: str) -> bool: """ Determines if the value can be converted to a float :param val: the value in string form :return: True if can be converted to a float, false otherwise """ try: float(val) except (ValueError, TypeError): return False return True def helpers_try_numeric(val: str) -> Union[int, float, str]: """ Tries to convert the value to numeric. If it's not possible leaves it as a string :param val: the value to convert :return: the converted value if possible, otherwise it is returned as is """ if helpers_is_int(val): return int(val) elif helpers_is_float(val): try: f = float(val) except Exception: g_utils.logger.warning(f"Unexpected conversion failure for: '{val}' — replacing with 0") return 0 if math.isinf(f) or math.isnan(f): g_utils.logger.warning(f"Non-finite float value encountered: '{val}' — replacing with 0") return 0 return f else: return val def helpers_parse_treatment_log(path: str) -> dict: """ Converts a treatment.log file to a python dictionary The treatment log needs to be formatted better. Until then, this is a (non-ideal) way to read it. 
def helpers_parse_treatment_log(path: str) -> dict:
    """
    Converts a treatment.log file to a python dictionary

    The treatment log needs to be formatted better. Until then, this is a
    (non-ideal) way to read it.

    :param path: the path to the treatment log file
    :return: the parsed treatment log file
    """
    if not os.path.exists(path):
        g_utils.logger.warning("Path does not exist: %s", path)
        return {}
    result = {}
    with open(path, 'r') as f:
        lines = f.readlines()
    group = "NoGroup"
    for line_ in lines:
        line = line_.replace("\n", "")
        if "[" in line and "]" in line:
            # A "[Group]" header starts a new section
            group = line.split("[")[1].split("]")[0]
            if group not in result:
                result[group] = {}
        else:
            if group in ["Title", "Treatment Parameters", "Post-Treatment Data", "Extra"]:
                # Key/value sections: "<name>,<value>[,<units>]"
                tokens = line.split(",")
                if len(tokens) > 1:
                    subgroup = tokens[0]
                    result[group][subgroup] = {}
                    result[group][subgroup]["value"] = helpers_try_numeric(tokens[1])
                    if len(tokens) > 2:
                        if tokens[2] == "":
                            result[group][subgroup]["units"] = None
                        else:
                            result[group][subgroup]["units"] = tokens[2]
            elif group in ["Treatment Data", "Treatment Alarms", "Treatment Events"]:
                # Tabular sections: every line is a row of comma-separated values
                tokens = line.split(",")
                tokens_converted = []
                for token in tokens:
                    tokens_converted.append(helpers_try_numeric(token))
                result[group]["data"] = result[group].get("data", []) + [tokens_converted]
    return result


def helpers_device_state_to_cloud_state(hd_mode: HDOpModes, hd_sub_mode: HDOpSubModes) -> DeviceStates:
    """
    Maps the HD firmware mode/sub-mode to a cloud device state:

    * Inactive Not OK - N/A - HD f/w will not know active vs. inactive - UI or
      cloud will have to maintain assigned tenant, active/inactive and
      OK/Not OK while inactive
    * Inactive OK - N/A
    * Active OK - N/A
    * Active Ready - mode is 3 (standby) and sub-mode is 1 (wait for treatment)
    * Active In Treatment - mode between 4 and 7 (treatment params .. post treatment)
    * Active Not Ready - mode/sub-mode is anything other than ready, in-treatment, or not OK
    * Active Not OK - mode is 0 (fault)
    * Decommissioned - N/A - HD f/w will not know if system is decommissioned
    """
    if hd_mode == HDOpModes.MODE_STAN:
        return DeviceStates.ACTIVE_READY
    if hd_mode in (HDOpModes.MODE_TPAR, HDOpModes.MODE_PRET, HDOpModes.MODE_TREA, HDOpModes.MODE_POST):
        return DeviceStates.ACTIVE_IN_TREATMENT
    if hd_mode == HDOpModes.MODE_FAUL:
        return DeviceStates.ACTIVE_NOT_OK
    return DeviceStates.ACTIVE_NOT_READY


def helpers_get_stored_token() -> Union[str, None]:
    """
    Returns the stored token

    :return: The token if found, otherwise None
    """
    if not os.path.exists(DEVICE_KEBORMED_ACCESS_TOKEN_PATH):
        return None
    with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'r') as f:
        try:
            data = json.load(f)
        except json.decoder.JSONDecodeError:
            return None
    if data is None:
        return None
    return data.get("access_token", None)


def helpers_read_config(path: str) -> dict:
    """
    Read the configuration

    :param path: the path to the configuration
    :return: the loaded configuration
    """
    if os.path.exists(path):
        with open(path, 'r') as f:
            config = json.load(f)
        return config
    else:
        g_utils.logger.error("Operation configuration file does not exist: {0}".format(path))
        raise FileNotFoundError(f"Operation configuration file does not exist: {path}")


def helpers_write_config(folder_path: str, file_path: str, config: dict) -> None:
    """
    Writes the config to the provided path

    If folder_path is provided, it first checks whether the folder exists and
    creates it if it does not.

    :param folder_path: the path for the config folder
    :param file_path: the path where the config json will be written
    :param config: the config dictionary
    :return: None
    """
    if folder_path is not None:
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)
    with open(file_path, 'w') as f:
        json.dump(config, f, indent=4)
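
# A minimal round-trip sketch for the config helpers (illustrative only; the
# paths and config contents below are hypothetical):
#
#   >>> helpers_write_config("/tmp/cloudsync", "/tmp/cloudsync/config.json",
#   ...                      {"logs": {"current_log_level": "DEBUG"}})
#   >>> helpers_read_config("/tmp/cloudsync/config.json")
#   {'logs': {'current_log_level': 'DEBUG'}}
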
def helpers_read_access_token(path: str) -> dict:
    """
    Reads the access token json file and returns it as a dict

    :param path: The path to the access token json file
    :return: the token data, or an empty dict if the file does not exist
    """
    data = {}
    if os.path.exists(path):
        with open(path, 'r', encoding="utf-8-sig") as f:
            data = json.load(f)
    return data


def helpers_read_treatment_report_template(path: str) -> dict:
    """
    Read the treatment report template

    :param path: the path to the template
    :return: the loaded template
    """
    if os.path.exists(path):
        with open(path, 'r') as f:
            template = json.load(f)
        return template
    else:
        g_utils.logger.error("Configuration file does not exist: {0}".format(path))
        return {}


def log_func(func):
    """
    Log the function and the parameters passed to it

    @param func: The decorated function
    @return: The wrapper function
    """
    @functools.wraps(func)
    def _wrapper(*args, **kwargs):
        g_utils.logger.debug("Calling {0} args: {1} kwargs: {2}".format(func.__name__, tuple(args), kwargs))
        return func(*args, **kwargs)
    return _wrapper


def helpers_file_to_byte_array(file_path: str) -> Union[str, None]:
    """
    Reads a file and returns its contents as a base64-encoded string,
    or None if the file cannot be read.
    """
    try:
        with open(file_path, "rb") as f:
            # Read the entire file in binary mode
            file_bytes = f.read()
        return base64.b64encode(file_bytes).decode('utf-8')
    except FileNotFoundError as e:
        g_utils.logger.error(f"Device log file not found: {e}")
        return None
    except Exception as e:
        g_utils.logger.error(f"Error reading device log file: {e}")
        return None


def helpers_get_file_size(file_path: str) -> Union[int, None]:
    """
    Returns the size of the file in bytes, or None on error.
    """
    try:
        return os.path.getsize(file_path)
    except OSError as e:
        g_utils.logger.error(f'Error getting file size: {e}')
        return None
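
# A minimal sketch of how the file helpers above combine (illustrative; the
# path is hypothetical). helpers_file_to_byte_array returns the file contents
# base64-encoded, and helpers_sha256_checksum (defined further below) hashes
# that encoded string, which is how the session-construction helpers use them:
#
#   encoded = helpers_file_to_byte_array("/tmp/device.log")
#   if encoded is not None:
#       checksum = helpers_sha256_checksum(encoded)
#       size = helpers_get_file_size("/tmp/device.log")
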
def helpers_read_treatment_log_file(path: str) -> Union[dict, None]:
    """
    Parses a treatment log file into the treatment report template.

    :param path: the path to the treatment log file
    :return: the populated treatment report dict, or None on error
    """
    treatment_data = helpers_read_treatment_report_template(TREATMENT_REPORT_TEMPLATE_PATH)
    try:
        # Use a context manager so the file handle is always closed
        with open(path) as f:
            treatment_log_lines = f.readlines()
        counter = 0
        while counter < len(treatment_log_lines):
            line = treatment_log_lines[counter].strip()
            if line.startswith(SECTION_START_CHARACTER) and line.endswith(SECTION_STOP_CHARACTER) \
                    and counter < (len(treatment_log_lines) - 2):
                # Collect all lines of this section until the next section header
                section = line
                section_lines = []
                counter += 1
                section_start_counter = counter
                line = treatment_log_lines[counter].strip()
                while not (line.startswith(SECTION_START_CHARACTER) and line.endswith(SECTION_STOP_CHARACTER)) \
                        and counter < len(treatment_log_lines) - 1:
                    section_lines.append(line)
                    counter += 1
                    line = treatment_log_lines[counter].strip()
                if len(section_lines) > 0:
                    if section == TREATMENT_DATA:
                        for data_line in section_lines:
                            data_components = data_line.split(',')
                            data_record = {
                                "time": int(data_components[0]) * S_MS_CONVERSION_FACTOR,
                                "bloodFlowRate": helpers_try_numeric(data_components[1]),
                                "dialysateFlowRate": helpers_try_numeric(data_components[2]),
                                "ultrafiltrationRate": helpers_try_numeric(data_components[3]),
                                "arterialPressure": helpers_try_numeric(data_components[4]),
                                "venousPressure": helpers_try_numeric(data_components[5]),
                                "systolic": helpers_try_numeric(data_components[6]),
                                "diastolic": helpers_try_numeric(data_components[7]),
                                "heartRate": helpers_try_numeric(data_components[8])
                            }
                            treatment_data['data']['treatment']['data'].append(data_record)
                    elif section == TREATMENT_ALARMS:
                        for data_line in section_lines:
                            data_components = data_line.split(',')
                            parameters = []
                            if len(data_components) > 2:
                                parameters = data_components[2:]
                            data_record = {
                                "time": int(data_components[0]) * S_MS_CONVERSION_FACTOR,
                                "title": data_components[1],
                                "parameters": parameters
                            }
                            treatment_data['data']['treatment']['alarms'].append(data_record)
                    elif section == TREATMENT_EVENTS:
                        for data_line in section_lines:
                            data_components = data_line.split(',')
                            parameters = []
                            if len(data_components) > 2:
                                parameters = data_components[2:]
                            data_record = {
                                "time": int(data_components[0]) * S_MS_CONVERSION_FACTOR,
                                "title": data_components[1],
                                "parameters": parameters
                            }
                            treatment_data['data']['treatment']['events'].append(data_record)
                else:
                    counter = section_start_counter
            else:
                if line.startswith(TREATMENT_CODE):
                    treatment_data['data']['treatment']['treatmentCode'] = line.split(',')[1]
                    treatment_data['reference'] = line.split(',')[1]
                elif line.startswith(PATIENT_ID):
                    treatment_data['data']['patient']['id'] = line.split(',')[1]
                elif line.startswith(TREATMENT_DURATION):
                    treatment_data['data']['treatment']['parameters']['treatmentDuration'] = int(line.split(',')[1])
                elif line.startswith(BLOOD_FLOW_RATE):
                    treatment_data['data']['treatment']['parameters']['bloodFlowRate'] = int(line.split(',')[1])
                elif line.startswith(DIALYSATE_FLOW_RATE):
                    treatment_data['data']['treatment']['parameters']['dialysateFlowRate'] = int(line.split(',')[1])
                elif line.startswith(ACID_CONCENTRATE_TYPE):
                    treatment_data['data']['treatment']['parameters']['dialysate']['acidCode'] = line.split(',')[1]
                elif line.startswith(BICARBONATE_CONCENTRATE_TYPE):
                    treatment_data['data']['treatment']['parameters']['dialysate']['bicarbCode'] = line.split(',')[1]
                elif line.startswith(POTASSIUM_CONCENTRATION):
                    treatment_data['data']['treatment']['parameters']['dialysate']['K'] = float(line.split(',')[1])
                elif line.startswith(CALCIUM_CONCENTRATION):
                    treatment_data['data']['treatment']['parameters']['dialysate']['Ca'] = float(line.split(',')[1])
                elif line.startswith(BICARBONATE_CONCENTRATION):
                    treatment_data['data']['treatment']['parameters']['dialysate']['HCO3'] = float(line.split(',')[1])
                elif line.startswith(SODIUM_CONCENTRATION):
                    treatment_data['data']['treatment']['parameters']['dialysate']['Na'] = float(line.split(',')[1])
                elif line.startswith(DIALYSATE_TEMPERATURE):
                    treatment_data['data']['treatment']['parameters']['dialysateTemp'] = float(line.split(',')[1])
                elif line.startswith(DIALYZER_TYPE):
                    treatment_data['data']['treatment']['parameters']['dialyzerModel'] = line.split(',')[1]
                elif line.startswith(HEPARIN_TYPE):
                    treatment_data['data']['treatment']['parameters']['heparinType'] = line.split(',')[1]
                elif line.startswith(HEPARIN_CONCENTRATION):
                    treatment_data['data']['treatment']['parameters']['heparinConcentration'] = line.split(',')[1]
                elif line.startswith(HEPARIN_BOLUS_VOLUME):
                    treatment_data['data']['treatment']['parameters']['heparinBolus'] = line.split(',')[1]
                elif line.startswith(HEPARIN_DISPENSE_RATE):
                    treatment_data['data']['treatment']['parameters']['heparinRate'] = line.split(',')[1]
                elif line.startswith(HEPARIN_STOP):
                    treatment_data['data']['treatment']['parameters']['heparinStopBeforeTreatmentEnd'] = line.split(',')[1]
                elif line.startswith(TREATMENT_START_DATE_TIME):
                    element = datetime.strptime(line.split(',')[1], "%Y/%m/%d %H:%M")
                    timestamp = datetime.timestamp(element)
                    treatment_data['data']['treatment']['time']['start'] = int(timestamp) * S_MS_CONVERSION_FACTOR
                elif line.startswith(TREATMENT_END_DATE_TIME):
                    element = datetime.strptime(line.split(',')[1], "%Y/%m/%d %H:%M")
                    timestamp = datetime.timestamp(element)
                    treatment_data['data']['treatment']['time']['end'] = int(timestamp) * S_MS_CONVERSION_FACTOR
                elif line.startswith(ACTUAL_TREATMENT_DURATION):
                    treatment_data['data']['treatment']['time']['treatmentDuration'] = int(line.split(',')[1])
                elif line.startswith(DIALYSATE_VOLUME_USED):
                    treatment_data['data']['deviceTreatmentData']['dialysateVolumeUsed'] = float(line.split(',')[1])
                elif line.startswith(PRESCRIBED_UF_VOLUME):
                    treatment_data['data']['deviceTreatmentData']['prescribedUltrafiltrationVolume'] = float(line.split(',')[1])
                elif line.startswith(TARGET_UF_VOLUME):
                    treatment_data['data']['deviceTreatmentData']['finalTargetUltrafiltrationVolume'] = float(line.split(',')[1])
                elif line.startswith(ACTUAL_UF_VOLUME):
                    treatment_data['data']['deviceTreatmentData']['actualUltrafiltrationVolume'] = float(line.split(',')[1])
                elif line.startswith(PRESCRIBED_UF_RATE):
                    treatment_data['data']['deviceTreatmentData']['prescribedUltrafiltrationRate'] = float(line.split(',')[1])
                elif line.startswith(TARGET_UF_RATE):
                    treatment_data['data']['deviceTreatmentData']['finalTargetUltrafiltrationRate'] = float(line.split(',')[1])
                elif line.startswith(ACTUAL_UF_RATE):
                    treatment_data['data']['deviceTreatmentData']['actualUltrafiltrationRate'] = float(line.split(',')[1])
                elif line.startswith(SALINE_BOLUS_VOLUME):
                    treatment_data['data']['deviceTreatmentData']['salineBolusVolumeGiven'] = float(line.split(',')[1])
                elif line.startswith(HEPARIN_DELIVERED_VOLUME):
                    treatment_data['data']['deviceTreatmentData']['heparinDelivered'] = line.split(',')[1]
                elif line.startswith(WATER_SAMPLE_TEST_RESULT):
                    treatment_data['data']['treatment']['treatmentPreparation']['totalChlorine'] = line.split(',')[1]
                # New treatment fields (currently written as empty placeholder strings)
                elif line.startswith(TARGET_WEIGHT):
                    treatment_data['data']['treatment']['time']['targetWeight'] = ""
                elif line.startswith(TOTAL_CHLORINE):
                    treatment_data['data']['treatment']['treatmentPreparation']['totalChlorine'] = ""
                elif line.startswith(PH):
                    treatment_data['data']['treatment']['treatmentPreparation']['ph'] = ""
                elif line.startswith(CONDUCTIVITY):
                    treatment_data['data']['treatment']['treatmentPreparation']['conductivity'] = ""
                elif line.startswith(MACHINE_WIPED_DOWN):
                    treatment_data['data']['diagnostics']['machineWipedDown'] = ""
                elif line.startswith(FILTER_LIFE):
                    treatment_data['data']['diagnostics']['filterLife'] = ""
                elif line.startswith(LAST_CHEMICAL_DISINFECTION):
                    treatment_data['data']['diagnostics']['lastChemicalDisinfection'] = ""
                elif line.startswith(LAST_HEAT_DISINFECTION):
                    treatment_data['data']['diagnostics']['lastHeatDisinfection'] = ""
                counter += 1
        return treatment_data
    except IOError as er:
        g_utils.logger.error('Opening treatment log file error: {0}'.format(' '.join(er.args)))
        return None
    except Exception as e:
        g_utils.logger.error('Error parsing treatment file: {0}'.format(' '.join(e.args)))
        return None
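
# A sketch of the treatment log layout this parser expects (illustrative;
# the field names and the "[...]" section markers are placeholders standing
# in for the SECTION_*_CHARACTER and field-name constants from
# cloudsync.utils.globals, whose actual values are assumptions here; times
# are in seconds and get scaled by S_MS_CONVERSION_FACTOR):
#
#   Treatment Code,TC001
#   Patient ID,P123
#   [Treatment Data]
#   60,300,500,1000,-180,120,118,76,72
#   [Treatment Alarms]
#   120,Venous Pressure High,param1,param2
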
def helpers_add_to_network_queue(network_request_handler, request_type, url, payload, method, g_config,
                                 success_message):
    """
    Waits for connectivity (up to REACHABILITY_CYCLES pauses of
    REACHABILITY_CYCLE_PAUSE seconds each) and then enqueues a network
    request, retrying the enqueue while the queue is full.
    """
    cycle_duration = REACHABILITY_CYCLE_PAUSE
    total_cycles = REACHABILITY_CYCLES
    cycle = 0
    while (not g_utils.reachability_provider.reachability) and (cycle < total_cycles):
        sleep(cycle_duration)
        cycle += 1
    if g_utils.reachability_provider.reachability:
        r = NetworkRequest(request_type=request_type, url=url, payload=payload, method=method, g_config=g_config)
        request_added_to_queue = False
        max_attempts = 300
        attempt = 0
        while not request_added_to_queue and attempt < max_attempts:
            request_added_to_queue = network_request_handler.enqueue_request(r)
            if not request_added_to_queue:
                sleep(0.1)
                attempt += 1
        if request_added_to_queue:
            g_utils.logger.info(success_message)
        else:
            g_utils.logger.error("Failed to enqueue network request after {0} attempts: {1}".format(
                max_attempts, request_type.name))
    else:
        g_utils.logger.warning("Internet DOWN: Network request {0} couldn't be processed".format(request_type.name))
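
# A minimal call sketch for helpers_add_to_network_queue (illustrative only;
# the handler, enum member, URL, and payload are hypothetical stand-ins):
#
#   helpers_add_to_network_queue(
#       network_request_handler=handler,
#       request_type=RequestTypes.UPLOAD_TREATMENT,   # hypothetical enum member
#       url="https://example.invalid/api/treatments",
#       payload=treatment_json,
#       method="POST",
#       g_config=g_config,
#       success_message="Treatment report enqueued for upload")
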
def helpers_add_to_output_channel(output_channel, message_body, success_message):
    """
    Enqueues a message on the output channel, retrying while the queue is full.
    """
    message_added_to_queue = False
    max_attempts = 300
    attempt = 0
    while not message_added_to_queue and attempt < max_attempts:
        message_added_to_queue = output_channel.enqueue_message(message_body)
        if not message_added_to_queue:
            sleep(0.1)
            attempt += 1
    if message_added_to_queue:
        g_utils.logger.info(success_message)
    else:
        g_utils.logger.error("Failed to enqueue output message after {0} attempts".format(max_attempts))


def helpers_sha256_checksum(data: str) -> str:
    """
    Returns the calculated checksum (SHA256) for the input data

    @param data: input data; either a string or a path to a file
    @return: the hex-encoded checksum
    """
    is_file = os.path.isfile(data)
    if is_file:
        checksum = hashlib.sha256()
        with open(data, "rb") as f:
            # Read and update the hash in blocks of 4K
            for byte_block in iter(lambda: f.read(4096), b""):
                checksum.update(byte_block)
    else:
        checksum = hashlib.sha256(data.encode())
    return checksum.hexdigest()


def helpers_crc8(message_list):
    """
    Returns the calculated CRC from a message list

    @param message_list: a list of integers containing the message bytes
    @return: an integer containing an unsigned byte
    """
    crc = 0
    for byte in message_list:
        unsigned_byte = byte ^ crc
        crc = CRC_LIST[unsigned_byte]
    return crc


def helpers_get_ip_address():
    """Returns the IP address the local hostname resolves to."""
    hostname = socket.gethostname()
    ip_address = socket.gethostbyname(hostname)
    return ip_address


def helpers_decommission_device():
    """Deletes the decommission subfolders; raises FileNotFoundError if one is missing."""
    parent_folder = DECOMMISSION_CS_PATH
    subfolders_to_delete = DECOMMISSION_FOLDERS
    for subfolder in subfolders_to_delete:
        path_to_delete = os.path.join(parent_folder, subfolder)
        if os.path.exists(path_to_delete) and os.path.isdir(path_to_delete):
            shutil.rmtree(path_to_delete)
        else:
            raise FileNotFoundError(f"{path_to_delete} not found or not a directory!")


# LOGGING SPECIFIC HELPERS

def helpers_read_device_log_template(path: str) -> dict:
    """
    Read the device log template

    :param path: the path to the template
    :return: the loaded template
    """
    if os.path.exists(path):
        with open(path, 'r') as f:
            template = json.load(f)
        return template
    else:
        g_utils.logger.error("Configuration file does not exist: {0}".format(path))
        return {}


def helpers_read_cs_log_template(path: str) -> dict:
    """
    Read the cs log template

    :param path: the path to the template
    :return: the loaded template
    """
    if os.path.exists(path):
        with open(path, 'r') as f:
            template = json.load(f)
        return template
    else:
        g_utils.logger.error("Configuration file does not exist: {0}".format(path))
        return {}


def helpers_extract_device_log_metadata(log_file_name: str) -> Union[dict, None]:
    """
    Extracts date, time, serial number, device sub-type, and log type from a
    device log filename; fields that cannot be matched are set to 'unknown'.
    """
    local_date_pattern = r"^\d{8}(?=_)"
    local_time_pattern = r"^(?:^.*?)_(\d{6})_"
    serial_number_pattern = r"^[a-zA-Z0-9]+_[a-zA-Z0-9]+_([a-zA-Z0-9]+)(?=_)"
    device_subtype_pattern = r"^[a-zA-Z0-9]+_[a-zA-Z0-9]+_[a-zA-Z0-9]+_([a-zA-Z0-9]+)(?=)"
    log_type_pattern = r"[a-zA-Z0-9]+\.u\.(\w+)(?=)"
    local_date_match = re.search(local_date_pattern, log_file_name)
    local_time_match = re.search(local_time_pattern, log_file_name)
    serial_number_match = re.search(serial_number_pattern, log_file_name)
    device_subtype_match = re.search(device_subtype_pattern, log_file_name)
    log_type_match = re.search(log_type_pattern, log_file_name)
    return {
        "local_date": local_date_match.group(0) if local_date_match else 'unknown',
        "local_time": local_time_match.group(1) if local_time_match else 'unknown',
        "serial_number": serial_number_match.group(1) if serial_number_match else 'unknown',
        "device_sub_type": device_subtype_match.group(1) if device_subtype_match else 'unknown',
        "device_log_type": log_type_match.group(1) if log_type_match else 'unknown'
    }
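
# A filename sketch for helpers_extract_device_log_metadata (illustrative;
# the name below is hypothetical but follows the
# "<date>_<time>_<serial>_<subtype>_....u.<logtype>" shape the regexes expect):
#
#   >>> helpers_extract_device_log_metadata("20240115_093000_SN123_HD01_x.u.events")
#   {'local_date': '20240115', 'local_time': '093000',
#    'serial_number': 'SN123', 'device_sub_type': 'HD01',
#    'device_log_type': 'events'}
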
def helpers_construct_cs_log_json(cs_log_data: dict):
    """
    Constructs the payload for cs log file uploading

    :param cs_log_data: a dictionary with the info related to the current file
    :returns: the json payload to be uploaded
    """
    cs_log_json = helpers_read_cs_log_template(LOG_UPLOAD_TEMPLATE_PATH)

    # Convert the file into a base64 byte array
    logs_byte_array = helpers_file_to_byte_array(cs_log_data['path'])
    if logs_byte_array is None:
        # Guard: bail out if the file could not be read (mirrors the device-log variant)
        g_utils.logger.error('CS log file could not be read: {0}'.format(cs_log_data['path']))
        return None
    checksum = helpers_sha256_checksum(logs_byte_array)

    # Get file size
    file_size = helpers_get_file_size(cs_log_data['path'])

    # Get the filename
    file_name = os.path.basename(cs_log_data['path'])

    # Completion and generation timestamp from CS (in milliseconds)
    local_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)

    # Start and end timestamp calculation.
    # Parse the date from the rotated log filename (suffix format: MM-DD-YYYY)
    # to ensure timestamps match the actual log date, not the upload date.
    # This prevents mismatched timestamps when uploading backlogged files.
    log_date = None
    parts = file_name.rsplit('.', 1)
    if len(parts) == 2:
        try:
            log_date = datetime.strptime(parts[1], "%m-%d-%Y").replace(tzinfo=timezone.utc)
        except ValueError:
            pass
    if log_date is None:
        log_date = datetime.now(timezone.utc)
    start_of_day = int(log_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000)
    end_of_day = int(log_date.replace(hour=23, minute=59, second=59, microsecond=999999).timestamp() * 1000)

    # Populate JSON object
    cs_log_json['start_session']['reference'] = str(uuid.uuid4())
    cs_log_json['start_session']['generatedAt'] = local_timestamp
    cs_log_json['start_session']['dataType'] = 'device-data'
    cs_log_json['start_session']['serialNumber'] = cs_log_data['serialNumber']
    cs_log_json['start_session']['macAddress'] = 'device-data'
    cs_log_json['start_session']['organizationId'] = cs_log_data['organizationId']
    cs_log_json['start_session']['metadata']['dataType'] = 'cloud-sync-log-category'
    cs_log_json['start_session']['metadata']['deviceFileName'] = file_name
    cs_log_json['start_session']['metadata']['startTimestamp'] = start_of_day
    cs_log_json['start_session']['metadata']['endTimestamp'] = end_of_day
    cs_log_json['end_session']['checksum'] = checksum
    cs_log_json['general']['file_size'] = file_size
    cs_log_json['general']['file_path'] = cs_log_data['path']
    return cs_log_json
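
# A sketch of the rotated-filename date handling above (illustrative; the
# filename is hypothetical). For "cloudsync.log.01-15-2024" the suffix parses
# as January 15th 2024 UTC, so:
#
#   startTimestamp = 1705276800000   # 2024-01-15T00:00:00Z in ms
#   endTimestamp   = 1705363199999   # 2024-01-15T23:59:59.999999Z, truncated to ms
#
# A filename without a parsable suffix (e.g. "cloudsync.log") falls back to
# the current UTC day.
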
""" device_log_json = helpers_read_device_log_template(LOG_UPLOAD_TEMPLATE_PATH) # Convert the file into byte array logs_byte_array = helpers_file_to_byte_array(device_log_data['path']) checksum = helpers_sha256_checksum(logs_byte_array) # Get file size file_size = helpers_get_file_size(device_log_data['path']) # Extract metadata from the filename file_name = os.path.basename(device_log_data['path']) extracted_metadata = helpers_extract_device_log_metadata(file_name) # Timestamp for GeneratedAt (in miliseconds) local_timestamp = int(datetime.now(timezone.utc).timestamp()*1000) # Calculate the UTC timestamp based on the local date and time the UI provided (in milliseconds) (NEED BETTER SOLUTION HERE) if extracted_metadata['local_date'] != 'unknown' and extracted_metadata['local_time'] != 'unknown': datetime_obj = datetime.strptime(f"{extracted_metadata['local_date']}{extracted_metadata['local_time']}", "%Y%m%d%H%M%S") # Convert to UTC timestamp ui_utc_timestamp = int(datetime_obj.timestamp()*1000) else: ui_utc_timestamp = local_timestamp # Populate JSON object if extracted_metadata is not None: device_log_json['start_session']['reference'] = str(uuid.uuid4()) device_log_json['start_session']['generatedAt'] = local_timestamp device_log_json['start_session']['dataType'] = 'device-data' device_log_json['start_session']['serialNumber'] = device_log_data['serialNumber'] device_log_json['start_session']['macAddress'] = 'device-data' device_log_json['start_session']['organizationId'] = device_log_data['organizationId'] device_log_json['start_session']['metadata']['dataType'] = 'device-log-category' device_log_json['start_session']['metadata']['deviceLogType'] = extracted_metadata['device_log_type'] device_log_json['start_session']['metadata']['deviceSubType'] = extracted_metadata['device_sub_type'] device_log_json['start_session']['metadata']['deviceFileName'] = file_name device_log_json['start_session']['metadata']['startTimestamp'] = ui_utc_timestamp device_log_json['start_session']['metadata']['endTimestamp'] = ui_utc_timestamp device_log_json['end_session']['checksum'] = checksum device_log_json['general']['file_size'] = file_size device_log_json['general']['file_path'] = device_log_data['path'] return device_log_json else: g_utils.logger.error('Device log file name does not match the pattern') return None def helpers_should_update_dcs_log_level(g_config: dict) -> bool: """ Returns True if the state of the log level should be communicated to the DCS else False. It is controlled by the update_dcs_flag in the config.json. * 1 --> True * 0 --> False """ update_dcs_flag = g_config[CONFIG_LOGS][CONFIG_LOGS_UPDATE_DCS_FLAG] if int(update_dcs_flag) == 1: return True return False def helpers_should_update_cs_log_level(response: dict, g_config: dict) -> bool: current_log_level:str = g_config[CONFIG_LOGS][CONFIG_LOGS_CURRENT_LOG_LEVEL].upper() requested_log_level:str = response['logLevel'].upper() if requested_log_level != current_log_level: g_utils.logger.debug(f"DCS requested to change the log level from {current_log_level} to {requested_log_level}") return True return False def helpers_calculate_log_level_duration(response): """ Returns a tuple with duration_in_seconds, start_timestamp, end_timestamp if a valid log level duration was given. 
def helpers_calculate_log_level_duration(response):
    """
    Returns a tuple of (duration_in_seconds, start_timestamp, end_timestamp)
    if a valid log level duration was given, otherwise None. The duration in
    the response and the returned timestamps are in milliseconds.
    """
    duration = int(response['logLevelDuration']) if response['logLevelDuration'] else 0
    start_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)
    end_timestamp = start_timestamp + duration
    if duration > 0:
        duration_in_seconds = int(duration / 1000)
        return duration_in_seconds, start_timestamp, end_timestamp
    return None


def helpers_log_retention(retention_pct):
    """
    Deletes the oldest cloudsync log files until their disk usage drops below
    retention_pct percent of the filesystem's total size.

    :param retention_pct: the allowed log usage as a percentage of total disk space
    :return: a tuple of (number of files deleted, total bytes freed)
    """
    cs_log_pct = retention_pct / 100
    num_files_deleted = 0
    total_deleted_size = 0

    # Total sd stats — use the log path's filesystem (same as channels on
    # device, but may be a separate tmpfs in the mock stack for realistic
    # simulation)
    sd_total_bytes, sd_used_bytes, sd_free_bytes = shutil.disk_usage(UI2CS_FILE_LOG_PATH)
    if sd_total_bytes == 0:
        sd_total_bytes = 1

    # Cloudsync log stats
    used_bytes_proc_call = subprocess.check_output(['du', '-bsx', UI2CS_FILE_LOG_PATH]).split()
    if len(used_bytes_proc_call) > 0:
        cs_used_bytes = int(used_bytes_proc_call[0].decode('utf-8'))
        total_deleted_size = cs_used_bytes

        # Retrieve the file list, sorted with the oldest modified first.
        # Note: files are joined with their containing subdir so nested
        # log files resolve to valid paths.
        f = []
        for subdir, dirs, files in os.walk(UI2CS_FILE_LOG_PATH):
            for file in files:
                f.append(os.path.join(subdir, file))
        f.sort(key=lambda x: os.path.getmtime(x))

        if len(f) > 1:
            while (cs_used_bytes / sd_total_bytes) >= cs_log_pct and len(f) > 1:
                file_to_remove = f[0]
                os.remove(file_to_remove)
                num_files_deleted += 1
                f.pop(0)
                used_bytes_proc_call = subprocess.check_output(['du', '-bsx', UI2CS_FILE_LOG_PATH]).split()
                if len(used_bytes_proc_call) > 0:
                    cs_used_bytes = int(used_bytes_proc_call[0].decode('utf-8'))
                else:
                    g_utils.logger.error("Error in disk usage call.")
                    break
            total_deleted_size = total_deleted_size - cs_used_bytes
        else:
            total_deleted_size = 0
            g_utils.logger.info("No files Removed.")
    else:
        g_utils.logger.error("Error in disk usage call.")
    return num_files_deleted, total_deleted_size
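
# A minimal call sketch for helpers_log_retention (illustrative; 5 means the
# cloudsync logs may use at most 5% of the filesystem holding
# UI2CS_FILE_LOG_PATH):
#
#   deleted_count, freed_bytes = helpers_log_retention(5)
#   g_utils.logger.info(f"Log retention removed {deleted_count} files ({freed_bytes} bytes)")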