"""Implementation of helper methods"""

import os
import shutil
import json
import datetime
import functools
import hashlib
import socket

from time import time, sleep
from typing import Union, Any
from logging import Logger

from cloudsync.utils.singleton import SingletonMeta
from cloudsync.common.enums import *
from cloudsync.handlers.network_request import NetworkRequest
from cloudsync.utils.reachability import *
from cloudsync.utils.globals import *


class GUtils(metaclass=SingletonMeta):
    """
    Singleton holder for the shared logger and reachability provider
    used by the helper functions in this module.
    """

    def __init__(self):
        # Both collaborators are injected after construction via the
        # add_* methods; helpers that log assume add_logger() was called.
        self.logger = None
        self.reachability_provider = None

    def add_logger(self, logger: Logger):
        """
        Registers the logger used by the helpers
        :param logger: the logger instance
        """
        self.logger = logger

    def add_reachability_provider(self, reachability_provider: ReachabilityProvider):
        """
        Registers the reachability provider used by the network helpers
        :param reachability_provider: the provider exposing a ``reachability`` attribute
        """
        self.reachability_provider = reachability_provider


g_utils = GUtils()

# Lookup table used by helpers_crc8: CRC_LIST[b] is the next CRC value for
# the byte ``b`` (already XOR-ed with the running CRC).
CRC_LIST = [
    0, 49, 98, 83, 196, 245, 166, 151, 185, 136, 219, 234, 125, 76, 31, 46,
    67, 114, 33, 16, 135, 182, 229, 212, 250, 203, 152, 169, 62, 15, 92, 109,
    134, 183, 228, 213, 66, 115, 32, 17, 63, 14, 93, 108, 251, 202, 153, 168,
    197, 244, 167, 150, 1, 48, 99, 82, 124, 77, 30, 47, 184, 137, 218, 235,
    61, 12, 95, 110, 249, 200, 155, 170, 132, 181, 230, 215, 64, 113, 34, 19,
    126, 79, 28, 45, 186, 139, 216, 233, 199, 246, 165, 148, 3, 50, 97, 80,
    187, 138, 217, 232, 127, 78, 29, 44, 2, 51, 96, 81, 198, 247, 164, 149,
    248, 201, 154, 171, 60, 13, 94, 111, 65, 112, 35, 18, 133, 180, 231, 214,
    122, 75, 24, 41, 190, 143, 220, 237, 195, 242, 161, 144, 7, 54, 101, 84,
    57, 8, 91, 106, 253, 204, 159, 174, 128, 177, 226, 211, 68, 117, 38, 23,
    252, 205, 158, 175, 56, 9, 90, 107, 69, 116, 39, 22, 129, 176, 227, 210,
    191, 142, 221, 236, 123, 74, 25, 40, 6, 55, 100, 85, 194, 243, 160, 145,
    71, 118, 37, 20, 131, 178, 225, 208, 254, 207, 156, 173, 58, 11, 88, 105,
    4, 53, 102, 87, 192, 241, 162, 147, 189, 140, 223, 238, 121, 72, 27, 42,
    193, 240, 163, 146, 5, 52, 103, 86, 120, 73, 26, 43, 188, 141, 222, 239,
    130, 179, 224, 209, 70, 119, 36, 21, 59, 10, 89, 104, 255, 206, 157, 172
]


def helpers_is_int(val: str) -> bool:
    """
    Determines if the value can be converted to an int

    :param val: the value in string form
    :return: True if can be converted to an int, false otherwise
    """
    try:
        int(val)
        return True
    except ValueError:
        return False


def helpers_is_float(val: str) -> bool:
    """
    Determines if the value can be converted to a float

    :param val: the value in string form
    :return: True if can be converted to a float, false otherwise
    """
    try:
        float(val)
    except ValueError:
        return False
    return True


def helpers_try_numeric(val: str) -> Union[int, float, str]:
    """
    Tries to convert the value to numeric. If it's not possible
    leaves it as a string

    :param val: the value to convert
    :return: the converted value if possible, otherwise it is returned as is
    """
    # int is tried first so that "3" becomes 3 rather than 3.0
    if helpers_is_int(val):
        return int(val)
    elif helpers_is_float(val):
        return float(val)
    else:
        return val


def helpers_parse_treatment_log(path: str) -> dict:
    """
    Converts a treatment.log file to a python dictionary
    The treatment log needs to be formatted better.
    Until then, this is a (non-ideal) way to read it.

    Lines containing both "[" and "]" start a new group named by the text
    between the brackets. Lines of key/value groups are stored as
    ``{key: {"value": ..., "units": ...}}``; lines of tabular groups are
    appended, token by token, to the group's ``"data"`` list. Lines that
    appear under any other group are ignored.

    :param path: the path to the treatment log file
    :return: the parsed treatment log file ({} if the path does not exist)
    """
    if not os.path.exists(path):
        print("Path does not exist.")
        return {}

    key_value_groups = ["Title", "Treatment Parameters", "Post-Treatment Data", "Extra"]
    tabular_groups = ["Treatment Data", "Treatment Alarms", "Treatment Events"]

    result = {}
    with open(path, 'r') as f:
        group = "NoGroup"
        for raw_line in f:
            line = raw_line.replace("\n", "")

            # Both brackets must be present. The previous test
            # (`"[" and "]" in line`) only checked for "]" and crashed with
            # IndexError on lines that had "]" but no "[".
            if "[" in line and "]" in line:
                group = line.split("[")[1].split("]")[0]
                if group not in result:
                    result[group] = {}
                continue

            tokens = line.split(",")
            if group in key_value_groups:
                if len(tokens) > 1:
                    entry = {"value": helpers_try_numeric(tokens[1])}
                    if len(tokens) > 2:
                        # an empty units column is reported as None
                        entry["units"] = None if tokens[2] == "" else tokens[2]
                    result[group][tokens[0]] = entry
            elif group in tabular_groups:
                row = [helpers_try_numeric(token) for token in tokens]
                result[group].setdefault("data", []).append(row)

    return result


def helpers_device_state_to_cloud_state(hd_mode: HDOpModes, hd_sub_mode: HDOpSubModes) -> DeviceStates:
    """
    Inactive Not OK – N/A – HD f/w will not know active vs. inactive – UI or cloud will have to maintain assigned tenant,
    active/inactive and Ok/Not OK while inactive
    Inactive OK – N/A
    Active OK – N/A

    Active Ready – mode is 3 (standby) and sub-mode is 1 (wait for treatment)
    Active In Treatment – mode between 4 and 7 (treatment params .. post treatment)
    Active Not Ready – mode/sub-mode is anything other than ready, in-treatment, or not OK
    Active Not OK – mode is 0 (fault)

    Decommissioned – N/A – HD f/w will not know if system is decommissioned

    NOTE(review): ``hd_sub_mode`` is accepted but not consulted; standby maps
    to ACTIVE_READY regardless of the sub-mode. Confirm whether the sub-mode
    check described above is still required.

    :param hd_mode: the HD operation mode
    :param hd_sub_mode: the HD operation sub-mode (currently unused)
    :return: the corresponding cloud device state
    """
    if hd_mode == HDOpModes.MODE_STAN:
        return DeviceStates.ACTIVE_READY
    if (hd_mode == HDOpModes.MODE_TPAR) or (hd_mode == HDOpModes.MODE_PRET) or (hd_mode == HDOpModes.MODE_TREA) or (
            hd_mode == HDOpModes.MODE_POST):
        return DeviceStates.ACTIVE_IN_TREATMENT
    if hd_mode == HDOpModes.MODE_FAUL:
        return DeviceStates.ACTIVE_NOT_OK

    return DeviceStates.ACTIVE_NOT_READY


def helpers_get_stored_token() -> Union[str, None]:
    """
    Returns the stored token

    :return: The token if found, otherwise returns None (missing file,
        invalid JSON, JSON that is not an object, or no "access_token" key)
    """
    if not os.path.exists(DEVICE_KEBORMED_ACCESS_TOKEN_PATH):
        return None

    with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'r') as f:
        try:
            data = json.load(f)
        except json.decoder.JSONDecodeError:
            return None

    # Valid JSON that is not an object (null, list, string, ...) carries no
    # token; treat it like any other unusable token file.
    if not isinstance(data, dict):
        return None

    return data.get("access_token", None)


def helpers_read_config(path: str) -> dict:
    """
    Read the configuration

    :param path: the path to the configuration
    :return: the loaded configuration
    :raises FileNotFoundError: if the configuration file does not exist
    """
    if not os.path.exists(path):
        g_utils.logger.error("Operation configuration file does not exist: {0}".format(path))
        raise FileNotFoundError(f"Operation configuration file does not exist: {path}")

    with open(path, 'r') as f:
        return json.load(f)


def helpers_write_config(folder_path: str, file_path: str, config: dict) -> None:
    """
    Writes the config to the provided path
    If folder_path is provided, it first checks if the folder exists and creates it if it doesn't

    :param folder_path: the path for the config folder
    :param file_path: the path where the config json will be written
    :param config: the config dictionary
    :return: None
    """
    if folder_path is not None:
        # exist_ok avoids the check-then-create race of exists() + makedirs()
        os.makedirs(folder_path, exist_ok=True)

    with open(file_path, 'w') as f:
        json.dump(config, f, indent=4)


def helpers_read_access_token(path: str) -> dict:
    """
    Reads the access token json file, returns it as a dict

    :param path: The path to the access token json file
    :return: the parsed content, or {} if the file does not exist
    """
    data = {}
    if os.path.exists(path):
        # utf-8-sig transparently skips a leading byte-order mark
        with open(path, 'r', encoding="utf-8-sig") as f:
            data = json.load(f)
    return data


def helpers_read_treatment_report_template(path: str) -> dict:
    """
    Read the treatment report template

    :param path: the path to the template
    :return: the loaded template, or {} (after logging an error) if the file
        does not exist
    """
    if not os.path.exists(path):
        g_utils.logger.error("Configuration file does not exist: {0}".format(path))
        return {}

    with open(path, 'r') as f:
        return json.load(f)


def log_func(func):
    """
    Log the function and the parameters passed to it

    :param func: The decorated function
    :return: The wrapper function
    """

    # wraps keeps the decorated function's __name__/__doc__ on the wrapper
    @functools.wraps(func)
    def _wrapper(*args, **kwargs):
        g_utils.logger.debug("Calling {0} args: {1} kwargs: {2}".format(func.__name__,
                                                                        tuple(args),
                                                                        kwargs))
        return func(*args, **kwargs)

    return _wrapper


def _is_section_header(line: str) -> bool:
    """Tells whether a stripped log line is a section header."""
    return line.startswith(SECTION_START_CHARACTER) and line.endswith(SECTION_STOP_CHARACTER)


def _parse_log_datetime_ms(text: str) -> int:
    """Converts a "%Y/%m/%d %H:%M" string to a timestamp scaled by S_MS_CONVERSION_FACTOR."""
    parsed = datetime.datetime.strptime(text, "%Y/%m/%d %H:%M")
    return int(parsed.timestamp()) * S_MS_CONVERSION_FACTOR


def _set_nested(container: dict, keys: tuple, value) -> None:
    """Assigns value at the nested key path; intermediate keys must already exist."""
    target = container
    for key in keys[:-1]:
        target = target[key]
    target[keys[-1]] = value


def _parse_treatment_data_record(data_line: str) -> dict:
    """Builds one treatment data record from a comma separated data line."""
    data_components = data_line.split(',')
    return {
        "time": int(data_components[0]) * S_MS_CONVERSION_FACTOR,
        "bloodFlowRate": float(data_components[1]),
        "dialysateFlowRate": float(data_components[2]),
        "ultrafiltrationRate": float(data_components[3]),
        "arterialPressure": float(data_components[4]),
        "venousPressure": float(data_components[5]),
        "systolic": float(data_components[6]),
        "diastolic": float(data_components[7]),
        "heartRate": float(data_components[8])
    }


def _parse_titled_record(data_line: str) -> dict:
    """Builds one alarm/event record (time, title, parameters) from a data line."""
    data_components = data_line.split(',')
    parameters = []
    if len(data_components) > 2:
        # NOTE(review): the last two fields are intentionally left out of the
        # parameters, exactly as before - confirm against the log format.
        parameters = data_components[2:-2]
    return {
        "time": int(data_components[0]) * S_MS_CONVERSION_FACTOR,
        "title": data_components[1],
        "parameters": parameters
    }


def _treatment_field_table() -> tuple:
    """
    Returns the ordered (line prefix, template key path, converter) rules for
    the key/value lines of the treatment log. The first matching prefix wins,
    so the order must be kept.
    """
    treatment = ('data', 'treatment')
    parameters = treatment + ('parameters',)
    dialysate = parameters + ('dialysate',)
    treatment_time = treatment + ('time',)
    device_data = ('data', 'deviceTreatmentData')

    return (
        (TREATMENT_CODE, treatment + ('treatmentCode',), str),
        (PATIENT_ID, ('data', 'patient', 'id'), str),
        (TREATMENT_DURATION, parameters + ('treatmentDuration',), int),
        (BLOOD_FLOW_RATE, parameters + ('bloodFlowRate',), int),
        (DIALYSATE_FLOW_RATE, parameters + ('dialysateFlowRate',), int),
        (ACID_CONCENTRATE_TYPE, dialysate + ('acidCode',), str),
        (BICARBONATE_CONCENTRATE_TYPE, dialysate + ('bicarbCode',), str),
        (POTASSIUM_CONCENTRATION, dialysate + ('K',), float),
        (CALCIUM_CONCENTRATION, dialysate + ('Ca',), float),
        (BICARBONATE_CONCENTRATION, dialysate + ('HCO3',), float),
        (SODIUM_CONCENTRATION, dialysate + ('Na',), float),
        (DIALYSATE_TEMPERATURE, parameters + ('dialysateTemp',), float),
        (DIALYZER_TYPE, parameters + ('dialyzerModel',), str),
        (HEPARIN_TYPE, parameters + ('heparinType',), str),
        (HEPARIN_CONCENTRATION, parameters + ('heparinConcentration',), str),
        (HEPARIN_BOLUS_VOLUME, parameters + ('heparinBolus',), str),
        (HEPARIN_DISPENSE_RATE, parameters + ('heparinRate',), str),
        (HEPARIN_STOP, parameters + ('heparinStopBeforeTreatmentEnd',), str),
        (TREATMENT_START_DATE_TIME, treatment_time + ('start',), _parse_log_datetime_ms),
        (TREATMENT_END_DATE_TIME, treatment_time + ('end',), _parse_log_datetime_ms),
        (ACTUAL_TREATMENT_DURATION, treatment_time + ('treatmentDuration',), int),
        (DIALYSATE_VOLUME_USED, device_data + ('dialysateVolumeUsed',), float),
        (PRESCRIBED_UF_VOLUME, device_data + ('prescribedUltrafiltrationVolume',), float),
        (TARGET_UF_VOLUME, device_data + ('finalTargetUltrafiltrationVolume',), float),
        (ACTUAL_UF_VOLUME, device_data + ('actualUltrafiltrationVolume',), float),
        (PRESCRIBED_UF_RATE, device_data + ('prescribedUltrafiltrationRate',), float),
        (TARGET_UF_RATE, device_data + ('finalTargetUltrafiltrationRate',), float),
        (ACTUAL_UF_RATE, device_data + ('actualUltrafiltrationRate',), float),
        (SALINE_BOLUS_VOLUME, device_data + ('salineBolusVolumeGiven',), float),
        (HEPARIN_DELIVERED_VOLUME, device_data + ('heparinDelivered',), str),
        (WATER_SAMPLE_TEST_RESULT, ('data', 'diagnostics', 'waterSampleTestResult'), str),
    )


def _apply_key_value_line(treatment_data: dict, line: str, field_table: tuple) -> None:
    """Stores the value of a key/value log line in the template; unknown lines are ignored."""
    for prefix, keys, convert in field_table:
        if line.startswith(prefix):
            # the value is the second comma separated field of the line
            _set_nested(treatment_data, keys, convert(line.split(',')[1]))
            return


def helpers_read_treatment_log_file(path: str):
    """
    Reads a treatment log file and fills the treatment report template with it

    The template is loaded from TREATMENT_REPORT_TEMPLATE_PATH. Section
    headers are lines that start with SECTION_START_CHARACTER and end with
    SECTION_STOP_CHARACTER. The lines of the treatment data, alarms and
    events sections are converted to records and appended to the template;
    the lines of every other section are handled as "key,value" lines.

    :param path: the path to the treatment log file
    :return: the filled template, or None if the file could not be opened or
        parsed (the error is logged)
    """
    treatment_data = helpers_read_treatment_report_template(TREATMENT_REPORT_TEMPLATE_PATH)

    try:
        # the context manager closes the file even if parsing fails later
        with open(path) as f:
            treatment_log_lines = f.readlines()

        field_table = _treatment_field_table()
        total_lines = len(treatment_log_lines)
        counter = 0

        while counter < total_lines:
            line = treatment_log_lines[counter].strip()

            if _is_section_header(line) and counter < (total_lines - 2):
                section = line
                section_lines = []
                counter += 1
                section_start_counter = counter
                line = treatment_log_lines[counter].strip()

                # Collect lines up to the next header.
                # NOTE(review): the very last line of the file is never
                # collected here (unchanged behavior) - confirm the log always
                # ends with a line that may be skipped.
                while not _is_section_header(line) and counter < total_lines - 1:
                    section_lines.append(line)
                    counter += 1
                    line = treatment_log_lines[counter].strip()

                if len(section_lines) > 0:
                    treatment = treatment_data['data']['treatment']
                    if section == TREATMENT_DATA:
                        for data_line in section_lines:
                            treatment['data'].append(_parse_treatment_data_record(data_line))
                    elif section == TREATMENT_ALARMS:
                        for data_line in section_lines:
                            treatment['alarms'].append(_parse_titled_record(data_line))
                    elif section == TREATMENT_EVENTS:
                        for data_line in section_lines:
                            treatment['events'].append(_parse_titled_record(data_line))
                    else:
                        # Not a record section: rewind so that its lines are
                        # handled one by one as key/value lines.
                        counter = section_start_counter
            else:
                _apply_key_value_line(treatment_data, line, field_table)
                counter += 1

        return treatment_data

    except IOError as er:
        # OSError args are (errno, strerror): stringify before joining,
        # otherwise the handler itself raises TypeError.
        g_utils.logger.error('Opening treatment log file error: {0}'.format(' '.join(map(str, er.args))))
        return None
    except Exception as e:
        g_utils.logger.error('Error parsing treatment file: {0}'.format(' '.join(map(str, e.args))))
        return None


def helpers_add_to_network_queue(network_request_handler, request_type, url, payload, method, g_config,
                                 success_message):
    """
    Waits for the network to become reachable and enqueues a network request

    Reachability is polled up to REACHABILITY_CYCLES times, pausing
    REACHABILITY_CYCLE_PAUSE between polls. If the network is still not
    reachable, a warning is logged and the request is dropped.

    :param network_request_handler: the handler providing ``enqueue_request``
    :param request_type: the request type (its ``name`` is used in the warning)
    :param url: the request url
    :param payload: the request payload
    :param method: the request method
    :param g_config: the configuration passed on to the request
    :param success_message: the message logged once the request is enqueued
    :return: None
    """
    cycle_duration = REACHABILITY_CYCLE_PAUSE
    total_cycles = REACHABILITY_CYCLES
    cycle = 0

    while (not g_utils.reachability_provider.reachability) and (cycle < total_cycles):
        sleep(cycle_duration)
        cycle += 1

    if g_utils.reachability_provider.reachability:
        r = NetworkRequest(request_type=request_type,
                           url=url,
                           payload=payload,
                           method=method,
                           g_config=g_config)

        # NOTE(review): retries without pause until the handler accepts the request
        request_added_to_queue = False
        while not request_added_to_queue:
            request_added_to_queue = network_request_handler.enqueue_request(r)

        g_utils.logger.info(success_message)
    else:
        g_utils.logger.warning("Internet DOWN: Network request {0} couldn't be processed".format(request_type.name))


def helpers_add_to_output_channel(output_channel, message_body, success_message):
    """
    Enqueues a message on the output channel, retrying until it is accepted

    :param output_channel: the channel providing ``enqueue_message``
    :param message_body: the message to enqueue
    :param success_message: the message logged once the message is enqueued
    :return: None
    """
    # NOTE(review): retries without pause until the channel accepts the message
    message_added_to_queue = False
    while not message_added_to_queue:
        message_added_to_queue = output_channel.enqueue_message(message_body)

    g_utils.logger.info(success_message)


def helpers_sha256_checksum(data: str) -> str:
    """
    Returns the calculated checksum (SHA256) for input data

    :param data: input data
    :return: checksum as a hexadecimal string
    """
    checksum = hashlib.sha256(data.encode()).hexdigest()
    return checksum


def helpers_crc8(message_list):
    """
    Returns the calculated crc from a message list

    :param message_list: is a list of integer numbers containing the message
    :return: integer containing an unsigned byte
    """
    crc = 0
    for byte in message_list:
        # table driven update: index with the byte XOR-ed with the running crc
        unsigned_byte = byte ^ crc
        crc = CRC_LIST[unsigned_byte]
    return crc


def helpers_get_ip_address():
    """
    Returns the IP address the local host name resolves to

    :return: the IP address as a string
    """
    hostname = socket.gethostname()
    ip_address = socket.gethostbyname(hostname)
    return ip_address


def helpers_decommission_device():
    """
    Deletes every folder listed in DECOMMISSION_FOLDERS under DECOMMISSION_CS_PATH

    :return: None
    :raises FileNotFoundError: at the first listed folder that is missing or is
        not a directory (folders listed before it have already been deleted)
    """
    parent_folder = DECOMMISSION_CS_PATH
    subfolders_to_delete = DECOMMISSION_FOLDERS

    for subfolder in subfolders_to_delete:
        path_to_delete = os.path.join(parent_folder, subfolder)
        # isdir() is False for missing paths too, so one check covers both cases
        if os.path.isdir(path_to_delete):
            shutil.rmtree(path_to_delete)
        else:
            raise FileNotFoundError(f"{path_to_delete} not found or not a directory!")