Index: cloudsync/handlers/ui_cs_request_handler.py
===================================================================
diff -u -r21530c20b92d62582924e30f7ece9f9bc56dc3ae -r30d856d15320fd60e59230bf44d277184f6905ce
--- cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision 21530c20b92d62582924e30f7ece9f9bc56dc3ae)
+++ cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision 30d856d15320fd60e59230bf44d277184f6905ce)
@@ -57,7 +57,7 @@
         else:
             return False
 
-    def handle_message(self, message: UICSMessage):
+    def handle_message(self, message: UICSMessage) -> None:
         self.logger.info('UI2CS Message: {0}'.format(message))
 
         message_data = message.timestamp + message.sequence + message.ID + message.size
@@ -261,3 +261,47 @@
                     error_body += ",{0}".format(parameter)
                 error = Error(error_body=error_body)
                 self.error_handler.enqueue_error(error=error)
+
+        # UPLOAD DEVICE LOG TO CLOUD REQUEST
+        elif InboundMessageIDs.mapped_str_value(message.ID) == InboundMessageIDs.UI2CS_UPLOAD_DEVICE_LOG and \
+                (message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation'):
+            self.logger.info("UI2CS_UPLOAD_DEVICE_LOG request received")
+
+            if (len(message.parameters) != 2) or (message.parameters[0] is None) or (message.parameters[1] is None):
+                error = Error(
+                    "{0},2,{1},invalid # of parameters for file upload request".format(
+                        OutboundMessageIDs.CS2UI_ERROR.value,
+                        ErrorIDs.CS_DEVICE_LOG_ERROR.value))
+                self.error_handler.enqueue_error(error=error)
+            else:
+                try:
+                    local_checksum = helpers_sha256_checksum(message.parameters[0])
+                    assert (local_checksum == message.parameters[1]), 'SHA256 checksum mismatch for device log file.'
+
+                    hd_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL]
+                    dg_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL]
+                    sw_version = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION]
+
+                    self.logger.debug('hd: {0}, dg: {1}, sw: {2}'.format(hd_serial_number, dg_serial_number, sw_version))
+
+                    device_log_json = helpers_construct_device_log_json(message.parameters[0])
+
+                    if device_log_json:
+                        device_log_json['serialNumber'] = hd_serial_number
+
+                        self.logger.debug("Device log {0}".format(device_log_json))
+
+                        helpers_add_to_network_queue(network_request_handler=self.network_request_handler,
+                                                     request_type=NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG,
+                                                     url='',
+                                                     payload=device_log_json,
+                                                     method='',
+                                                     g_config=message.g_config,
+                                                     success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to network queue')
+
+                except Exception as e:
+                    error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value,
+                                                         ErrorIDs.CS_DEVICE_LOG_ERROR.value,
+                                                         e))
+                    self.error_handler.enqueue_error(error=error)
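
A minimal standalone sketch of the checksum gate this handler branch applies; the path and digest below are hypothetical, and in the real flow they arrive as message.parameters[0] and message.parameters[1]:

    # Hypothetical values for illustration only.
    log_path = '/data/logs/20240131_142530_ABC123_SUB1_dev.u.system'
    ui_digest = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'

    # helpers_sha256_checksum (see helpers.py below) hashes the file contents
    # when given a path, so this mirrors the assert in handle_message.
    assert helpers_sha256_checksum(log_path) == ui_digest, 'SHA256 checksum mismatch for device log file.'
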
Index: cloudsync/utils/helpers.py
===================================================================
diff -u -r21530c20b92d62582924e30f7ece9f9bc56dc3ae -r30d856d15320fd60e59230bf44d277184f6905ce
--- cloudsync/utils/helpers.py (.../helpers.py) (revision 21530c20b92d62582924e30f7ece9f9bc56dc3ae)
+++ cloudsync/utils/helpers.py (.../helpers.py) (revision 30d856d15320fd60e59230bf44d277184f6905ce)
@@ -3,11 +3,13 @@
 import os
 import shutil
 import json
-import datetime
 import hashlib
 import socket
+import re
+import base64
 import subprocess
 
+from datetime import datetime, timezone
 from time import time, sleep
 from typing import Union, Any
 from logging import Logger
@@ -251,6 +253,54 @@
     return {}
 
 
+def helpers_read_device_log_template(path: str) -> dict:
+    """
+    Read the device log template
+    :param path: the path to the template
+    :return: the loaded template
+    """
+    if os.path.exists(path):
+        with open(path, 'r') as f:
+            template = json.load(f)
+            return template
+    else:
+        g_utils.logger.error(
+            "Template file does not exist: {0}".format(path))
+        return {}
+
+
+def helpers_read_cs_log_template(path: str) -> dict:
+    """
+    Read the cs log template
+    :param path: the path to the template
+    :return: the loaded template
+    """
+    if os.path.exists(path):
+        with open(path, 'r') as f:
+            template = json.load(f)
+            return template
+    else:
+        g_utils.logger.error(
+            "Template file does not exist: {0}".format(path))
+        return {}
+
+
+def helpers_read_log_state(path: str) -> dict:
+    """
+    Read the cs log state file
+    :param path: the path to the file
+    :return: the contents of the log state file
+    """
+    if os.path.exists(path):
+        with open(path, 'r') as f:
+            template = json.load(f)
+            return template
+    else:
+        g_utils.logger.error(
+            "Log state file does not exist: {0}".format(path))
+        return {}
+
+
 def log_func(func):
     """
     Log the function and the parameters passed to it
@@ -267,6 +317,124 @@
     return _wrapper
 
 
+def helpers_extract_device_log_metadata(log_file_name: str) -> Union[dict, None]:
+    """
+    Extract the metadata encoded in a device log file name
+    :param log_file_name: the device log file name
+    :return: the extracted metadata, or None if the name does not match the expected pattern
+    """
+    local_date_pattern = r"^\d{8}(?=_)"
+    local_time_pattern = r"^.*?_(\d{6})_"
+    serial_number_pattern = r"^[a-zA-Z0-9]+_[a-zA-Z0-9]+_([a-zA-Z0-9]+)(?=_)"
+    device_subtype_pattern = r"^[a-zA-Z0-9]+_[a-zA-Z0-9]+_[a-zA-Z0-9]+_([a-zA-Z0-9]+)"
+    log_type_pattern = r"[a-zA-Z0-9]+\.u\.(\w+)"
+
+    local_date_match = re.search(local_date_pattern, log_file_name)
+    local_time_match = re.search(local_time_pattern, log_file_name)
+    serial_number_match = re.search(serial_number_pattern, log_file_name)
+    device_subtype_match = re.search(device_subtype_pattern, log_file_name)
+    log_type_match = re.search(log_type_pattern, log_file_name)
+
+    # No field could be recognised: the name does not follow the scheme.
+    if not any((local_date_match, local_time_match, serial_number_match,
+                device_subtype_match, log_type_match)):
+        return None
+
+    return {
+        "local_date": local_date_match.group(0) if local_date_match else 'unknown',
+        "local_time": local_time_match.group(1) if local_time_match else 'unknown',
+        "serial_number": serial_number_match.group(1) if serial_number_match else 'unknown',
+        "device_sub_type": device_subtype_match.group(1) if device_subtype_match else 'unknown',
+        "device_log_type": log_type_match.group(1) if log_type_match else 'unknown'
+    }
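
A usage sketch for the extractor above, with a hypothetical file name chosen to match the scheme the patterns imply (date_time_serial_subtype_name.u.logtype):

    helpers_extract_device_log_metadata('20240131_142530_ABC123_SUB1_dev.u.system')
    # -> {'local_date': '20240131', 'local_time': '142530',
    #     'serial_number': 'ABC123', 'device_sub_type': 'SUB1',
    #     'device_log_type': 'system'}
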
+def helpers_file_to_byte_array(file_path: str) -> Union[str, None]:
+    """
+    Read a file and return its contents as a base64-encoded string
+    :param file_path: the path to the file
+    :return: the base64-encoded file contents, or None on error
+    """
+    try:
+        with open(file_path, "rb") as f:
+            # Read the entire file in binary mode
+            file_bytes = f.read()
+        return base64.b64encode(file_bytes).decode('utf-8')
+    except FileNotFoundError as e:
+        g_utils.logger.error(f"Device log file not found: {e}")
+        return None
+    except Exception as e:
+        g_utils.logger.error(f"Error reading device log file: {e}")
+        return None
+
+
+def helpers_construct_cs_log_json(path: str) -> Union[dict, None]:
+    """
+    Constructs the payload for cs log file uploading
+    :param path: the path to the log file
+    :returns: the json payload to be uploaded, or None if the template or file could not be read
+    """
+    cs_log_data_template = helpers_read_cs_log_template(CS_LOG_TEMPLATE_PATH)
+
+    # Convert the file into a base64-encoded string
+    logs_byte_array = helpers_file_to_byte_array(path)
+
+    if not cs_log_data_template or logs_byte_array is None:
+        return None
+
+    # Get the filename
+    file_name = os.path.basename(path)
+
+    # Completion and generation timestamp from CS (in milliseconds)
+    local_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)
+
+    # Start and end timestamps: the boundaries of the current UTC day
+    today = datetime.now(timezone.utc)
+    start_of_day = int(today.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000)
+    end_of_day = int(today.replace(hour=23, minute=59, second=59, microsecond=999999).timestamp() * 1000)
+
+    # Populate JSON object
+    cs_log_data_template['metadata']['dataType'] = 'cloud-sync-log-category'
+    cs_log_data_template['metadata']['deviceFileName'] = file_name
+    cs_log_data_template['metadata']['startTimestamp'] = start_of_day
+    cs_log_data_template['metadata']['endTimestamp'] = end_of_day
+    cs_log_data_template['completedAt'] = local_timestamp
+    cs_log_data_template['generatedAt'] = local_timestamp
+    cs_log_data_template['data'] = logs_byte_array
+    cs_log_data_template['checksum'] = helpers_sha256_checksum(logs_byte_array)
+
+    return cs_log_data_template
+
+
+def helpers_construct_device_log_json(path: str) -> Union[dict, None]:
+    """
+    Constructs the payload for device log file uploading
+    :param path: the path to the log file
+    :returns: the json payload to be uploaded, or None if the file name or contents are invalid
+    """
+    device_log_data = helpers_read_device_log_template(DEVICE_LOG_TEMPLATE_PATH)
+
+    # Convert the file into a base64-encoded string
+    logs_byte_array = helpers_file_to_byte_array(path)
+
+    # Extract metadata from the filename
+    file_name = os.path.basename(path)
+    extracted_metadata = helpers_extract_device_log_metadata(file_name)
+
+    if extracted_metadata is None:
+        g_utils.logger.error('Device log file name does not match the pattern')
+        return None
+    if not device_log_data or logs_byte_array is None:
+        return None
+
+    # Completion and generation timestamp from CS (in milliseconds)
+    local_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)
+
+    # Calculate the UTC timestamp based on the local date and time the UI provided
+    # (in milliseconds) (NEED BETTER SOLUTION HERE)
+    if extracted_metadata['local_date'] != 'unknown' and extracted_metadata['local_time'] != 'unknown':
+        datetime_obj = datetime.strptime(
+            f"{extracted_metadata['local_date']}{extracted_metadata['local_time']}", "%Y%m%d%H%M%S")
+        # Convert to UTC timestamp (strptime yields a naive datetime)
+        ui_utc_timestamp = int(datetime_obj.timestamp() * 1000)
+    else:
+        ui_utc_timestamp = local_timestamp
+
+    # Populate JSON object
+    device_log_data['metadata']['dataType'] = 'device-log-category'
+    device_log_data['metadata']['deviceLogType'] = extracted_metadata['device_log_type']
+    device_log_data['metadata']['deviceSubType'] = extracted_metadata['device_sub_type']
+    device_log_data['metadata']['deviceFileName'] = file_name
+    device_log_data['metadata']['startTimestamp'] = ui_utc_timestamp
+    device_log_data['metadata']['endTimestamp'] = ui_utc_timestamp
+    device_log_data['completedAt'] = local_timestamp
+    device_log_data['generatedAt'] = local_timestamp
+    device_log_data['data'] = logs_byte_array
+    device_log_data['checksum'] = helpers_sha256_checksum(logs_byte_array)
+
+    return device_log_data
+
+
 def helpers_read_treatment_log_file(path: str):
     treatment_data = helpers_read_treatment_report_template(TREATMENT_REPORT_TEMPLATE_PATH)
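
One possible refinement for the "(NEED BETTER SOLUTION HERE)" spot above, assuming the UI-provided date and time are meant to be read as UTC; strptime returns a naive datetime, which timestamp() otherwise interprets in the machine-local zone:

    # Hypothetical input; pin the parsed datetime to UTC before converting.
    datetime_obj = datetime.strptime("20240131142530", "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)
    ui_utc_timestamp = int(datetime_obj.timestamp() * 1000)

If the UI values are intentionally machine-local, the code as written already does the right thing.
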
@@ -314,7 +482,7 @@
             data_components = data_line.split(',')
             parameters = []
             if len(data_components) > 2:
-                parameters = data_components[2:(len(data_components) - 2)]
+                parameters = data_components[2:]
             data_record = {
                 "time": int(data_components[0]) * S_MS_CONVERSION_FACTOR,
                 "title": data_components[1],
@@ -326,7 +494,7 @@
             data_components = data_line.split(',')
             parameters = []
             if len(data_components) > 2:
-                parameters = data_components[2:(len(data_components) - 2)]
+                parameters = data_components[2:]
             data_record = {
                 "time": int(data_components[0]) * S_MS_CONVERSION_FACTOR,
                 "title": data_components[1],
@@ -375,12 +543,12 @@
         elif line.startswith(HEPARIN_STOP):
             treatment_data['data']['treatment']['parameters']['heparinStopBeforeTreatmentEnd'] = line.split(',')[1]
         elif line.startswith(TREATMENT_START_DATE_TIME):
-            element = datetime.datetime.strptime(line.split(',')[1], "%Y/%m/%d %H:%M")
-            timestamp = datetime.datetime.timestamp(element)
+            element = datetime.strptime(line.split(',')[1], "%Y/%m/%d %H:%M")
+            timestamp = datetime.timestamp(element)
             treatment_data['data']['treatment']['time']['start'] = int(timestamp) * S_MS_CONVERSION_FACTOR
         elif line.startswith(TREATMENT_END_DATE_TIME):
-            element = datetime.datetime.strptime(line.split(',')[1], "%Y/%m/%d %H:%M")
-            timestamp = datetime.datetime.timestamp(element)
+            element = datetime.strptime(line.split(',')[1], "%Y/%m/%d %H:%M")
+            timestamp = datetime.timestamp(element)
             treatment_data['data']['treatment']['time']['end'] = int(timestamp) * S_MS_CONVERSION_FACTOR
         elif line.startswith(ACTUAL_TREATMENT_DURATION):
             treatment_data['data']['treatment']['time']['treatmentDuration'] = int(line.split(',')[1])
@@ -409,7 +577,24 @@
         elif line.startswith(HEPARIN_DELIVERED_VOLUME):
             treatment_data['data']['deviceTreatmentData']['heparinDelivered'] = line.split(',')[1]
         elif line.startswith(WATER_SAMPLE_TEST_RESULT):
-            treatment_data['data']['diagnostics']['waterSampleTestResult'] = line.split(',')[1]
+            treatment_data['data']['treatment']['treatmentPreparation']['totalChlorine'] = line.split(',')[1]
+        # new treatment fields
+        elif line.startswith(TARGET_WEIGHT):
+            treatment_data['data']['treatment']['time']['targetWeight'] = ""
+        elif line.startswith(TOTAL_CHLORINE):
+            treatment_data['data']['treatment']['treatmentPreparation']['totalChlorine'] = ""
+        elif line.startswith(PH):
+            treatment_data['data']['treatment']['treatmentPreparation']['ph'] = ""
+        elif line.startswith(CONDUCTIVITY):
+            treatment_data['data']['treatment']['treatmentPreparation']['conductivity'] = ""
+        elif line.startswith(MACHINE_WIPED_DOWN):
+            treatment_data['data']['diagnostics']['machineWipedDown'] = ""
+        elif line.startswith(FILTER_LIFE):
+            treatment_data['data']['diagnostics']['filterLife'] = ""
+        elif line.startswith(LAST_CHEMICAL_DISINFECTION):
+            treatment_data['data']['diagnostics']['lastChemicalDisinfection'] = ""
+        elif line.startswith(LAST_HEAT_DISINFECTION):
+            treatment_data['data']['diagnostics']['lastHeatDisinfection'] = ""
         counter += 1
 
     return treatment_data
@@ -458,14 +643,24 @@
 def helpers_sha256_checksum(data: str) -> str:
     """
     Returns the calculated checksum (SHA256) for input data
-    @param data: input data
+    @param data: input data; either a raw string or a path to a file
     @return: checksum
     """
-    checksum = hashlib.sha256(data.encode()).hexdigest()
-    return checksum
+    is_file = os.path.isfile(data)
+    if is_file:
+        checksum = hashlib.sha256()
+        with open(data, "rb") as f:
+            # Read and update the hash in blocks of 4K
+            for byte_block in iter(lambda: f.read(4096), b""):
+                checksum.update(byte_block)
+    else:
+        checksum = hashlib.sha256(data.encode())
+    return checksum.hexdigest()
 
 
 def helpers_crc8(message_list):
     """
     Returns the calculated crc from a message list
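
A quick sanity check of the dual-mode behaviour introduced above (the path is hypothetical):

    # String input: hashes the UTF-8 bytes of the text itself.
    helpers_sha256_checksum('hello')
    # -> '2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824'

    # Path input: detected via os.path.isfile and streamed in 4 KiB blocks,
    # so large device logs never have to fit in memory twice.
    helpers_sha256_checksum('/data/logs/20240131_142530_ABC123_SUB1_dev.u.system')
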
@@ -497,6 +692,49 @@
     raise FileNotFoundError(f"{path_to_delete} not found or not a directory!")
 
 
+def helpers_should_update_dcs_log_level() -> bool:
+    """
+    Returns True if the state of the log level should be
+    communicated to the DCS, else False. It is controlled by
+    the update_dcs_flag in log_state.json:
+    * 1 --> True
+    * 0 --> False
+    """
+    log_state = helpers_read_log_state(CS_LOG_STATE_FILE_PATH)
+    return int(log_state.get('update_dcs_flag', 0)) == 1
+
+
+def helpers_should_update_cs_log_level(response: dict) -> bool:
+    """
+    Returns True if the DCS requested a log level different from
+    the current one, else False
+    """
+    current_log_state = helpers_read_log_state(CS_LOG_STATE_FILE_PATH)
+    current_log_level: str = current_log_state.get('current_log_level', '')
+    requested_log_level: str = response['logLevel']
+    if requested_log_level.upper() != current_log_level.upper():
+        g_utils.logger.debug(f"DCS requested to change the log level from {current_log_level} to {requested_log_level}")
+        return True
+    return False
+
+
+def helpers_write_log_state(file_path: str, config: dict) -> None:
+    """
+    Persist the log state to file
+    :param file_path: the path to the log state file
+    :param config: the log state contents to write
+    """
+    with open(file_path, 'w') as f:
+        json.dump(config, f, indent=4)
+
+
+def helpers_calculate_log_level_duration(response: dict):
+    """
+    Returns a tuple of (duration_in_seconds, start_timestamp, end_timestamp)
+    if a valid log level duration was given, otherwise None
+    """
+    duration = int(response['logLevelDuration']) if response['logLevelDuration'] else 0
+    start_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)
+    end_timestamp = start_timestamp + duration
+
+    if duration > 0:
+        duration_in_seconds = duration // 1000
+        return duration_in_seconds, start_timestamp, end_timestamp
+    return None
+
+
 def helpers_log_rotation(rotation_pct):
     cs_log_pct = rotation_pct / 100
     num_files_deleted = 0
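
A sketch of how the log-level helpers above might be wired together when a DCS response arrives; the response dict and the decision to set update_dcs_flag are assumptions for illustration:

    # Hypothetical DCS response; keys match those read by the helpers above.
    response = {'logLevel': 'DEBUG', 'logLevelDuration': '3600000'}  # 1 h in ms

    if helpers_should_update_cs_log_level(response):
        result = helpers_calculate_log_level_duration(response)
        if result is not None:
            duration_in_seconds, start_timestamp, end_timestamp = result
            # Persist the new state so helpers_should_update_dcs_log_level
            # can pick it up on the next cycle.
            helpers_write_log_state(CS_LOG_STATE_FILE_PATH, {
                'current_log_level': response['logLevel'],
                'update_dcs_flag': 1,
            })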