Index: busses/file_input_bus.py =================================================================== diff -u --- busses/file_input_bus.py (revision 0) +++ busses/file_input_bus.py (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,90 @@ +"""Implementation of an Input Bus based on the Linux file system and iNotify library""" + +from logging import Logger +from threading import Event, Thread +from time import time, sleep +import inotify.adapters + +from cloudsync.handlers.ui_cs_request_handler import UICSMessageHandler +from cloudsync.handlers.uics_message import UICSMessage +from cloudsync.utils.filesystem import check_readable + + +class FileInputBus: + """File Input Bus class that monitors, parses and sends upstream all inbound messages""" + def __init__(self, + logger: Logger, + file_channels_path: str, + input_channel_name: str, + g_config: dict, + message_handler: UICSMessageHandler): + """ + Initialize the File Input Bus. + + :param Logger logger: Logger object + :param str file_channels_path: the path where the bus files are located + :param str input_channel_name: the name of the input channel file + :param dict g_config: the system configuration object + :param UICSMessageHandler message_handler: the upstream message handler + """ + + self.last_input_message_id = 0 + + self.logger = logger + self.file_channels_path = file_channels_path + self.input_channel_name = input_channel_name + self.g_config = g_config + self.message_handler = message_handler + + self.i = inotify.adapters.Inotify() + self.i.add_watch(self.file_channels_path) + + self.thread = Thread(target=self.input_channel_handler, daemon=True) + self.event = Event() + self.thread.start() + + def input_channel_handler(self): + """ + The Input Channel Handler - it parses and sends upstream all messages arriving on the File Input Bus + """ + for event in self.i.event_gen(yield_nones=False): + (_, type_names, path, filename) = event + # self.logger.debug("PATH=[{}] FILENAME=[{}] 
EVENT_TYPES={}".format(path, filename, type_names)) + if (('IN_MODIFY' in type_names) or ('IN_CLOSE_WRITE' in type_names)) and ( + filename.endswith(self.input_channel_name)): + + input_file_path = self.file_channels_path + "/" + filename + if not check_readable(input_file_path): + self.logger.warning('Input file not readable: {0}'.format(input_file_path)) + continue + + try: + f = open(input_file_path) + except IOError as er: + self.logger.error('Opening input file error: {0}'.format(' '.join(str(er)))) + continue + + new_input_messages = [] + + for line in f.readlines(): + message_parameters = line.strip().split(',') + try: + sequence_id = int(message_parameters[1]) + except (ValueError, IndexError): + self.logger.warning('Skipping message with invalid sequence: {0}'.format(line.strip())) + continue + if sequence_id > self.last_input_message_id: + new_message = UICSMessage(line.strip(), self.g_config) + message_added_to_queue = self.message_handler.enqueue_message(new_message) + if message_added_to_queue: + new_input_messages.append((sequence_id, line.strip())) + + new_input_messages.sort(key=lambda x: x[0]) + # self.logger.debug("New Input messages added to queue: {0}".format(new_input_messages)) + + if len(new_input_messages) > 0: + last_new_message_id = new_input_messages[len(new_input_messages) - 1][0] + self.last_input_message_id = last_new_message_id + else: + last_new_message_id = self.last_input_message_id + Index: busses/file_output_bus.py =================================================================== diff -u --- busses/file_output_bus.py (revision 0) +++ busses/file_output_bus.py (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,87 @@ +"""Implementation of an Output Bus based on the Linux file system""" + +from logging import Logger +from collections import deque +from threading import Event, Thread +import datetime + +from cloudsync.utils import helpers +from cloudsync.utils.filesystem import check_writable, check_disk_space_mb + + 
+class FileOutputBus: + """File Output Bus class that receives, parses and sends downstream all outbound messages""" + def __init__(self, + logger: Logger, + max_size, + file_channels_path: str): + """ + Initialize the File Input Bus. + + :param Logger logger: Logger object + :param max_size: the maximum size of the message queue for the Output Bus + :type max_size: int + :param str file_channels_path: the path where the bus files are located + """ + + self.last_output_message_id = 1 + + self.logger = logger + self.file_channels_path = file_channels_path + self.queue = deque(maxlen=max_size) + self.thread = Thread(target=self.scheduler, daemon=True) + self.event = Event() + self.thread.start() + + def scheduler(self) -> None: + """ + Continuously monitors the event flag to check for new messages + :return: None + """ + while True: + flag = self.event.wait() + if flag: + while len(self.queue) > 0: + message_body = self.queue.popleft() + self.handle_message(message_body) + self.event.clear() + + def enqueue_message(self, message_body: str) -> bool: + """ + Adds messages to the queue + :param str message_body: the data to add to the queue + :return: True upon success, False otherwise + """ + if len(self.queue) < self.queue.maxlen: + self.queue.append(message_body) + self.event.set() + return True + else: + return False + + def handle_message(self, + message_body: str): + """ + Parses queue messages and send them downstream + :param str message_body: the message body + """ + self.logger.debug('Message body: {0}'.format(message_body)) + + try: + if not check_writable(self.file_channels_path): + raise IOError("Output channel path is not writable: {}".format(self.file_channels_path)) + if not check_disk_space_mb(self.file_channels_path, required_mb=1): + raise IOError("Insufficient disk space at: {}".format(self.file_channels_path)) + + now = datetime.datetime.now(datetime.timezone.utc) + filename = now.date().strftime("%Y_%m_%d_out.buf") + timestamp_str = 
str(round(now.timestamp())) + message_data = timestamp_str + str(self.last_output_message_id) + message_body + message_crc8 = helpers.helpers_crc8(message_data.encode('utf-8')) + message_body = timestamp_str + ',' + str(self.last_output_message_id) + ',' + str(message_crc8) + ',' + message_body + self.logger.info('CS2UI Message: {0}'.format(message_body)) + with open(self.file_channels_path + "/" + filename, "a") as f: + f.write("{0}\n".format(message_body)) + self.last_output_message_id += 1 + except IOError as er: + self.logger.error('Opening and/or writing to output file error: {0}'.format(' '.join(str(er)))) Index: common/enums.py =================================================================== diff -u --- common/enums.py (revision 0) +++ common/enums.py (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,148 @@ +"""Implementation of all enumerations used in the application""" + +from enum import Enum, unique + + +class RootEnum(Enum): + + @classmethod + def has_value(cls, value): + return value in cls._value2member_map_ + + @classmethod + def mapped_int_value(cls, value): + try: + return cls._value2member_map_[value] + except Exception: + return None + + @classmethod + def mapped_str_value(cls, value): + try: + return cls._value2member_map_[int(value)] + except Exception: + return None + + +@unique +class HDOpModes(RootEnum): + MODE_FAUL = 0 # Fault mode + MODE_SERV = 1 # Service mode + MODE_INIT = 2 # Initialization & POST mode + MODE_STAN = 3 # Standby mode + MODE_TPAR = 4 # Treatment Parameters mode + MODE_PRET = 5 # Pre-Treatment mode + MODE_TREA = 6 # Treatment mode + MODE_POST = 7 # Post-Treatment mode + NUM_OF_MODES = 8 # Number of HD operation modes + + +@unique +class HDOpSubModes(RootEnum): + SUBMODE_START = 0 + SUBMODE_WAIT_FOR_TREATMENT = 1 + SUBMODE_WAIT_FOR_DISINFECT = 2 + SUBMODE_DG_FLUSH_IN_PROGRESS = 3 + SUBMODE_DG_HEAT_DISINFECT_IN_PROGRESS = 4 + SUBMODE_DG_CHEMICAL_DISINFECT_IN_PROGRESS = 5 + NUM_OF_MODES = 6 + + +@unique 
+class DeviceStates(RootEnum): + INACTIVE_NOT_OK = 1 + INACTIVE_OK = 2 + ACTIVE_OK = 3 + ACTIVE_READY = 4 + ACTIVE_IN_TREATMENT = 5 + ACTIVE_NOT_READY = 6 + ACTIVE_NOT_OK = 7 + DECOMMISSIONED = 8 + UNKNOWN_STATE = 9 + + +@unique +class InboundMessageIDs(RootEnum): + # REGISTRATION + UI2CS_REQ_REGISTRATION = 1001 + UI2CS_SEND_DEVICE_INFO = 1002 + UI2CS_SEND_CREDENTIALS_SAVED = 1003 + UI2CS_SEND_CHECKIN = 1004 + UI2CS_SEND_FACTORY_RESET_CONFIRMATION = 1005 + + # OPERATION + UI2CS_SEND_DEVICE_STATE = 1006 + UI2CS_SEND_TREATMENT_REPORT = 1007 + UI2CS_REQ_DECOMMISSION = 1009 + UI2CS_UPLOAD_DEVICE_LOG = 1010 # DEVICE_LOG + UI2CS_REQ_LOG_RETENTION = 1011 + + # INCOMING ERROR + UI2CS_ERROR = 1999 + + +@unique +class OutboundMessageIDs(RootEnum): + # REGISTRATION + CS2UI_REQ_DEVICE_INFO = 2002 + CS2UI_REQ_SAVE_CREDENTIALS = 2003 + CS2UI_REQ_CHECKIN = 2004 + CS2UI_REQ_FACTORY_RESET = 2005 + + # OPERATION + CS2UI_REQ_DEVICE_STATE = 2006 + CS2UI_REQ_TX_CODE_DISPLAY = 2008 + CS2UI_REQ_DEVICE_DECOMMISSIONED = 2009 + CS2UI_DEVICE_LOG_UPLOADED = 2010 + CS2UI_REQ_LOG_RETENTION = 2011 + + # OUTGOING ERROR + CS2UI_ERROR = 2999 + + +@unique +class NetworkRequestType(RootEnum): + MFT2CS_REQ_SET_CREDENTIALS = 4 + MFT2CS_REQ_INIT_CONNECTIVITY_TEST = 5 + MFT2CS_REQ_FACTORY_RESET = 13 + CS2MFT_REQ_REGISTRATION = 101 + CS2DCS_REQ_SET_DEVICE_STATE = 201 + CS2DCS_REQ_SEND_TREATMENT_REPORT = 202 + CS2DCS_REQ_SEND_DEVICE_LOG = 203 + CS2DCS_REQ_SEND_CS_LOG = 204 + + +@unique +class ErrorIDs(RootEnum): + GENERIC_ERROR = 900 + CS_REQ_REGISTRATION_ERROR = 901 + # 902, 903, 904, 905 + + CS_SEND_DEVICE_STATE_ERROR = 906 + CS_SEND_TREATMENT_REPORT_ERROR = 907 + CS_REQ_CHECKIN_ERROR = 908 + CS_REQ_DECOMMISSION_ERROR = 909 + CS_BAD_CRC_ERROR = 910 + # 911, 912, 913, 914, 915, 916, 917, 918, 919 + + CS_DEVICE_VALIDATION_RESULT_ERROR = 920 + CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR = 921 + CS_GET_NEW_TOKEN_WITH_CERT_ERROR = 922 + CS_VERIFY_TOKEN_ERROR = 923 + CS_VALIDATE_DEVICE_ERROR = 924 + 
CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR = 925 + CS_CREATE_TEMPORARY_PATIENT_ERROR = 926 + CS_SAVE_CREDENTIALS_ERROR = 927 + CS_UNKNOWN_DEVICE_STATE_ERROR = 928 + CS_SAVE_CONFIG_ERROR = 929 + CS_DEVICE_LOG_ERROR = 930 + CS_LOG_ERROR = 931 + CS_FACTORY_RESET_ERROR = 932 + CS_LOG_RETENTION_ERROR = 933 + + +@unique +class LogUploadReasonCode(RootEnum): + DISCONNECTED = 1 # Backup & retry + CREDENTIAL = 2 # Backup & NO upload + DUPLICATE = 3 # Backup & rename Index: config/config_STAGING.json =================================================================== diff -u --- config/config_STAGING.json (revision 0) +++ config/config_STAGING.json (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,29 @@ +{ + "kebormed_paas": { + "idp_client_secret": "NL2cn6eMyg2WLSB0nhfvbxvM79dvo3ta", + "url_mft": "", + "url_dcs": "https://device-api.diality.staging.kebormed.com", + "url_device_identity": "https://device-identity.diality.staging.kebormed.com/auth/realms/Main/protocol/openid-connect/token", + "url_reachability": "https://healthcheck.diality.staging.kebormed.com/", + "dia_org_id": 1 + }, + "device": { + "ip": "", + "port": 80, + "name": "", + "hd_serial": "", + "dg_serial": "", + "sw_version": "", + "mode": "registration", + "device_state": "INACTIVE_NOT_OK" + }, + "logs": { + "default_log_level": "ERROR", + "default_log_level_duration": "86400000", + "current_log_level": "", + "log_level_duration": 0, + "log_level_start_timestamp": 0, + "log_level_stop_timestamp": 0, + "update_dcs_flag": 0 + } +} \ No newline at end of file Index: config/config_integration.json =================================================================== diff -u --- config/config_integration.json (revision 0) +++ config/config_integration.json (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,29 @@ +{ + "kebormed_paas": { + "idp_client_secret": "mock-client-secret", + "url_mft": "http://mock-drt:9090", + "url_dcs": "http://mock-dcs:8080", + "url_device_identity": 
"http://mock-dcs:8080/auth/realms/Main/protocol/openid-connect/token", + "url_reachability": "http://mock-dcs:8080/health", + "dia_org_id": 1 + }, + "device": { + "ip": "172.18.0.3", + "port": 5000, + "name": "HD_TEST_1770675061356", + "hd_serial": "HD_TEST_1770675061356", + "dg_serial": "DG_TEST_1770675061356", + "sw_version": "0.5.0_test", + "mode": "registration", + "device_state": 4 + }, + "logs": { + "default_log_level": "DEBUG", + "default_log_level_duration": "86400000", + "current_log_level": "DEBUG", + "log_level_duration": 0, + "log_level_start_timestamp": 0, + "log_level_stop_timestamp": 0, + "update_dcs_flag": 0 + } +} \ No newline at end of file Index: config/config_quality.json =================================================================== diff -u --- config/config_quality.json (revision 0) +++ config/config_quality.json (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,29 @@ +{ + "kebormed_paas": { + "idp_client_secret": "2F3cOmEvAe9NuTr63DaHwSXtIl0Njtx0", + "url_mft": "", + "url_dcs": "https://device-api.diality.qa.kebormed.com", + "url_device_identity": "https://device-identity.diality.qa.kebormed.com/auth/realms/Main/protocol/openid-connect/token", + "url_reachability": "https://healthcheck.diality.qa.kebormed.com/", + "dia_org_id": 1 + }, + "device": { + "ip": "", + "port": 80, + "name": "", + "hd_serial": "", + "dg_serial": "", + "sw_version": "", + "mode": "registration", + "device_state": "INACTIVE_NOT_OK" + }, + "logs": { + "default_log_level": "ERROR", + "default_log_level_duration": "86400000", + "current_log_level": "", + "log_level_duration": 0, + "log_level_start_timestamp": 0, + "log_level_stop_timestamp": 0, + "update_dcs_flag": 0 + } +} Index: config/log_upload_template.json =================================================================== diff -u --- config/log_upload_template.json (revision 0) +++ config/log_upload_template.json (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,33 @@ +{ + 
"start_session": { + "generatedAt": 0, + "dataType": "", + "serialNumber": "", + "macAddress": "", + "organizationId": "", + "reference": "", + "metadata": { + "dataType" : "", + "deviceLogType": "", + "deviceSubType": "", + "deviceFileName": "", + "startTimestamp": "", + "endTimestamp": "" + } + }, + "upload_chunk": { + "sessionId": "", + "chunkNo": 0, + "chunkType": "", + "data": "" + }, + "end_session": { + "sessionId": "", + "checksum": "", + "completedAt": 0 + }, + "general": { + "file_size": 0, + "file_path": "" + } +} \ No newline at end of file Index: config/treatment_report_template.json =================================================================== diff -u --- config/treatment_report_template.json (revision 0) +++ config/treatment_report_template.json (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,69 @@ +{ + "organizationId": "", + "organizationUserId": "", + "serialNumber": "", + "checksum": "", + "reference": "", + "data": { + "patient": { + "id": "" + }, + "treatment": { + "treatmentCode": "", + "parameters": { + "bloodFlowRate": 0, + "dialysateFlowRate": 0, + "treatmentDuration": 0, + "dialysate": { + "acidCode": "", + "bicarbCode": "", + "K": 0, + "Ca": 0, + "HCO3": 0, + "Na": 0 + }, + "dialysateTemp": 0, + "dialyzerModel": "", + "heparinType": "", + "heparinConcentration": "", + "heparinBolus": "", + "heparinRate": "", + "heparinStopBeforeTreatmentEnd": "" + }, + "time": { + "start": 0, + "end": 0, + "treatmentDuration": 0, + "targetWeight": "" + }, + "data": [], + "events": [], + "alarms": [], + "treatmentPreparation":{ + "totalChlorine":"", + "ph":"", + "conductivity":"" + } + }, + "deviceTreatmentData": { + "dialysateVolumeUsed": 0, + "prescribedUltrafiltrationVolume": 0, + "finalTargetUltrafiltrationVolume": 0, + "actualUltrafiltrationVolume": 0, + "prescribedUltrafiltrationRate": 0, + "finalTargetUltrafiltrationRate": 0, + "actualUltrafiltrationRate": 0, + "salineBolusVolumeGiven": 0, + "heparinDelivered": "" + }, + 
"diagnostics": { + "waterSampleTestResult": "", + "machineWipedDown": "", + "filterLife": "", + "lastChemicalDisinfection": "", + "lastHeatDisinfection": "" + } + }, + "completedAt": 0, + "generatedAt": 0 +} \ No newline at end of file Index: handlers/cs_mft_dcs_request_handler.py =================================================================== diff -u --- handlers/cs_mft_dcs_request_handler.py (revision 0) +++ handlers/cs_mft_dcs_request_handler.py (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,559 @@ +"""Handler of network requests between CloudSync, Device Registration Tool and Diality Cloud System""" + +from time import time + +from logging import Logger +from threading import Event, Thread +from collections import deque +from random import seed, randint + +from cloudsync.handlers.network_request import NetworkRequest +from cloudsync.handlers.error_handler import ErrorHandler +from cloudsync.handlers.error import Error +from cloudsync.utils.helpers import log_func +from cloudsync.utils.logging import LoggingConfig +from cloudsync.utils.globals import * +from cloudsync.common.enums import * +from cloudsync.handlers.outgoing.handler_cs_to_mft import * +from cloudsync.handlers.outgoing.handler_cs_to_dcs import * +from cloudsync.handlers.incoming.handler_mft_to_cs import * + +class NetworkRequestHandler: + def __init__(self, logger: Logger, max_size, output_channel, reachability_provider, error_handler): + self.logger = logger + self.logconf = LoggingConfig() + self.reachability_provider = reachability_provider + self.logger.info('Created Network Request Handler') + self.output_channel = output_channel + self.error_handler = error_handler + self.queue = deque(maxlen=max_size) # Thread safe + self.thread = Thread(target=self.scheduler, daemon=True) + self.event = Event() + self.thread.start() + + def scheduler(self) -> None: + """ + Continuously monitors the event flag to check for new requests + :return: None + """ + while True: + flag = 
self.event.wait() + if flag: + req = self.queue.popleft() + self.handle_request(req) + if len(self.queue) == 0: + self.event.clear() + + def enqueue_request(self, req: NetworkRequest) -> bool: + """ + :param req: (Tuple) the data to add to the queue + :return: True upon success, False otherwise + """ + if len(self.queue) < self.queue.maxlen: + self.queue.append(req) + self.event.set() + return True + else: + return False + + def handle_request(self, req: NetworkRequest) -> None: + """ + Called on each timer event. Logs the data currently in deque + :return: None + """ + + # REGISTRATION MODE + if req.request_type == NetworkRequestType.CS2MFT_REQ_REGISTRATION: + try: + response = cmd_outgoing_register(device_name=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_NAME], + hd_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL], + dg_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL], + sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], + manf_tool_base_url=req.g_config[CONFIG_KEBORMED][ + CONFIG_KEBORMED_MFT_URL], + data_source_id=req.g_config[CONFIG_KEBORMED][ + CONFIG_KEBORMED_DIA_ORG_ID], + headers=req.headers, + error_handler=self.error_handler) + self.logger.debug("DRT Request registration resp: {0}".format(response)) + except Exception as e: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, + str(e)) + self.error_handler.enqueue_error(error=error) + elif req.request_type == NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS: + try: + certificate = req.payload.get("certificate") + private_key = req.payload.get("private_key") + public_key = req.payload.get("public_key") + cmd_incoming_set_credentials(certificate=certificate, + private_key=private_key, + public_key=public_key, + output_channel=self.output_channel, + error_handler=self.error_handler) + except Exception as e: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value, + str(e)) + 
self.error_handler.enqueue_error(error=error) + elif req.request_type == NetworkRequestType.MFT2CS_REQ_INIT_CONNECTIVITY_TEST: + try: + url_validate = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] + url_validate = urllib.parse.urljoin(url_validate, "api/device/validate") + cmd_incoming_initiate_connectivity_test( + url_token=req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL], + url_validate=url_validate, + hd_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL], + dg_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL], + sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], + g_config=req.g_config, + error_handler=self.error_handler + ) + except Exception as e: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, + str(e)) + self.error_handler.enqueue_error(error=error) + elif req.request_type == NetworkRequestType.MFT2CS_REQ_FACTORY_RESET: + try: + req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] = 'operation' + helpers_write_config(None, CONFIG_PATH, req.g_config) + helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, req.g_config) + cmd_incoming_factory_reset(output_channel=self.output_channel) + except Exception as e: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_FACTORY_RESET_ERROR.value, + str(e)) + self.error_handler.enqueue_error(error=error) + req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] = 'registration' + helpers_write_config(None, CONFIG_PATH, req.g_config) + + + # OPERATION MODE + elif req.request_type == NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE: + try: + device_state_json = req.payload # empty {} + device_state = req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_STATE] + base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] + identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] + client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] + 
token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION) + device_state_url = urllib.parse.urljoin(base_url, "/api/device") + + access_token = self.get_valid_token(identity_url=identity_url, + token_verification_url=token_verification_url, + client_secret=client_secret) + + if access_token is not None: + + # Decide whether to inform DCS for the new log level + if helpers_should_update_dcs_log_level(req.g_config): + device_state_json['logLevel'] = self.logconf.get_log_level()['text_value'].lower() + self.logconf.set_dcs_flag(0) # Reset the update_dcs_flag + + device_state_json['state'] = DeviceStates.mapped_int_value(device_state).value + + device_state_json = json.dumps(device_state_json) + + response = cmd_outgoing_set_device_state(url=device_state_url, + access_token=access_token, + device_state_json=device_state_json, + error_handler=self.error_handler) + + if response is None: + return + + if response.status_code == 200: + self.logger.debug("CS set Device state: {0}".format(response.json())) + + if helpers_should_update_cs_log_level(response.json(), req.g_config): + new_log_level = response.json()['logLevel'] + if new_log_level != "error": + # Set the DCS flag + self.logconf.set_dcs_flag(1) + # Calculate the duration of the new log level + log_level_duration = helpers_calculate_log_level_duration(response.json()) + log_level_duration = log_level_duration if log_level_duration else req.g_config[CONFIG_DEVICE][CONFIG_LOGS_LOG_LEVEL_DURATION] + self.logconf.set_log_duration(log_level_duration) + # Update the log level + self.logconf.set_log_level(new_log_level) + # Start the timer + self.logconf.start_countdown() + else: + self.logconf.revert_to_error() + + self.logger.debug("DCS Request device state response code: {0} & full response: {1}".format(response.status_code, response.text)) + + if response.status_code == UNASSIGNED: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, + 
"Invalid device state transition") + self.error_handler.enqueue_error(error=error) + else: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, + "Missing access token") + self.error_handler.enqueue_error(error=error) + except Exception as e: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, + str(e)) + self.error_handler.enqueue_error(error=error) + elif req.request_type == NetworkRequestType.CS2DCS_REQ_SEND_TREATMENT_REPORT: + try: + treatment_log_json = req.payload + patient_emr_id = treatment_log_json['data']['patient']['id'] + treatment_id = treatment_log_json['data']['treatment']['treatmentCode'] + + base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] + identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] + client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] + token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION) + validate_url = urllib.parse.urljoin(base_url, "api/device/validate") + data_submission_url = urllib.parse.urljoin(base_url, "/api/device/data") + + access_token = self.get_valid_token(identity_url=identity_url, + token_verification_url=token_verification_url, + client_secret=client_secret) + + if access_token is None: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, + "Missing access token") + self.error_handler.enqueue_error(error=error) + return + + # Step #1 - get organization id for current device + + response = cmd_outgoing_validate_device(access_token=access_token, + hd_serial_number=req.g_config[CONFIG_DEVICE][ + CONFIG_DEVICE_HD_SERIAL], + dg_serial_number=req.g_config[CONFIG_DEVICE][ + CONFIG_DEVICE_DG_SERIAL], + sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], + url=validate_url, + error_handler=self.error_handler) + + if response is None: + return + + 
invalid_attributes = response.get("invalidAttributes", None) + + if len(invalid_attributes) > 0: + self.logger.error( + "Device with HD serial number {0}, DG serial number {1}, software version {2} is not a valid " + "device for treatment. Invalid fields: {3}".format( + req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL], + req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL], + req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], + invalid_attributes)) + return response + else: + organization_id = response.get("associatedOrganizationId", None) + treatment_log_json['organizationId'] = organization_id + + # Step #2 - Set patient by patient_emr_id + + patient_with_emr_id_exists_url = urllib.parse.urljoin(base_url, + "api/device/organization/{0}/patient/{1}".format( + organization_id, patient_emr_id)) + create_temporary_patient_url = urllib.parse.urljoin(base_url, + "api/device/organization/{0}/patient/{1}".format( + organization_id, patient_emr_id)) + + # Step #2a - Check if patient exists by patient_emr_id + + response = cmd_outgoing_check_if_patient_with_emr_id_exists(access_token=access_token, + url=patient_with_emr_id_exists_url, + error_handler=self.error_handler) + + # Step #2b - If patient with emr_id doesn't exist, create temporary patient + + if response is None: + return + + if response.status_code == OK: + patient_id = response.json().get("id", None) + elif response.status_code == NOT_FOUND: + response = cmd_outgoing_create_temporary_patient(access_token=access_token, + url=create_temporary_patient_url, + error_handler=self.error_handler) + if response is None: + return + patient_id = response.get("id", None) + else: + g_utils.logger.warning("Patient didn't exist and a temporary patient couldn't be created") + return response + + patient_device_association_url = urllib.parse.urljoin(base_url, + "/api/device/organization/{0}/patient/{1}/association".format( + organization_id, patient_id)) + # Step #3 - Check if device is associated to patient and if not 
associate it + + response = cmd_outgoing_set_patient_device_association(url=patient_device_association_url, + access_token=access_token, + associate=True, + error_handler=self.error_handler) + # Step #4 - Send treatment report + + association_time = int(round(time() * S_MS_CONVERSION_FACTOR)) + # if response.status_code == SUCCESS: + # association_time = int(response.headers.get('X-Timestamp')) + # generated_at_time = association_time + 1000 + # completed_at_time = association_time + 2000 + # else: + # TEMPORARY DELAY TO COMPENSATE FOR DEVICE INTERNAL CLOCK ERROR - TODO REMOVE AFTER FIX + generated_at_time = association_time + 301000 + # TEMPORARY DELAY TO COMPENSATE FOR DEVICE INTERNAL CLOCK ERROR - TODO REMOVE AFTER FIX + completed_at_time = association_time + 302000 + + treatment_log_json['organizationUserId'] = patient_id + + treatment_log_json['generatedAt'] = generated_at_time + treatment_log_json['completedAt'] = completed_at_time + + treatment_log_json = json.dumps(treatment_log_json) + + self.logger.debug("treatment log: {0}".format(treatment_log_json)) + + self.logger.info("Sending treatment report to DCS") + response = cmd_outgoing_send_treatment_report(url=data_submission_url, + access_token=access_token, + treatment_log=treatment_log_json, + error_handler=self.error_handler) + + if response is None: + return + + self.logger.info(f"Treatment upload response: {response.status_code}") + if response.status_code != OK: + self.logger.error(f"Treatment upload failed: {response.status_code} - {response.text[:500]}") + self.logger.debug( + "Treatment upload response body: {0}".format(response.json())) + + if response.status_code == OK: + # Send TX code to UI app + message_body = str( + OutboundMessageIDs.CS2UI_REQ_TX_CODE_DISPLAY.value) + ',1,' + treatment_id + self.output_channel.enqueue_message(message_body) + else: + g_utils.logger.warning("Treatment {0} upload failed: {1}".format( + treatment_id, response.json())) + + # treatment_id = 
response.json().get("reference", None) + + # Step #5 - Remove patient/device association + + response = cmd_outgoing_set_patient_device_association(url=patient_device_association_url, + access_token=access_token, + associate=False, + error_handler=self.error_handler) + + except Exception as e: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, + str(e)) + self.error_handler.enqueue_error(error=error) + elif req.request_type == NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG: + try: + device_log_data = req.payload + base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] + identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] + client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] + token_verification_url = urllib.parse.urljoin( + base_url, DEVICE_TOKEN_VALIDATION) + validate_url = urllib.parse.urljoin( + base_url, "api/device/validate") + + # Step #1 - get access token + + access_token = self.get_valid_token(identity_url=identity_url, + token_verification_url=token_verification_url, + client_secret=client_secret) + + if access_token is not None: + + # Step #2 - get organization id for current device + + response = cmd_outgoing_validate_device(access_token=access_token, + hd_serial_number=req.g_config[CONFIG_DEVICE][ + CONFIG_DEVICE_HD_SERIAL], + dg_serial_number=req.g_config[CONFIG_DEVICE][ + CONFIG_DEVICE_DG_SERIAL], + sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], + url=validate_url, + error_handler=self.error_handler) + + if response is None: + return + + invalid_attributes = response.get("invalidAttributes", None) + + if len(invalid_attributes) > 0: + self.logger.error( + "Device with HD serial number {0}, DG serial number {1}, software version {2} is not a valid " + "device for log upload. 
Invalid fields: {3}".format( + req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL], + req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL], + req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], + invalid_attributes)) + return response + else: + organization_id = response.get("associatedOrganizationId", None) + device_log_data['organizationId'] = organization_id + + # Step #3 - upload the device log file in chunks + + device_log_json = helpers_construct_device_log_json(device_log_data) + + upload_result = cmd_outgoing_upload_file_in_chunks(base_url=base_url, + access_token=access_token, + file_json=device_log_json, + error_handler=self.error_handler, + log_file_origin='device', + token_refresher=lambda: self.get_valid_token( + identity_url=identity_url, + token_verification_url=token_verification_url, + client_secret=client_secret)) + + if isinstance(upload_result, dict) and not upload_result.get("accepted"): + # Rejection (e.g. 409 duplicate) — send 2010 with 3 params: filename, 0, reason_code + message_body = "{0},3,{1},0,{2}".format( + OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, + upload_result["filename"], + upload_result["reason_code"]) + self.output_channel.enqueue_message(message_body) + elif isinstance(upload_result, str): + # Success — send 2010 with 2 params: filename, 1 + self.logger.debug("Device log file uploaded: {upload_result}") + message_body = "{0},2,{1},1".format( + OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value, + upload_result) + self.output_channel.enqueue_message(message_body) + else: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + "Missing access token") + self.error_handler.enqueue_error(error=error) + + except Exception as e: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + str(e)) + self.error_handler.enqueue_error(error=error) + elif req.request_type == NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG: + try: + cs_log_data = 
req.payload + base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL] + identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL] + client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] + token_verification_url = urllib.parse.urljoin( + base_url, DEVICE_TOKEN_VALIDATION) + validate_url = urllib.parse.urljoin( + base_url, "api/device/validate") + + # Step #1 - get access token + + access_token = self.get_valid_token(identity_url=identity_url, + token_verification_url=token_verification_url, + client_secret=client_secret) + + if access_token is not None: + + # Step #2 - get organization id for current device + + response = cmd_outgoing_validate_device(access_token=access_token, + hd_serial_number=req.g_config[CONFIG_DEVICE][ + CONFIG_DEVICE_HD_SERIAL], + dg_serial_number=req.g_config[CONFIG_DEVICE][ + CONFIG_DEVICE_DG_SERIAL], + sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION], + url=validate_url, + error_handler=self.error_handler) + + if response is None: + return + + invalid_attributes = response.get("invalidAttributes", None) + + if len(invalid_attributes) > 0: + self.logger.error( + "Device with HD serial number {0}, DG serial number {1}, software version {2} is not a valid " + "device for log upload. 
def get_valid_token(self, identity_url, token_verification_url, client_secret):
    """
    Return a usable access token: reuse the stored one when it still
    verifies against DCS, otherwise request a fresh token with the
    device certificate.

    :param identity_url: Identity Provider URL used to request a new token
    :param token_verification_url: URL used to verify a stored token
    :param client_secret: Identity Provider client secret
    :return: the access token (may be None when the token request fails)
    """

    def _fresh_token():
        # Single place that performs the certificate-based token request.
        return cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509,
                                                    path_private_key=CREDENTIALS_PRIVATE_KEY,
                                                    save=True,
                                                    url=identity_url,
                                                    client_secret=client_secret,
                                                    error_handler=self.error_handler)

    stored_token = helpers_get_stored_token()

    if stored_token is None:
        self.logger.info("No stored token found, requesting new token")
        return _fresh_token()

    response = cmd_outgoing_verify_token(url=token_verification_url,
                                         access_token=stored_token,
                                         error_handler=self.error_handler)

    if response is not None and response.status_code == OK:
        self.logger.info("Token verification succeeded")
        return stored_token

    # Verification failed outright (no response) or returned a non-OK status.
    self.logger.warning(f"Token verification failed (status={response.status_code if response else 'None'}), refreshing token")
    return _fresh_token()
class Error:
    """
    A single error event exchanged between the CS app and the UI.

    The wire format is a comma separated string:
        <message ID>,<parameter count>,<error code>[,<param>,...]
    A body with fewer than three components yields a sentinel "all zeroes"
    error instead of raising, so malformed input never breaks the error
    pipeline.
    """

    def __init__(self, error_body: str):
        """
        :param error_body: comma separated error description (see class doc)
        """
        components = error_body.split(',')

        if len(components) >= 3:
            # Time this Error object was created, not of the underlying failure.
            self.timestamp = datetime.datetime.now().timestamp()
            self.ID = components[0]
            self.size = components[1]
            self.code = components[2]
            # FIX: the declared parameter count may be non-numeric on
            # malformed input; treat that like "no extra parameters" instead
            # of letting int() raise ValueError out of a constructor that is
            # otherwise deliberately defensive.
            try:
                declared_size = int(components[1])
            except ValueError:
                declared_size = 0
            self.parameters = components[3:] if declared_size > 1 else []
        else:
            # Sentinel values for an unparseable body (kept as strings for
            # backward compatibility with existing consumers).
            self.timestamp = "0"
            self.ID = "0"
            self.size = "0"
            self.code = "0"
            self.parameters = []

    def __str__(self):
        data = {
            "timestamp": self.timestamp,
            "ID": self.ID,
            "size": self.size,
            "code": self.code,
            "parameters": self.parameters
        }
        return str(data)

    @classmethod
    def timeout(cls, message_id, error_id, detail="Registration timeout"):
        """Error for a request that timed out."""
        return cls("{0},2,{1},{2}".format(message_id, error_id, detail))

    @classmethod
    def redirect(cls, message_id, error_id):
        """Error for a request that hit too many redirects."""
        return cls("{0},2,{1},Too many redirects".format(message_id, error_id))

    @classmethod
    def general(cls, message_id, error_id, detail):
        """Generic error with a free-form detail string."""
        return cls("{0},2,{1},{2}".format(message_id, error_id, detail))

    @classmethod
    def validation(cls, message_id, error_id, status_code, reason, detail):
        """Error for an HTTP response that failed validation/parsing."""
        return cls("{0},3,{1},{2}:{3},Exception: {4}".format(
            message_id, error_id, status_code, reason, detail))

    @classmethod
    def file_not_found(cls, message_id, error_id):
        """Error for a missing UI log file."""
        return cls("{0},2,{1},UI logfile not found".format(message_id, error_id))

    @classmethod
    def crc_mismatch(cls, message_id, error_id, sequence, expected_crc, actual_crc):
        """Error for a message whose CRC check failed."""
        return cls("{0},4,{1},Bad CRC on message {2},Message CRC: {3},Calculated CRC: {4}".format(
            message_id, error_id, sequence, expected_crc, actual_crc))
class ErrorHandler:
    """
    Collects Error objects on a bounded queue and forwards them, on a
    background daemon thread, to the CS2UI output channel.
    """

    def __init__(self, logger: Logger, max_size, output_channel):
        """
        :param logger: application logger
        :param max_size: maximum number of queued errors
        :param output_channel: channel used to send CS2UI error messages
        """
        self.logger = logger
        self.output_channel = output_channel
        self.queue = deque(maxlen=max_size)  # deque append/popleft are thread safe
        self.thread = Thread(target=self.scheduler, daemon=True)
        self.event = Event()
        self.thread.start()

        self.logger.info('Created Error Handler')

    def scheduler(self) -> None:
        """
        Continuously monitors the event flag to check for new errors
        :return: None
        """
        while True:
            self.event.wait()  # wait() without a timeout always returns True
            # FIX: clear BEFORE draining. Previously the flag was cleared
            # after the drain loop, so an error enqueued while draining could
            # have its wakeup lost and sit in the queue until a later
            # enqueue_error() call set the flag again.
            self.event.clear()
            while len(self.queue) > 0:
                self.handle_error(self.queue.popleft())

    def enqueue_error(self, error: Error) -> bool:
        """
        :param error: the error to add to the queue
        :return: True upon success, False when the queue is full
        """
        if len(self.queue) < self.queue.maxlen:
            self.queue.append(error)
            self.event.set()
            return True
        return False

    def handle_error(self, error: Error):
        """Log the error and, for CS-originated errors, forward it to the UI."""
        if InboundMessageIDs.mapped_str_value(error.ID) == InboundMessageIDs.UI2CS_ERROR:
            self.logger.error('UI App Error {0}: {1}'.format(error.code, error.parameters))
            return  # TODO Add specific UI error message handling when necessary

        if OutboundMessageIDs.mapped_str_value(error.ID) == OutboundMessageIDs.CS2UI_ERROR:
            self.logger.error('CS App Error {0} - {1}: {2}'.format(error.code,
                                                                   ErrorIDs.mapped_str_value(error.code),
                                                                   error.parameters))
            # Re-assemble the wire message: ID, size, code, then parameters
            # (single join instead of repeated string concatenation).
            message_body = ','.join([str(OutboundMessageIDs.CS2UI_ERROR.value), error.size, error.code]
                                    + list(error.parameters))
            self.output_channel.enqueue_message(message_body)
@log_func
def cmd_incoming_set_credentials(certificate: str,
                                 private_key: str,
                                 public_key: str,
                                 output_channel,
                                 error_handler: ErrorHandler) -> None:
    """
    Sets the credentials on the device
    :param certificate: The X.509 certificate
    :param private_key: The private key
    :param public_key: The public key
    :param output_channel: CS_UI output channel
    :param error_handler: global error handler
    :return: None
    """
    try:
        # exist_ok avoids the check-then-create race of the previous
        # os.path.exists()/os.makedirs() pair.
        os.makedirs(CREDENTIALS_PATH, exist_ok=True)

        if not check_writable(CREDENTIALS_PATH):
            raise IOError("Credentials path is not writable: {}".format(CREDENTIALS_PATH))
        if not check_disk_space_mb(CREDENTIALS_PATH, required_mb=1):
            raise IOError("Insufficient disk space at: {}".format(CREDENTIALS_PATH))

        with open(CREDENTIALS_CERTIFICATE_X509, 'w') as f:
            f.write(certificate)

        with open(CREDENTIALS_PRIVATE_KEY, 'w') as f:
            f.write(private_key)

        with open(CREDENTIALS_PUBLIC_KEY, 'w') as f:
            f.write(public_key)

        # Confirm to the UI where each credential was stored.
        message_body = str(
            OutboundMessageIDs.CS2UI_REQ_SAVE_CREDENTIALS.value) + ',3,' + CREDENTIALS_CERTIFICATE_X509 + ',' + CREDENTIALS_PRIVATE_KEY + ',' + CREDENTIALS_PUBLIC_KEY
        output_channel.enqueue_message(message_body)

    except IOError as er:
        error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                              ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value,
                              "Error writing device credentials")
        error_handler.enqueue_error(error=error)
        # FIX: er.args may contain non-string members (e.g. an errno int),
        # so ' '.join(er.args) could raise TypeError and mask the original
        # failure; str(er) is always safe.
        g_utils.logger.error('Error writing device credentials: {0}'.format(er))
@log_func
def cmd_incoming_factory_reset(output_channel) -> None:
    """
    Initiates a factory reset on the device
    :param output_channel: CS2UI output channel
    :return: None
    """
    # The factory-reset request carries no parameters, hence the ",0" count.
    message_body = "{0},0".format(OutboundMessageIDs.CS2UI_REQ_FACTORY_RESET.value)
    output_channel.enqueue_message(message_body)
class CustomTimedRotatingFileHandlerHandler(TimedRotatingFileHandler):
    """
    Handler for logging to a file, rotating the log file and uploading it to cloud at certain timed intervals.
    It extends the official TimedRotatingFileHandler to add the upload functionality.
    """

    def __init__(self, filename, prelogger: Logger, *args, **kwargs):
        """
        :param filename: path of the active log file
        :param prelogger: bootstrap logger usable before the real one exists
        """
        super().__init__(filename, *args, **kwargs)
        # Providers are injected later via the set_* methods below.
        self.network_request_handler = None
        self.error_handler = None
        self.g_config = None
        prelogger.info("CUSTOM LOG HANDLER INITIATED")

    def doRollover(self):
        """Rotate as usual, then queue the rotated files for upload."""
        super().doRollover()
        self.__select_files_to_upload()

    def set_network_provider(self, network_request_handler):
        self.network_request_handler = network_request_handler

    def set_error_provider(self, error_handler):
        self.error_handler = error_handler

    def set_configuration(self, g_config):
        self.g_config = g_config

    # FIX: these are instance methods, so the receiver is named `self`
    # (it was misleadingly named `cls` before).
    def __select_files_to_upload(self):
        """
        Checks for existing rotated files in the directory (excluding the newly created one) and uploads them.
        """
        # We use self.baseFilename because, per the stdlib documentation,
        # the filename passed in could be a path object (see Issue #27493),
        # but self.baseFilename will be a string.
        log_dir = os.path.dirname(self.baseFilename)
        base_name = os.path.basename(self.baseFilename)

        # Rotated files share the base name plus a rotation suffix; the
        # active file itself is excluded.
        for entry in os.listdir(log_dir):
            if entry.startswith(base_name) and entry != base_name:
                self.__upload_cs_log_file(os.path.join(log_dir, entry))

    def __upload_cs_log_file(self, log_file_path):
        """Queue one rotated CS log file for upload to the cloud."""
        cs_log_data = {
            "path": log_file_path,
            "serialNumber": self.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
            "checksum": helpers_sha256_checksum(log_file_path)
        }

        # FIX: the f-prefix was missing, so the literal placeholder text
        # was logged instead of the data.
        g_utils.logger.debug(f"CS log data: {cs_log_data}")

        try:
            helpers_add_to_network_queue(network_request_handler=self.network_request_handler,
                                         request_type=NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG,
                                         url='',
                                         payload=cs_log_data,
                                         method='',
                                         g_config=self.g_config,
                                         success_message='CS2DCS_REQ_SEND_CS_LOG request added to network queue')
        except Exception as e:
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_LOG_ERROR.value,
                                  str(e))
            self.error_handler.enqueue_error(error=error)
class NetworkRequest:
    """One queued outbound HTTP request plus the context needed to send it."""

    def __init__(self, request_type: NetworkRequestType, url: str, payload: dict, method: str, g_config: dict):
        """
        :param request_type: the kind of request (see NetworkRequestType)
        :param url: destination URL
        :param payload: request body
        :param method: HTTP method name
        :param g_config: the system configuration object
        """
        self.request_type = request_type
        self.url = url
        self.payload = payload
        self.method = method
        self.g_config = g_config

        # Device identification headers attached to every request.
        device_section = self.g_config[CONFIG_DEVICE]
        self.headers = {
            "device_ip": device_section[CONFIG_DEVICE_IP],
            "device_sn": device_section[CONFIG_DEVICE_HD_SERIAL]
        }

    def __str__(self):
        return str({
            "request_type": self.request_type.name,
            "url": self.url,
            "headers": self.headers,
            "payload": self.payload,
            "method": self.method,
        })
@log_func
def cmd_outgoing_get_new_token_with_cert(path_certificate: str,
                                         path_private_key: str,
                                         save: bool,
                                         url: str,
                                         client_secret: str,
                                         error_handler: ErrorHandler) -> str:
    """
    Obtains authentication token with device certificate & private key

    :param path_certificate: The path to the certificate
    :param path_private_key: The path to the private key
    :param save: If True, save the token
    :param url: Identity Provider URL used to request a new token
    :param client_secret: Identity Provider client secret for CS app
    :param error_handler: global error handler
    :return: The new token, or None on any failure
    """
    try:
        form_payload = {
            "grant_type": "password",
            "scope": "openid profile",
            "client_id": "device-client",
            "client_secret": client_secret,
        }
        form_headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'User-Agent': USER_AGENT,
            "X-Api-Version": API_VERSION
        }
        cert_paths = (path_certificate, path_private_key)

        g_utils.logger.debug("Making request: {0}, {1}, {2}, {3}".format(url, form_headers, form_payload, cert_paths))

        response = requests.post(url=url,
                                 headers=form_headers,
                                 data=form_payload,
                                 cert=cert_paths,
                                 timeout=5)
        data = response.json()
        g_utils.logger.debug("Keycloak response: {0}".format(data))

        if save:
            # Cache the raw IdP response so a later run can reuse the token.
            if not os.path.exists(TOKEN_CACHING_PATH):
                os.makedirs(TOKEN_CACHING_PATH)
            with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'w') as f:
                f.write(json.dumps(data, indent=4))

        return data.get("access_token", None)
    except requests.exceptions.Timeout:
        failure = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value,
                                "Obtain token request timeout")
    except requests.exceptions.TooManyRedirects:
        failure = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value,
                                 ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value)
    except Exception as e:
        failure = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value,
                                str(e))
    # All failure paths converge here: report the error and signal "no token".
    error_handler.enqueue_error(error=failure)
    return None
@log_func
def cmd_outgoing_verify_token(url: str,
                              access_token: str,
                              error_handler: ErrorHandler) -> requests.Response:
    """
    Verify a stored access token by posting a placeholder validation payload.

    :param url: token verification endpoint
    :param access_token: token to verify
    :param error_handler: global error handler
    :return: the raw response, or None when the request itself failed
    """
    try:
        request_headers = {
            'Authorization': BEARER_HOLDER.format(access_token),
            'Content-Type': CONTENT_TYPE,
            "X-OrganizationId": '1',
            'User-Agent': USER_AGENT,
            "X-Api-Version": API_VERSION
        }

        # Placeholder body: only the token in the Authorization header is
        # actually being checked here.
        dummy_body = {
            "hdSerialNumber": "token-validation",
            "dgSerialNumber": "token-validation",
            "softwareVersion": "token-validation"
        }

        resp = requests.post(url=url,
                             data=json.dumps(dummy_body),
                             headers=request_headers,
                             timeout=(30, 60))
        g_utils.logger.info(f"Token verification response: {resp.status_code}")
        return resp
    except requests.exceptions.Timeout:
        failure = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_VERIFY_TOKEN_ERROR.value)
    except requests.exceptions.TooManyRedirects:
        failure = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value,
                                 ErrorIDs.CS_VERIFY_TOKEN_ERROR.value)
    except Exception as e:
        failure = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_VERIFY_TOKEN_ERROR.value,
                                str(e))
    error_handler.enqueue_error(error=failure)
    return None
@log_func
def cmd_outgoing_validate_device(access_token: str,
                                 hd_serial_number: str,
                                 dg_serial_number: str,
                                 sw_version: str,
                                 url: str,
                                 error_handler: ErrorHandler) -> dict:
    """
    Step 8. Validate device
    Step 9. Validate device response (list of invalid fields)

    :param access_token: bearer token for the DCS API
    :param hd_serial_number: device HD serial number
    :param dg_serial_number: device DG serial number
    :param sw_version: device software version
    :param url: validation endpoint
    :param error_handler: global error handler
    :return: The json response, or None on any failure
    """
    # Phase 1: perform the HTTP request.
    try:
        request_headers = {
            'Authorization': BEARER_HOLDER.format(access_token),
            'Content-Type': CONTENT_TYPE,
            'User-Agent': USER_AGENT,
            "X-Api-Version": API_VERSION
        }
        device_identity = json.dumps({
            "hdSerialNumber": hd_serial_number,
            "dgSerialNumber": dg_serial_number,
            "softwareVersion": sw_version
        })
        response = requests.post(url=url,
                                 headers=request_headers,
                                 data=device_identity,
                                 timeout=(30, 60))
    except requests.exceptions.Timeout:
        error_handler.enqueue_error(error=Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value,
                                                        ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value))
        return None
    except requests.exceptions.TooManyRedirects:
        error_handler.enqueue_error(error=Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value,
                                                         ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value))
        return None
    except Exception as e:
        error_handler.enqueue_error(error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                                        ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value,
                                                        str(e)))
        return None

    # Phase 2: decode the body; a non-JSON answer is reported with its
    # HTTP status and reason for easier diagnosis.
    try:
        return response.json()
    except json.decoder.JSONDecodeError as e:
        error_handler.enqueue_error(error=Error.validation(OutboundMessageIDs.CS2UI_ERROR.value,
                                                           ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value,
                                                           response.status_code, response.reason, str(e)))
        return None
    except Exception as e:
        error_handler.enqueue_error(error=Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                                        ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value,
                                                        str(e)))
        return None
@log_func
def cmd_outgoing_set_device_state(url: str,
                                  access_token: str,
                                  device_state_json: dict,
                                  error_handler: ErrorHandler) -> requests.Response:
    """
    Updates the backend with the current device state
    :param url: set device state URL
    :param device_state_json: device state payload
    :param access_token: access token
    :param error_handler: global error handler
    :return: The response, or None when the request failed
    """
    try:
        return requests.put(url=url,
                            headers={
                                'Authorization': BEARER_HOLDER.format(access_token),
                                'Content-Type': CONTENT_TYPE,
                                'User-Agent': USER_AGENT,
                                "X-Api-Version": API_VERSION
                            },
                            data=device_state_json,
                            timeout=(30, 60))
    except requests.exceptions.Timeout:
        failure = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value)
    except requests.exceptions.TooManyRedirects:
        failure = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value,
                                 ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value)
    except Exception as e:
        failure = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value,
                                str(e))
    error_handler.enqueue_error(error=failure)
    return None
@log_func
def cmd_outgoing_check_if_patient_with_emr_id_exists(access_token: str,
                                                     url: str,
                                                     error_handler: ErrorHandler) -> requests.Response:
    """
    Asks DCS whether a patient with the given EMR id exists.

    :param access_token: bearer token for the DCS API
    :param url: lookup endpoint (presumably includes the EMR id — confirm with caller)
    :param error_handler: global error handler
    :return: The raw response, or None when the request failed
    """
    try:
        headers = {
            'Authorization': BEARER_HOLDER.format(access_token),
            'Content-Type': CONTENT_TYPE,
            'User-Agent': USER_AGENT,
            "X-Api-Version": API_VERSION
        }

        response = requests.get(url=url,
                                headers=headers,
                                timeout=(30, 60))
    except requests.exceptions.Timeout:
        error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value,
                              ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value)
        error_handler.enqueue_error(error=error)
        return None
    except requests.exceptions.TooManyRedirects:
        error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value,
                               ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value)
        error_handler.enqueue_error(error=error)
        return None
    except Exception as e:
        error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                              ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value,
                              str(e))
        error_handler.enqueue_error(error=error)
        return None

    # FIX: the response is returned unparsed; the former try/except around
    # this bare return could never fire (no .json() call can raise
    # JSONDecodeError here), so the dead code was removed.
    return response
@log_func
def cmd_outgoing_set_patient_device_association(url: str,
                                                access_token: str,
                                                associate: bool,
                                                error_handler: ErrorHandler) -> requests.Response:
    """
    Sets the status of the device & patient association
    :param url: set device state URL
    :param access_token: access token
    :param associate: status of the device & patient association
    :param error_handler: global error handler
    :return: The response
    """
    try:
        headers = {
            'Authorization': BEARER_HOLDER.format(access_token),
            'Content-Type': CONTENT_TYPE,
            'User-Agent': USER_AGENT,
            "X-Api-Version": API_VERSION
        }
        empty_body = {}

        if not associate:
            # Dissociation is a plain DELETE on the association resource.
            return requests.delete(url=url,
                                   headers=headers,
                                   data=empty_body,
                                   timeout=(30, 60))

        # NOTE(review): urljoin with a leading "/" replaces the entire path
        # of `url` rather than appending "/exists" — confirm this is the
        # intended endpoint.
        resp = requests.head(url=urllib.parse.urljoin(url, "/exists"),
                             headers=headers,
                             data=empty_body,
                             timeout=(30, 60))

        if resp.status_code == NOT_FOUND:
            # No association yet: create it.
            resp = requests.put(url=url,
                                headers=headers,
                                data=empty_body,
                                timeout=(30, 60))
        return resp
    except requests.exceptions.Timeout:
        failure = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value)
    except requests.exceptions.TooManyRedirects:
        failure = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value,
                                 ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value)
    except Exception as e:
        failure = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value,
                                str(e))
    error_handler.enqueue_error(error=failure)
    return None
@log_func
def cmd_outgoing_send_treatment_report(url: str,
                                       access_token: str,
                                       treatment_log: str,
                                       error_handler: ErrorHandler) -> requests.Response:
    """
    Sends a treatment report to DCS
    :param url: set device state URL
    :param access_token: access token
    :param treatment_log: treatment report sent to DCS
    :param error_handler: global error handler
    :return: The response, or None when the request failed
    """
    try:
        return requests.post(url=url,
                             headers={
                                 'Authorization': BEARER_HOLDER.format(access_token),
                                 'Content-Type': CONTENT_TYPE,
                                 'User-Agent': USER_AGENT,
                                 "X-Api-Version": API_VERSION
                             },
                             data=treatment_log,
                             timeout=(30, 60))
    except requests.exceptions.Timeout:
        failure = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value)
    except requests.exceptions.TooManyRedirects:
        failure = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value,
                                 ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value)
    except Exception as e:
        failure = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value,
                                str(e))
    error_handler.enqueue_error(error=failure)
    return None
+ + Returns: + str | None: The uploaded file name if succeeded, None otherwise. + """ + + origins = ("cs", "device") + if log_file_origin not in origins: + g_utils.logger.error(f"Wrong log file origin provided.") + return None + + ERROR_ID = ErrorIDs.CS_DEVICE_LOG_ERROR.value if log_file_origin == 'device' else ErrorIDs.CS_LOG_ERROR.value + + # + # Start upload session + # + + start_session_url = base_url.rstrip("/") + "/api/device/data/start-session" + start_session_payload = file_json['start_session'] + start_session_payload = json.dumps(start_session_payload) + headers = { + 'Authorization': BEARER_HOLDER.format(access_token), + 'Content-Type': CONTENT_TYPE, + 'User-Agent': USER_AGENT, + "X-Api-Version": API_VERSION + } + + g_utils.logger.info(f"Starting upload session for {log_file_origin} log") + g_utils.logger.debug(f"File upload payload (start-session): {start_session_payload}") + + try: + response = requests.post( + url=start_session_url, + headers=headers, + data=start_session_payload, + timeout=(30, 60)) + + g_utils.logger.info(f"Start-session response: {response.status_code}") + if response.status_code != 200: + g_utils.logger.error(f"Start-session failed: {response.status_code} - {response.text[:500]}") + raise Exception(f"Error while starting upload session: {response.status_code} - {response.text}") + + except Exception as e: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) + error_handler.enqueue_error(error=error) + return None + + session_id = response.json().get("sessionId") + if not session_id: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, "Missing session ID in response.") + error_handler.enqueue_error(error=error) + return None + + # + # Send file in chunks + # + + try: + target_file = file_json['general']["file_path"] + file_size = file_json['general']["file_size"] + upload_chunk_url = base_url.rstrip("/") + "/api/device/data/chunk" + upload_chunk_payload = file_json['upload_chunk'] + 
upload_chunk_payload['sessionId'] = session_id + upload_chunk_payload['chunkType'] = "device-data" + chunk_number = 1 + headers = { + 'Authorization': BEARER_HOLDER.format(access_token), + 'Content-Type': CONTENT_TYPE, + 'User-Agent': USER_AGENT, + "X-Api-Version": API_VERSION + } + + with open(target_file, "rb") as f: + file_content = f.read() + + # Encode the bytes using base64 + base64_string = base64.b64encode(file_content).decode("utf8") + + # Get the total size of the base64 string (in bytes) + total_size = len(base64_string) + + # Calculate the number of chunks + num_chunks = total_size // chunk_size + (total_size % chunk_size > 0) + + for i in range(num_chunks): + start_index = i * chunk_size + end_index = min(start_index + chunk_size, total_size) + chunk = base64_string[start_index:end_index] + + # Retry logic with counter and backoff time + retry_count = 0 + while retry_count < retries: + try: + if type(upload_chunk_payload) is str: + upload_chunk_payload = json.loads(upload_chunk_payload) + + upload_chunk_payload['chunkNo'] = chunk_number + upload_chunk_payload['data'] = chunk + upload_chunk_payload = json.dumps(upload_chunk_payload) + + + g_utils.logger.debug(f"File upload payload (upload-chunk) - chunk No {chunk_number}: {upload_chunk_payload}") + + response = requests.post(upload_chunk_url, + headers=headers, + data=upload_chunk_payload, + timeout=(30, 60)) + + if response.status_code == 200: + chunk_number += 1 + g_utils.logger.info(f"Uploaded chunk {chunk_number} of {num_chunks}") + break # Successful upload, break retry loop + + g_utils.logger.warning(f"Chunk {chunk_number}/{num_chunks} upload failed: {response.status_code} - {response.text[:500]}") + if retry_count < retries: + g_utils.logger.info(f"Retrying chunk upload in 5 seconds (attempt {retry_count + 1}/{retries})...") + sleep(5) # Implement backoff time between retries + retry_count += 1 + + except Exception as e: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, 
str(e)) + error_handler.enqueue_error(error=error) + + except Exception as e: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) + error_handler.enqueue_error(error=error) + return None + + # + # End upload session + # + + # Refresh token before end-session to prevent expiry after long chunk uploads + if token_refresher is not None: + g_utils.logger.info("Refreshing token before end-session phase") + refreshed_token = token_refresher() + if refreshed_token is not None: + access_token = refreshed_token + g_utils.logger.info("Token refreshed successfully before end-session") + else: + g_utils.logger.warning("Token refresh returned None, using existing token for end-session") + + end_session_url = base_url.rstrip("/") + "/api/device/data/end-session" + end_session_payload = file_json['end_session'] + end_session_payload['sessionId'] = session_id + end_session_payload['completedAt'] = int(datetime.now(timezone.utc).timestamp()*1000) + headers = { + 'Authorization': BEARER_HOLDER.format(access_token), + 'Content-Type': CONTENT_TYPE, + 'User-Agent': USER_AGENT, + "X-Api-Version": API_VERSION + } + try: + end_session_payload = json.dumps(end_session_payload) + g_utils.logger.info(f"Ending upload session (sessionId={session_id})") + g_utils.logger.debug(f"Device log upload payload (end-session): {end_session_payload}") + response = requests.post(end_session_url, + headers=headers, + data=end_session_payload, + timeout=(30, 60)) + + if response.status_code == CONFLICT: + device_file_name = str(file_json['start_session']['metadata']['deviceFileName']) + g_utils.logger.info(f"File {device_file_name} rejected as duplicate (409 Conflict).") + return {"accepted": False, "filename": device_file_name, "reason_code": LogUploadReasonCode.DUPLICATE.value} + + g_utils.logger.info(f"End-session response: {response.status_code}") + if response.status_code != 200: + g_utils.logger.error(f"End-session failed: {response.status_code} - {response.text[:500]}") + 
raise Exception(f"Error while ending upload session: {response.status_code} - {response.text}") + + except Exception as e: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) + error_handler.enqueue_error(error=error) + return None + + g_utils.logger.info(f"File {file_json['start_session']['metadata']['deviceFileName']} uploaded.") + return str(file_json['start_session']['metadata']['deviceFileName']) Index: handlers/outgoing/handler_cs_to_mft.py =================================================================== diff -u --- handlers/outgoing/handler_cs_to_mft.py (revision 0) +++ handlers/outgoing/handler_cs_to_mft.py (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,130 @@ +"""Handler of commands sent by CloudSync app to the Device Registration Tool""" + +import json +import requests +from time import time +import random +from typing import List +import urllib.parse + +from cloudsync.handlers.error_handler import ErrorHandler +from cloudsync.handlers.error import Error +from cloudsync.utils.globals import * +from cloudsync.common.enums import * +from cloudsync.utils.helpers import * + + +@log_func +def cmd_outgoing_register(device_name: str, + hd_serial: str, + dg_serial: str, + sw_version: str, + manf_tool_base_url: str, + data_source_id: int, + headers: dict, + error_handler: ErrorHandler) -> requests.Response: + """ + Initiates device registration with the device registration tool + Device -> Manufacturing Tool + + :param device_name: the device name + :param hd_serial: the hd serial number + :param dg_serial: the dg serial number + :param sw_version: the device software version + :param manf_tool_base_url: the manufacturing tool base url + :param data_source_id: the data source identification number + :param headers: the header dictionary + :param error_handler: global error handler + :return: requests.Response + """ + try: + url = urllib.parse.urljoin(manf_tool_base_url, "register/") + data = { + "datasourceId": 
@log_func
def cmd_outgoing_validation_result(invalid_attributes: List[str],
                                   hd_serial: str,
                                   g_config: dict,
                                   error_handler: "ErrorHandler"):
    """
    Sends the validation POST to the manf tool

    :param invalid_attributes: The list of invalid attributes
    :param hd_serial: The hd serial number
    :param g_config: The device config
    :param error_handler: global error handler
    :return: The json response, or None on error
    """

    if (CONFIG_KEBORMED not in g_config.keys() or
            CONFIG_KEBORMED_MFT_URL not in g_config[CONFIG_KEBORMED]):
        error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                              ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value,
                              "Manufacturing tool url not found")
        error_handler.enqueue_error(error=error)
        return None

    try:
        url = urllib.parse.urljoin(g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_MFT_URL],
                                   "validation/")

        headers = {
            "device_ip": g_config[CONFIG_DEVICE][CONFIG_DEVICE_IP],
            "device_sn": hd_serial
        }

        # Fix: the previous revision sent the non-empty case as a raw dict
        # (form-encoded by requests) while the empty case was a JSON string.
        # Encode both consistently as JSON.
        if len(invalid_attributes) == 0:
            data = json.dumps({})
        else:
            data = json.dumps({"invalidAttributes": invalid_attributes})

        g_utils.logger.debug("headers: {0}".format(headers))
        g_utils.logger.debug("data: {0}".format(data))
        resp = requests.post(url=url,
                             data=data,
                             headers=headers,
                             timeout=(30, 60))
        return resp.json()
    except requests.exceptions.Timeout:
        error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value,
                              ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value)
        error_handler.enqueue_error(error=error)
        return None
    except requests.exceptions.TooManyRedirects:
        error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value,
                               ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value)
        error_handler.enqueue_error(error=error)
        return None
    except Exception as e:
        error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                              ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value,
                              str(e))
        error_handler.enqueue_error(error=error)
        return None
class UICSMessageHandler:
    """Receives messages from the UI input bus, validates their CRC and routes
    each request to the matching CloudSync handler.

    A daemon worker thread drains the internal queue whenever the event flag
    is raised by :meth:`enqueue_message`.
    """

    def __init__(self, logger: Logger, max_size, network_request_handler, output_channel, reachability_provider,
                 error_handler):
        """
        :param Logger logger: Logger object
        :param max_size: maximum number of queued messages before new ones are rejected
        :param network_request_handler: handler used to queue outbound network requests
        :param output_channel: channel used to push messages back to the UI
        :param reachability_provider: network reachability provider
        :param error_handler: global error handler
        """
        self.logger = logger
        self.reachability_provider = reachability_provider
        self.network_request_handler = network_request_handler
        self.output_channel = output_channel
        self.error_handler = error_handler
        self.queue = deque(maxlen=max_size)  # append/popleft are thread safe
        self.thread = Thread(target=self.scheduler, daemon=True)
        self.event = Event()
        self.thread.start()

        self.logger.info('Created UI_CS Handler')

    def scheduler(self) -> None:
        """
        Continuously monitors the event flag to check for new messages
        :return: None
        """
        while True:
            if self.event.wait():
                while len(self.queue) > 0:
                    self.handle_message(self.queue.popleft())
                self.event.clear()

    def enqueue_message(self, message: "UICSMessage") -> bool:
        """
        :param message: the message to add to the message queue
        :return: True upon success, False when the queue is full
        """
        if len(self.queue) >= self.queue.maxlen:
            return False
        self.queue.append(message)
        self.event.set()
        return True

    def handle_message(self, message: "UICSMessage") -> None:
        """
        Validate the CRC of an inbound UI message and dispatch it to the
        matching request handler.

        :param message: the parsed inbound message
        :return: None
        """
        self.logger.info('UI2CS Message: {0}'.format(message))

        # CRC is computed over timestamp+sequence+ID+size+parameters.
        message_data = message.timestamp + message.sequence + message.ID + message.size
        if len(message.parameters) > 0:
            for param in message.parameters:
                message_data += param
        else:
            self.logger.debug("UI2CS message has 0 parameters: {0}".format(message))

        message_calculated_crc8 = helpers_crc8(message_data.encode('utf-8'))
        if message.CRC != str(message_calculated_crc8):
            error = Error.crc_mismatch(OutboundMessageIDs.CS2UI_ERROR.value,
                                       ErrorIDs.CS_BAD_CRC_ERROR.value,
                                       message.sequence,
                                       message.CRC,
                                       str(message_calculated_crc8))
            self.error_handler.enqueue_error(error=error)
            return

        message_id = InboundMessageIDs.mapped_str_value(message.ID)
        device_mode = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE]

        # REGISTRATION MODE
        if message_id == InboundMessageIDs.UI2CS_REQ_REGISTRATION and device_mode == 'registration':
            self._handle_registration(message)
        # OPERATION MODE
        elif message_id == InboundMessageIDs.UI2CS_SEND_DEVICE_STATE and device_mode == 'operation':
            self._handle_device_state(message)
        elif message_id == InboundMessageIDs.UI2CS_SEND_TREATMENT_REPORT and device_mode == 'operation':
            self._handle_treatment_report(message)
        elif message_id == InboundMessageIDs.UI2CS_REQ_DECOMMISSION and device_mode == 'operation':
            self._handle_decommission(message)
        # valid in either registration or operation mode
        elif message_id == InboundMessageIDs.UI2CS_SEND_CHECKIN:
            self._handle_checkin(message)
        elif message_id == InboundMessageIDs.UI2CS_REQ_LOG_RETENTION:
            self._handle_log_retention(message)
        elif message_id == InboundMessageIDs.UI2CS_ERROR:
            self._handle_ui_error(message)
        elif message_id == InboundMessageIDs.UI2CS_UPLOAD_DEVICE_LOG and device_mode == 'operation':
            self._handle_device_log_upload(message)

    def _handle_registration(self, message: "UICSMessage") -> None:
        # Persist serial numbers received from the UI and queue the
        # registration request towards the manufacturing tool.
        self.logger.info('UI2CS_REQ_REGISTRATION request received')
        self.logger.debug('Applicable config: {0}'.format(message.g_config))

        if (len(message.parameters) != 3) or (message.parameters[0] is None) or (message.parameters[2] is None):
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_REQ_REGISTRATION_ERROR.value,
                                  "invalid # of parameters for registration")
            self.error_handler.enqueue_error(error=error)
            return

        try:
            message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] = message.parameters[0]
            if len(message.parameters[1]) > 0:
                message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL] = message.parameters[1]
            else:
                self.logger.debug("Used HD Serial as value for DG Serial")
                message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL] = message.parameters[0]
            message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION] = message.parameters[2]
            message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_NAME] = message.parameters[0]

            message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_IP] = helpers_get_ip_address()

            helpers_write_config(None, CONFIG_PATH, message.g_config)
            helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, message.g_config)

            helpers_add_to_network_queue(network_request_handler=self.network_request_handler,
                                         request_type=NetworkRequestType.CS2MFT_REQ_REGISTRATION,
                                         url='',
                                         payload={},
                                         method='',
                                         g_config=message.g_config,
                                         success_message='CS2MFT_REQ_REGISTRATION request added to network queue')
        except IOError:
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_SAVE_CONFIG_ERROR.value,
                                  "Error updating device config file")
            self.error_handler.enqueue_error(error=error)
        except Exception as e:
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_REQ_REGISTRATION_ERROR.value,
                                  str(e))
            self.error_handler.enqueue_error(error=error)

    def _handle_device_state(self, message: "UICSMessage") -> None:
        # Map the HD mode/sub-mode reported by the UI to a cloud device state
        # and forward it to DCS.
        self.logger.info("UI2CS_SEND_DEVICE_STATE request received")

        try:
            # Parameter access is guarded by the try: a short or invalid
            # parameter list now maps to UNKNOWN_STATE instead of raising an
            # uncaught IndexError that killed the scheduler thread.
            self.logger.debug("HD Mode: {0}".format(HDOpModes.mapped_str_value(message.parameters[0])))
            self.logger.debug("HD Sub-Mode: {0}".format(HDOpSubModes.mapped_str_value(message.parameters[1])))
            device_state = helpers_device_state_to_cloud_state(
                HDOpModes.mapped_str_value(message.parameters[0]),
                HDOpSubModes.mapped_str_value(message.parameters[1]))
        except Exception:
            device_state = DeviceStates.UNKNOWN_STATE
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_UNKNOWN_DEVICE_STATE_ERROR.value,
                                  "Unknown device state received from UI")
            self.error_handler.enqueue_error(error=error)

        message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_STATE] = device_state.value
        self.logger.info("Device state: {0}".format(device_state.name))

        try:
            helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, message.g_config)
            helpers_add_to_network_queue(network_request_handler=self.network_request_handler,
                                         request_type=NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE,
                                         url='',
                                         payload={},
                                         method='',
                                         g_config=message.g_config,
                                         success_message='CS2DCS_REQ_SET_DEVICE_STATE request added to network queue')
        except Exception as e:
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value,
                                  str(e))
            self.error_handler.enqueue_error(error=error)

    def _handle_treatment_report(self, message: "UICSMessage") -> None:
        # Read the treatment log produced by the UI, checksum it and queue it
        # for upload to DCS.
        self.logger.info("UI2CS_SEND_TREATMENT_REPORT request received")

        try:
            hd_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL]
            dg_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL]
            sw_version = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION]

            self.logger.debug('hd: {0},dg: {1},sw: {2}'.format(hd_serial_number, dg_serial_number, sw_version))

            treatment_log_json = helpers_read_treatment_log_file(message.parameters[0])
            if treatment_log_json:
                treatment_log_json['checksum'] = helpers_sha256_checksum(json.dumps(treatment_log_json['data']))
                treatment_log_json['serialNumber'] = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL]

                g_utils.logger.debug("Treatment log {0}".format(treatment_log_json))

                # Fix: implicit string concatenation previously produced
                # "added to networkqueue" (missing space).
                helpers_add_to_network_queue(network_request_handler=self.network_request_handler,
                                             request_type=NetworkRequestType.CS2DCS_REQ_SEND_TREATMENT_REPORT,
                                             url='',
                                             payload=treatment_log_json,
                                             method='',
                                             g_config=message.g_config,
                                             success_message='CS2DCS_REQ_SEND_TREATMENT_REPORT request added to '
                                                             'network queue')
        except Exception as e:
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value,
                                  str(e))
            self.error_handler.enqueue_error(error=error)

    def _handle_decommission(self, message: "UICSMessage") -> None:
        # Wipe device configuration/credentials and confirm to the UI.
        self.logger.info("UI2CS_REQ_DECOMMISSION request received")

        try:
            helpers_decommission_device()
            message_body = str(OutboundMessageIDs.CS2UI_REQ_DEVICE_DECOMMISSIONED.value) + ',0'
            self.output_channel.enqueue_message(message_body)
        except Exception as e:
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_REQ_DECOMMISSION_ERROR.value,
                                  str(e))
            self.error_handler.enqueue_error(error=error)

    def _handle_checkin(self, message: "UICSMessage") -> None:
        # Acknowledge a UI check-in; valid in registration AND operation mode.
        self.logger.info("UI2CS_SEND_CHECKIN request received")

        try:
            message_body = str(OutboundMessageIDs.CS2UI_REQ_CHECKIN.value) + ',0'
            self.output_channel.enqueue_message(message_body)
        except Exception as e:
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_REQ_CHECKIN_ERROR.value,
                                  str(e))
            self.error_handler.enqueue_error(error=error)

    def _handle_log_retention(self, message: "UICSMessage") -> None:
        # Delete old CS log files down to the requested disk-usage percentage
        # and report the number/size of deleted files back to the UI.
        self.logger.info("UI2CS_REQ_LOG_RETENTION request received")

        # Fix: int() used to run unguarded inside the if-condition and raised
        # an uncaught ValueError on a non-numeric parameter.
        if (len(message.parameters) >= 1 and helpers_is_int(message.parameters[0])
                and int(message.parameters[0]) > 0):
            try:
                self.logger.info("Used Percentage: {0}".format(message.parameters[0]))
                num_of_files, del_size_mb = helpers_log_retention(int(message.parameters[0]))
                message_body = str(OutboundMessageIDs.CS2UI_REQ_LOG_RETENTION.value) + \
                    f',{num_of_files}' + f',{del_size_mb}'
                self.output_channel.enqueue_message(message_body)
            except Exception as e:
                error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                      ErrorIDs.CS_LOG_RETENTION_ERROR.value,
                                      str(e))
                self.error_handler.enqueue_error(error=error)
        else:
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_LOG_RETENTION_ERROR.value,
                                  "invalid # of parameters for log retention")
            self.error_handler.enqueue_error(error=error)

    def _handle_ui_error(self, message: "UICSMessage") -> None:
        # Re-wrap an error reported by the UI and push it to the error handler.
        if len(message.parameters) == 0:
            # Fix: parameters[0] used to be dereferenced unguarded.
            self.logger.warning("UI2CS_ERROR received without parameters: {0}".format(message))
            return
        error_body = "{0},{1},{2}".format(InboundMessageIDs.UI2CS_ERROR.value, message.size,
                                          message.parameters[0])
        for parameter in message.parameters[1:]:
            error_body += ",{0}".format(parameter)
        error = Error(error_body=error_body)
        self.error_handler.enqueue_error(error=error)

    def _handle_device_log_upload(self, message: "UICSMessage") -> None:
        # Verify the UI-supplied sha256 of the log file, then queue the device
        # log for upload to DCS.
        self.logger.info("UI2CS_UPLOAD_DEVICE_LOG request received")

        if (len(message.parameters) != 2) or (message.parameters[0] is None) or (message.parameters[1] is None):
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_DEVICE_LOG_ERROR.value,
                                  "invalid # of parameters for file upload request")
            self.error_handler.enqueue_error(error=error)
            return

        try:
            local_checksum = helpers_sha256_checksum(message.parameters[0])
            if local_checksum != message.parameters[1]:
                raise ValueError('No valid sha256 value.')

            hd_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL]
            dg_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL]
            sw_version = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION]

            self.logger.debug('hd: {0},dg: {1},sw: {2}'.format(hd_serial_number, dg_serial_number, sw_version))

            device_log_data = {
                "path": message.parameters[0],
                "serialNumber": message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                "checksum": local_checksum
            }

            g_utils.logger.debug("Device log data {0}".format(device_log_data))

            helpers_add_to_network_queue(network_request_handler=self.network_request_handler,
                                         request_type=NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG,
                                         url='',
                                         payload=device_log_data,
                                         method='',
                                         g_config=message.g_config,
                                         success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to network queue')
        except Exception as e:
            error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                                  ErrorIDs.CS_DEVICE_LOG_ERROR.value,
                                  str(e))
            self.error_handler.enqueue_error(error=error)
def check_readable(path):
    """Check if a file or directory is readable.

    :param str path: Path to check
    :return: True if readable, False otherwise
    :rtype: bool
    """
    if not os.path.exists(path):
        return False
    return os.access(path, os.R_OK)


def check_writable(path):
    """Check if a path is writable. For files, checks the file. For
    non-existent paths, checks the parent directory.

    :param str path: Path to check
    :return: True if writable, False otherwise
    :rtype: bool
    """
    # For a missing path, fall back to its parent directory ("." for a
    # bare filename) — that is what a subsequent create would need.
    target = path if os.path.exists(path) else (os.path.dirname(path) or ".")
    return os.path.exists(target) and os.access(target, os.W_OK)


def check_disk_space_mb(path, required_mb=10):
    """Check if sufficient disk space is available at the given path.

    :param str path: Path to check (file or directory)
    :param int required_mb: Minimum required free space in MB
    :return: True if sufficient space available, False otherwise
    :rtype: bool
    """
    try:
        probe = path if os.path.isdir(path) else os.path.dirname(path) or "."
        free_mb = shutil.disk_usage(probe).free / (1024 * 1024)
    except OSError:
        return False
    return free_mb >= required_mb
+EXEC_MODE = EXEC_MODE_NORMAL + +# CONFIG +CONFIG_PATH = os.path.join(PATH_HOME, "cloudsync/config/config.json") + +DECOMMISSION_CS_PATH = "/var/configurations/CloudSync/" +DECOMMISSION_FOLDERS = ['config', 'jwt', 'credentials'] + +OPERATION_CONFIG_PATH = "/var/configurations/CloudSync/config/" +OPERATION_CONFIG_FILE_PATH = os.path.join(OPERATION_CONFIG_PATH, "config.json") + +# LOGS +CS_LOG_PATH = "/media/sd-card/cloudsync/log" +CS_LOG_FILE = os.path.join(CS_LOG_PATH, "cloudsync.log") + +# DEVICE TOKEN +TOKEN_CACHING_PATH = "/var/configurations/CloudSync/jwt/" +DEVICE_KEBORMED_ACCESS_TOKEN_PATH = os.path.join(TOKEN_CACHING_PATH, "access_token.json") + +DEVICE_TOKEN_VALIDATION = "/api/device/validate" + +# CREDENTIALS +CREDENTIALS_PATH = "/var/configurations/CloudSync/credentials/" + +CERTIFICATE_X509_FILE_NAME = "client_certificate.pem" +PRIVATE_KEY_FILE_NAME = "client_private_key.pem" +PUBLIC_KEY_FILE_NAME = "client_public_key.pem" + +CREDENTIALS_CERTIFICATE_X509 = os.path.join(CREDENTIALS_PATH, CERTIFICATE_X509_FILE_NAME) +CREDENTIALS_PRIVATE_KEY = os.path.join(CREDENTIALS_PATH, PRIVATE_KEY_FILE_NAME) +CREDENTIALS_PUBLIC_KEY = os.path.join(CREDENTIALS_PATH, PUBLIC_KEY_FILE_NAME) + +# UI2CS VALUES +UI2CS_FILE_CHANNELS_PATH = "/media/sd-card/cloudsync" +UI2CS_FILE_LOG_PATH = "/media/sd-card/cloudsync/log" + + +# TREATMENT REPORT SECTIONS +TITLE = "[Title]" +TREATMENT_PRESCRIPTION = "[Treatment Prescription]" +TREATMENT_PARAMETERS = "[Treatment Parameters]" +POST_TREATMENT_DATA = "[Post-Treatment Data]" +EXTRA = "[Extra]" +TREATMENT_DATA = "[Treatment Data]" +TREATMENT_ALARMS = "[Treatment Alarms]" +TREATMENT_EVENTS = "[Treatment Events]" + +# TREATMENT SECTION CHARACTERS +SECTION_START_CHARACTER = "[" +SECTION_STOP_CHARACTER = "]" + +# UI APP - TREATMENT REPORT VALUES +TREATMENT_CODE = "Tx Code" +PATIENT_ID = "Patient ID" +TREATMENT_DURATION = "Treatment Duration" +BLOOD_FLOW_RATE = "Blood Flow Rate" +DIALYSATE_FLOW_RATE = "Dialysate Flow Rate" 
+ACID_CONCENTRATE_TYPE = "Acid ConcentrateType" +BICARBONATE_CONCENTRATE_TYPE = "Bicarbonate Concentrate Type" +POTASSIUM_CONCENTRATION = "Potassium Concentration" +CALCIUM_CONCENTRATION = "Calcium Concentration" +BICARBONATE_CONCENTRATION = "Bicarbonate Concentration" +SODIUM_CONCENTRATION = "Sodium Concentration" +DIALYSATE_TEMPERATURE = "Dialysate Temperature" +DIALYZER_TYPE = "Dialyzer Type" +HEPARIN_TYPE = "Heparin Type" +HEPARIN_CONCENTRATION = "Heparin Concentration" +HEPARIN_BOLUS_VOLUME = "Heparin Bolus Volume" +HEPARIN_DISPENSE_RATE = "Heparin Dispense Rate" +HEPARIN_STOP = "Heparin Stop" +TREATMENT_START_DATE_TIME = "Treatment Start DateTime" +TREATMENT_END_DATE_TIME = "Treatment End DateTime" +ACTUAL_TREATMENT_DURATION = "Actual Treatment Duration" +DIALYSATE_VOLUME_USED = "Dialysate Volume Used" +PRESCRIBED_UF_VOLUME = "Prescribed UF Volume" +TARGET_UF_VOLUME = "Target UF Volume" +ACTUAL_UF_VOLUME = "Actual UF Volume" +PRESCRIBED_UF_RATE = "Prescribed UF Rate" +TARGET_UF_RATE = "Target UF Rate" +ACTUAL_UF_RATE = "Actual UF Rate" +SALINE_BOLUS_VOLUME = "Saline Bolus Volume" +HEPARIN_DELIVERED_VOLUME = "Heparin Delivered Volume" +WATER_SAMPLE_TEST_RESULT = "Water Sample Test Result" +# new fields +TARGET_WEIGHT = " Target Weight" +TOTAL_CHLORINE = "Total Chlorine" +PH = "PH" +CONDUCTIVITY = "Conductivity" +MACHINE_WIPED_DOWN = "Machine Wiped Down" +FILTER_LIFE = "Filter Life" +LAST_CHEMICAL_DISINFECTION = "Last Chemical Disinfection" +LAST_HEAT_DISINFECTION = "Last Heat Disinfection" + +# TREATMENT TEMPLATE PATH +TREATMENT_REPORT_TEMPLATE_PATH = "cloudsync/config/treatment_report_template.json" + +# LOGS UPLOAD TEMPLATE PATH +LOG_UPLOAD_TEMPLATE_PATH = "cloudsync/config/log_upload_template.json" + +# USER_AGENT +USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36" +CONTENT_TYPE = "application/json" +API_VERSION = "2.0" +BEARER_HOLDER = "Bearer {0}" +TOO_MANY_REDIRECTS_HOLDER 
class HeartBeatProvider:
    """Periodic heartbeat towards the UI: while enabled, regularly asks the
    UI for the current device state."""

    # Seconds between two heartbeat polls.
    HEARTBEAT_FREQ = 20
    # Class-level switch; heartbeats are emitted only while this is True.
    send_heartbeat = False

    def __init__(self, logger: Logger, network_request_handler, output_channel):
        """
        :param Logger logger: Logger object
        :param network_request_handler: upstream network request handler
        :param output_channel: channel used to push messages to the UI
        """
        self.logger = logger
        self.network_request_handler = network_request_handler
        self.output_channel = output_channel
        self.thread = Thread(target=self.heartbeat, daemon=True)
        self.thread.start()

    def heartbeat(self):
        """
        Sending heartbeat to UI: requesting device state
        """
        while True:
            if not self.send_heartbeat:
                sleep(self.HEARTBEAT_FREQ)
                continue
            # requesting device state from UI
            helpers_add_to_output_channel(output_channel=self.output_channel,
                                          message_body=str(OutboundMessageIDs.CS2UI_REQ_DEVICE_STATE.value) + ',0',
                                          success_message="CS2UI_REQ_DEVICE_STATE message added to output channel")
            sleep(self.HEARTBEAT_FREQ)
import subprocess
import math

from datetime import *
from time import time, sleep
from typing import Union, Any
from logging import Logger

from cloudsync.utils.singleton import SingletonMeta
from cloudsync.common.enums import *
from cloudsync.handlers.network_request import NetworkRequest
from cloudsync.utils.reachability import *
from cloudsync.utils.globals import *


class GUtils(metaclass=SingletonMeta):
    """Process-wide singleton that holds shared runtime services for the helpers."""

    def __init__(self):
        # Both services are injected later, after the app wires them up.
        self.logger = None
        self.reachability_provider = None

    def add_logger(self, logger: Logger):
        """Attach the shared Logger instance."""
        self.logger = logger

    def add_reachability_provider(self, reachability_provider: ReachabilityProvider):
        """Attach the shared reachability provider."""
        self.reachability_provider = reachability_provider


g_utils = GUtils()

# Lookup table used by helpers_crc8 (one entry per possible byte value).
CRC_LIST = [
    0, 49, 98, 83, 196, 245, 166, 151, 185, 136, 219, 234, 125, 76, 31, 46,
    67, 114, 33, 16, 135, 182, 229, 212, 250, 203, 152, 169, 62, 15, 92, 109,
    134, 183, 228, 213, 66, 115, 32, 17, 63, 14, 93, 108, 251, 202, 153, 168,
    197, 244, 167, 150, 1, 48, 99, 82, 124, 77, 30, 47, 184, 137, 218, 235,
    61, 12, 95, 110, 249, 200, 155, 170, 132, 181, 230, 215, 64, 113, 34, 19,
    126, 79, 28, 45, 186, 139, 216, 233, 199, 246, 165, 148, 3, 50, 97, 80,
    187, 138, 217, 232, 127, 78, 29, 44, 2, 51, 96, 81, 198, 247, 164, 149,
    248, 201, 154, 171, 60, 13, 94, 111, 65, 112, 35, 18, 133, 180, 231, 214,
    122, 75, 24, 41, 190, 143, 220, 237, 195, 242, 161, 144, 7, 54, 101, 84,
    57, 8, 91, 106, 253, 204, 159, 174, 128, 177, 226, 211, 68, 117, 38, 23,
    252, 205, 158, 175, 56, 9, 90, 107, 69, 116, 39, 22, 129, 176, 227, 210,
    191, 142, 221, 236, 123, 74, 25, 40, 6, 55, 100, 85, 194, 243, 160, 145,
    71, 118, 37, 20, 131, 178, 225, 208, 254, 207, 156, 173, 58, 11, 88, 105,
    4, 53, 102, 87, 192, 241, 162, 147, 189, 140, 223, 238, 121, 72, 27, 42,
    193, 240, 163, 146, 5, 52, 103, 86, 120, 73, 26, 43, 188, 141, 222, 239,
    130, 179, 224, 209, 70, 119, 36, 21, 59, 10, 89, 104, 255, 206, 157, 172
]


def helpers_is_int(val: str) -> bool:
    """
    Report whether *val* can be converted to an int.

    :param val: the value in string form
    :return: True if it can be converted to an int, False otherwise
    """
    try:
        int(val)
    except ValueError:
        return False
    return True


def helpers_is_float(val: str) -> bool:
    """
    Report whether *val* can be converted to a float.

    :param val: the value in string form
    :return: True if it can be converted to a float, False otherwise
    """
    try:
        float(val)
        return True
    except (ValueError, TypeError):
        return False


def helpers_try_numeric(val: str) -> Union[int, float, str]:
    """
    Attempt to convert *val* to a number; return it unchanged when impossible.

    Integers win over floats; non-finite floats (inf/NaN) are replaced with 0
    so they never leak into JSON payloads.

    :param val: the value to convert
    :return: the converted value when possible, otherwise the original value
    """
    if helpers_is_int(val):
        return int(val)
    if not helpers_is_float(val):
        return val
    try:
        number = float(val)
    except Exception:
        # helpers_is_float just succeeded, so this is extremely unlikely.
        g_utils.logger.warning(f"Unexpected conversion failure for: '{val}' — replacing with 0")
        return 0
    if math.isinf(number) or math.isnan(number):
        g_utils.logger.warning(f"Non-finite float value encountered: '{val}' — replacing with 0")
        return 0
    return number


def helpers_parse_treatment_log(path: str) -> dict:
    """
    Convert a treatment.log file to a python dictionary.
    The treatment log needs to be formatted better.
    Until then, this is a (non-ideal) way to read it.

    :param path: the path to the treatment log file
    :return: the parsed treatment log file
    """
    if not os.path.exists(path):
        g_utils.logger.warning("Path does not exist: %s", path)
        return {}

    parsed = {}
    current_group = "NoGroup"
    with open(path, 'r') as log_file:
        for raw_line in log_file:
            line = raw_line.replace("\n", "")
            if "[" in line and "]" in line:
                # A "[Section]" header starts a new group.
                current_group = line.split("[")[1].split("]")[0]
                if current_group not in parsed:
                    parsed[current_group] = {}
            elif current_group in ("Title", "Treatment Parameters", "Post-Treatment Data", "Extra"):
                # key,value[,units] style rows
                tokens = line.split(",")
                if len(tokens) > 1:
                    entry = {"value": helpers_try_numeric(tokens[1])}
                    if len(tokens) > 2:
                        entry["units"] = tokens[2] if tokens[2] != "" else None
                    parsed[current_group][tokens[0]] = entry
            elif current_group in ("Treatment Data", "Treatment Alarms", "Treatment Events"):
                # CSV data rows, appended under a single "data" key
                converted = [helpers_try_numeric(token) for token in line.split(",")]
                parsed[current_group]["data"] = parsed[current_group].get("data", []) + [converted]

    return parsed
def helpers_device_state_to_cloud_state(hd_mode: HDOpModes, hd_sub_mode: HDOpSubModes) -> DeviceStates:
    """
    Map the HD firmware mode/sub-mode to a cloud-facing device state.

    Active Ready        - mode is 3 (standby) and sub-mode is 1 (wait for treatment)
    Active In Treatment - mode between 4 and 7 (treatment params .. post treatment)
    Active Not OK       - mode is 0 (fault)
    Active Not Ready    - any other mode/sub-mode

    Inactive (OK / Not OK), Active OK and Decommissioned are not derivable here:
    the HD firmware does not track them, so the UI or cloud must maintain
    assigned tenant, active/inactive and OK/Not-OK while inactive.
    """
    # NOTE: hd_sub_mode is currently unused but kept for interface stability.
    in_treatment_modes = (HDOpModes.MODE_TPAR, HDOpModes.MODE_PRET,
                          HDOpModes.MODE_TREA, HDOpModes.MODE_POST)

    if hd_mode == HDOpModes.MODE_STAN:
        return DeviceStates.ACTIVE_READY
    if hd_mode in in_treatment_modes:
        return DeviceStates.ACTIVE_IN_TREATMENT
    if hd_mode == HDOpModes.MODE_FAUL:
        return DeviceStates.ACTIVE_NOT_OK
    return DeviceStates.ACTIVE_NOT_READY
def helpers_get_stored_token() -> Union[str, None]:
    """
    Return the stored access token.

    :return: the token if found and parseable, otherwise None
    """
    if not os.path.exists(DEVICE_KEBORMED_ACCESS_TOKEN_PATH):
        return None
    # 'with' guarantees the handle is closed even when json.load raises.
    with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'r') as f:
        try:
            data = json.load(f)
        except json.decoder.JSONDecodeError:
            return None

    if data is None:
        return None

    return data.get("access_token", None)


def helpers_read_config(path: str) -> dict:
    """
    Read the configuration.

    :param path: the path to the configuration
    :return: the loaded configuration
    :raises FileNotFoundError: when the configuration file does not exist
    """
    if os.path.exists(path):
        with open(path, 'r') as f:
            config = json.load(f)
        return config
    else:
        g_utils.logger.error("Operation configuration file does not exist: {0}".format(path))
        raise FileNotFoundError(f"Operation configuration file does not exist: {path}")


def helpers_write_config(folder_path: str, file_path: str, config: dict) -> None:
    """
    Write the config to the provided path.
    If folder_path is provided, the folder is created when it does not exist.

    :param folder_path: the path for the config folder (may be None)
    :param file_path: the path where the config json will be written
    :param config: the config dictionary
    :return: None
    """
    if folder_path is not None:
        # exist_ok avoids the TOCTOU race between an existence check and creation
        # (the previous check-then-makedirs could raise if the folder appeared in between).
        os.makedirs(folder_path, exist_ok=True)

    with open(file_path, 'w') as f:
        json.dump(config, f, indent=4)
def helpers_read_access_token(path: str) -> dict:
    """
    Read the access token json file and return it as a dict.

    :param path: the path to the access token json file
    :return: the parsed dict, or {} when the file does not exist
    """
    data = {}
    if os.path.exists(path):
        with open(path, 'r', encoding="utf-8-sig") as f:
            data = json.load(f)
    return data


def helpers_read_treatment_report_template(path: str) -> dict:
    """
    Read the treatment report template.

    :param path: the path to the template
    :return: the loaded template, or {} when the file does not exist
    """
    if os.path.exists(path):
        with open(path, 'r') as f:
            template = json.load(f)
        return template
    else:
        g_utils.logger.error(
            "Configuration file does not exist: {0}".format(path))
        return {}


def log_func(func):
    """
    Log the function and the parameters passed to it.

    @param func: The decorated function
    @return: The wrapper function
    """

    def _wrapper(*args, **kwargs):
        g_utils.logger.debug("Calling {0} args: {1} kwargs: {2}".format(func.__name__,
                                                                        tuple(args),
                                                                        kwargs))
        return func(*args, **kwargs)

    return _wrapper


def helpers_file_to_byte_array(file_path: str) -> Union[str, None]:
    """
    Read a file and return its contents base64-encoded as a string.

    :param file_path: path to the file
    :return: the base64 string, or None when the file is missing/unreadable
    """
    try:
        with open(file_path, "rb") as f:
            # Read the entire file in binary mode
            file_bytes = f.read()
        return base64.b64encode(file_bytes).decode('utf-8')
    except FileNotFoundError as e:
        g_utils.logger.error(f"Device log file not found: {e}")
        return None
    except Exception as e:
        g_utils.logger.error(f"Error reading device log file: {e}")
        return None


def helpers_get_file_size(file_path: str) -> Union[int, None]:
    """
    Return the size of a file in bytes, or None on error.

    :param file_path: path to the file
    """
    try:
        return os.path.getsize(file_path)
    except OSError as e:
        g_utils.logger.error(f'Error getting file size: {e}')
        return None


def _log_datetime_to_ms(value: str) -> int:
    """Convert a 'YYYY/MM/DD HH:MM' timestamp string to epoch milliseconds."""
    parsed = datetime.strptime(value, "%Y/%m/%d %H:%M")
    return int(datetime.timestamp(parsed)) * S_MS_CONVERSION_FACTOR


def _timed_log_record(data_line: str) -> dict:
    """Build a {time, title, parameters} record from a CSV alarm/event line."""
    components = data_line.split(',')
    return {
        "time": int(components[0]) * S_MS_CONVERSION_FACTOR,
        "title": components[1],
        "parameters": components[2:] if len(components) > 2 else []
    }


def _treatment_data_record(data_line: str) -> dict:
    """Build one periodic treatment-data sample from a CSV data line."""
    c = data_line.split(',')
    return {
        "time": int(c[0]) * S_MS_CONVERSION_FACTOR,
        "bloodFlowRate": helpers_try_numeric(c[1]),
        "dialysateFlowRate": helpers_try_numeric(c[2]),
        "ultrafiltrationRate": helpers_try_numeric(c[3]),
        "arterialPressure": helpers_try_numeric(c[4]),
        "venousPressure": helpers_try_numeric(c[5]),
        "systolic": helpers_try_numeric(c[6]),
        "diastolic": helpers_try_numeric(c[7]),
        "heartRate": helpers_try_numeric(c[8])
    }


def helpers_read_treatment_log_file(path: str):
    """
    Parse a raw treatment log file into the treatment-report template structure.

    The file interleaves "[Section]"-delimited CSV blocks (data/alarms/events)
    with single "KEY,value" lines.

    :param path: the path to the treatment log file
    :return: the populated report dict, or None when the file cannot be read/parsed
    """
    treatment_data = helpers_read_treatment_report_template(TREATMENT_REPORT_TEMPLATE_PATH)

    try:
        # BUGFIX: the file handle was previously never closed; 'with' fixes the leak.
        with open(path) as f:
            treatment_log_lines = f.readlines()

        counter = 0
        while counter < len(treatment_log_lines):
            line = treatment_log_lines[counter].strip()

            if line.startswith(SECTION_START_CHARACTER) and line.endswith(SECTION_STOP_CHARACTER) and counter < (
                    len(treatment_log_lines) - 2):
                # Collect every line up to the next section header (or EOF).
                section = line
                section_lines = []
                counter += 1
                section_start_counter = counter
                line = treatment_log_lines[counter].strip()
                while not (line.startswith(SECTION_START_CHARACTER) and line.endswith(
                        SECTION_STOP_CHARACTER)) and counter < len(treatment_log_lines) - 1:
                    section_lines.append(line)
                    counter += 1
                    line = treatment_log_lines[counter].strip()
                if len(section_lines) > 0:
                    if section == TREATMENT_DATA:
                        for data_line in section_lines:
                            treatment_data['data']['treatment']['data'].append(_treatment_data_record(data_line))
                    elif section == TREATMENT_ALARMS:
                        for data_line in section_lines:
                            treatment_data['data']['treatment']['alarms'].append(_timed_log_record(data_line))
                    elif section == TREATMENT_EVENTS:
                        for data_line in section_lines:
                            treatment_data['data']['treatment']['events'].append(_timed_log_record(data_line))
                else:
                    # Empty section: rewind so the collected line is re-examined
                    # as a plain KEY,value line below.
                    counter = section_start_counter
            else:
                parts = line.split(',')
                if line.startswith(TREATMENT_CODE):
                    treatment_data['data']['treatment']['treatmentCode'] = parts[1]
                    treatment_data['reference'] = parts[1]
                elif line.startswith(PATIENT_ID):
                    treatment_data['data']['patient']['id'] = parts[1]
                elif line.startswith(TREATMENT_DURATION):
                    treatment_data['data']['treatment']['parameters']['treatmentDuration'] = int(parts[1])
                elif line.startswith(BLOOD_FLOW_RATE):
                    treatment_data['data']['treatment']['parameters']['bloodFlowRate'] = int(parts[1])
                elif line.startswith(DIALYSATE_FLOW_RATE):
                    treatment_data['data']['treatment']['parameters']['dialysateFlowRate'] = int(parts[1])
                elif line.startswith(ACID_CONCENTRATE_TYPE):
                    treatment_data['data']['treatment']['parameters']['dialysate']['acidCode'] = parts[1]
                elif line.startswith(BICARBONATE_CONCENTRATE_TYPE):
                    treatment_data['data']['treatment']['parameters']['dialysate']['bicarbCode'] = parts[1]
                elif line.startswith(POTASSIUM_CONCENTRATION):
                    treatment_data['data']['treatment']['parameters']['dialysate']['K'] = float(parts[1])
                elif line.startswith(CALCIUM_CONCENTRATION):
                    treatment_data['data']['treatment']['parameters']['dialysate']['Ca'] = float(parts[1])
                elif line.startswith(BICARBONATE_CONCENTRATION):
                    treatment_data['data']['treatment']['parameters']['dialysate']['HCO3'] = float(parts[1])
                elif line.startswith(SODIUM_CONCENTRATION):
                    treatment_data['data']['treatment']['parameters']['dialysate']['Na'] = float(parts[1])
                elif line.startswith(DIALYSATE_TEMPERATURE):
                    treatment_data['data']['treatment']['parameters']['dialysateTemp'] = float(parts[1])
                elif line.startswith(DIALYZER_TYPE):
                    treatment_data['data']['treatment']['parameters']['dialyzerModel'] = parts[1]
                elif line.startswith(HEPARIN_TYPE):
                    treatment_data['data']['treatment']['parameters']['heparinType'] = parts[1]
                elif line.startswith(HEPARIN_CONCENTRATION):
                    treatment_data['data']['treatment']['parameters']['heparinConcentration'] = parts[1]
                elif line.startswith(HEPARIN_BOLUS_VOLUME):
                    treatment_data['data']['treatment']['parameters']['heparinBolus'] = parts[1]
                elif line.startswith(HEPARIN_DISPENSE_RATE):
                    treatment_data['data']['treatment']['parameters']['heparinRate'] = parts[1]
                elif line.startswith(HEPARIN_STOP):
                    treatment_data['data']['treatment']['parameters']['heparinStopBeforeTreatmentEnd'] = parts[1]
                elif line.startswith(TREATMENT_START_DATE_TIME):
                    treatment_data['data']['treatment']['time']['start'] = _log_datetime_to_ms(parts[1])
                elif line.startswith(TREATMENT_END_DATE_TIME):
                    treatment_data['data']['treatment']['time']['end'] = _log_datetime_to_ms(parts[1])
                elif line.startswith(ACTUAL_TREATMENT_DURATION):
                    treatment_data['data']['treatment']['time']['treatmentDuration'] = int(parts[1])
                elif line.startswith(DIALYSATE_VOLUME_USED):
                    treatment_data['data']['deviceTreatmentData']['dialysateVolumeUsed'] = float(parts[1])
                elif line.startswith(PRESCRIBED_UF_VOLUME):
                    treatment_data['data']['deviceTreatmentData']['prescribedUltrafiltrationVolume'] = float(parts[1])
                elif line.startswith(TARGET_UF_VOLUME):
                    treatment_data['data']['deviceTreatmentData']['finalTargetUltrafiltrationVolume'] = float(parts[1])
                elif line.startswith(ACTUAL_UF_VOLUME):
                    treatment_data['data']['deviceTreatmentData']['actualUltrafiltrationVolume'] = float(parts[1])
                elif line.startswith(PRESCRIBED_UF_RATE):
                    treatment_data['data']['deviceTreatmentData']['prescribedUltrafiltrationRate'] = float(parts[1])
                elif line.startswith(TARGET_UF_RATE):
                    treatment_data['data']['deviceTreatmentData']['finalTargetUltrafiltrationRate'] = float(parts[1])
                elif line.startswith(ACTUAL_UF_RATE):
                    treatment_data['data']['deviceTreatmentData']['actualUltrafiltrationRate'] = float(parts[1])
                elif line.startswith(SALINE_BOLUS_VOLUME):
                    treatment_data['data']['deviceTreatmentData']['salineBolusVolumeGiven'] = float(parts[1])
                elif line.startswith(HEPARIN_DELIVERED_VOLUME):
                    treatment_data['data']['deviceTreatmentData']['heparinDelivered'] = parts[1]
                elif line.startswith(WATER_SAMPLE_TEST_RESULT):
                    treatment_data['data']['treatment']['treatmentPreparation']['totalChlorine'] = parts[1]
                # new treatment fields (placeholders — the values are not read yet)
                elif line.startswith(TARGET_WEIGHT):
                    treatment_data['data']['treatment']['time']['targetWeight'] = ""
                elif line.startswith(TOTAL_CHLORINE):
                    treatment_data['data']['treatment']['treatmentPreparation']['totalChlorine'] = ""
                elif line.startswith(PH):
                    treatment_data['data']['treatment']['treatmentPreparation']['ph'] = ""
                elif line.startswith(CONDUCTIVITY):
                    treatment_data['data']['treatment']['treatmentPreparation']['conductivity'] = ""
                elif line.startswith(MACHINE_WIPED_DOWN):
                    treatment_data['data']['diagnostics']['machineWipedDown'] = ""
                elif line.startswith(FILTER_LIFE):
                    treatment_data['data']['diagnostics']['filterLife'] = ""
                elif line.startswith(LAST_CHEMICAL_DISINFECTION):
                    treatment_data['data']['diagnostics']['lastChemicalDisinfection'] = ""
                elif line.startswith(LAST_HEAT_DISINFECTION):
                    treatment_data['data']['diagnostics']['lastHeatDisinfection'] = ""
                counter += 1

        return treatment_data
    except IOError as er:
        # BUGFIX: ' '.join(er.args) raised TypeError when args contained ints
        # (e.g. errno), masking the real error.
        g_utils.logger.error('Opening treatment log file error: {0}'.format(er))
        return None
    except Exception as e:
        g_utils.logger.error('Error parsing treatment file: {0}'.format(e))
        return None


def _enqueue_with_retry(enqueue, item, max_attempts: int = 300, pause_s: float = 0.1) -> bool:
    """Call enqueue(item) until it reports success; pause briefly between attempts."""
    for _ in range(max_attempts):
        if enqueue(item):
            return True
        sleep(pause_s)
    return False


def helpers_add_to_network_queue(network_request_handler, request_type, url, payload, method, g_config,
                                 success_message):
    """
    Wait (bounded) for internet reachability, then enqueue a network request.

    :param network_request_handler: handler exposing enqueue_request()
    :param request_type: request type enum (its .name is used in log messages)
    :param url: target URL
    :param payload: request payload
    :param method: HTTP method
    :param g_config: the system configuration object
    :param success_message: message logged on successful enqueue
    """
    cycle = 0
    # Poll reachability for up to REACHABILITY_CYCLES cycles before giving up.
    while (not g_utils.reachability_provider.reachability) and (cycle < REACHABILITY_CYCLES):
        sleep(REACHABILITY_CYCLE_PAUSE)
        cycle += 1

    if not g_utils.reachability_provider.reachability:
        g_utils.logger.warning("Internet DOWN: Network request {0} couldn't be processed".format(request_type.name))
        return

    request = NetworkRequest(request_type=request_type,
                             url=url,
                             payload=payload,
                             method=method,
                             g_config=g_config)
    max_attempts = 300
    if _enqueue_with_retry(network_request_handler.enqueue_request, request, max_attempts):
        g_utils.logger.info(success_message)
    else:
        g_utils.logger.error("Failed to enqueue network request after {0} attempts: {1}".format(max_attempts, request_type.name))


def helpers_add_to_output_channel(output_channel, message_body, success_message):
    """
    Enqueue a message on the output channel, retrying briefly while the queue is full.

    :param output_channel: channel exposing enqueue_message()
    :param message_body: the message to enqueue
    :param success_message: message logged on successful enqueue
    """
    max_attempts = 300
    if _enqueue_with_retry(output_channel.enqueue_message, message_body, max_attempts):
        g_utils.logger.info(success_message)
    else:
        g_utils.logger.error("Failed to enqueue output message after {0} attempts".format(max_attempts))


def helpers_sha256_checksum(data: str) -> str:
    """
    Return the calculated checksum (SHA256) for the input data.

    @param data: input data. It can be either a string or a path to an existing file
    @return:: hex-encoded checksum
    """
    if os.path.isfile(data):
        checksum = hashlib.sha256()
        with open(data, "rb") as f:
            # Read and update hash in blocks of 4K to bound memory usage
            for byte_block in iter(lambda: f.read(4096), b""):
                checksum.update(byte_block)
    else:
        checksum = hashlib.sha256(data.encode())

    return checksum.hexdigest()
def helpers_crc8(message_list):
    """
    Return the calculated crc from a message list.

    @param message_list: is a list of integer numbers containing the message
    @return:: integer containing an unsigned byte
    """
    crc = 0
    for value in message_list:
        # Table-driven update: XOR the running crc into the next byte, then look it up.
        crc = CRC_LIST[value ^ crc]
    return crc


def helpers_get_ip_address():
    """Return the IP address that the local hostname resolves to."""
    return socket.gethostbyname(socket.gethostname())


def helpers_decommission_device():
    """
    Delete the configured decommission subfolders under DECOMMISSION_CS_PATH.

    :raises FileNotFoundError: when a subfolder is missing or not a directory
    """
    for subfolder in DECOMMISSION_FOLDERS:
        path_to_delete = os.path.join(DECOMMISSION_CS_PATH, subfolder)
        if not (os.path.exists(path_to_delete) and os.path.isdir(path_to_delete)):
            raise FileNotFoundError(
                f"{path_to_delete} not found or not a directory!")
        shutil.rmtree(path_to_delete)

# LOGGING SPECIFIC HELPERS


def helpers_read_device_log_template(path: str) -> dict:
    """
    Read the device log template.

    :param path: the path to the template
    :return: the loaded template, or {} when the file does not exist
    """
    if not os.path.exists(path):
        g_utils.logger.error(
            "Configuration file does not exist: {0}".format(path))
        return {}
    with open(path, 'r') as f:
        return json.load(f)


def helpers_read_cs_log_template(path: str) -> dict:
    """
    Read the cs log template.

    Same file format and error handling as the device log template, so this
    simply delegates to helpers_read_device_log_template.
    """
    return helpers_read_device_log_template(path)
def helpers_extract_device_log_metadata(log_file_name: str) -> Union[dict, None]:
    """
    Extract date/time/serial/subtype/log-type metadata from a device log file name.

    Each field falls back to 'unknown' when its pattern does not match.
    """
    # (key, pattern, capture-group index) — patterns unchanged from the original.
    field_patterns = {
        "local_date": (r"^\d{8}(?=_)", 0),
        "local_time": (r"^(?:^.*?)_(\d{6})_", 1),
        "serial_number": (r"^[a-zA-Z0-9]+_[a-zA-Z0-9]+_([a-zA-Z0-9]+)(?=_)", 1),
        "device_sub_type": (r"^[a-zA-Z0-9]+_[a-zA-Z0-9]+_[a-zA-Z0-9]+_([a-zA-Z0-9]+)(?=)", 1),
        "device_log_type": (r"[a-zA-Z0-9]+\.u\.(\w+)(?=)", 1),
    }

    metadata = {}
    for key, (pattern, group_index) in field_patterns.items():
        match = re.search(pattern, log_file_name)
        metadata[key] = match.group(group_index) if match else 'unknown'
    return metadata


def helpers_construct_cs_log_json(cs_log_data: dict):
    """
    Construct the payload for cs log file uploading.

    :param cs_log_data: dict with 'path', 'serialNumber' and 'organizationId'
    :returns: the json payload to be uploaded
    """
    cs_log_json = helpers_read_cs_log_template(LOG_UPLOAD_TEMPLATE_PATH)

    file_path = cs_log_data['path']
    file_name = os.path.basename(file_path)

    # File contents (base64), checksum over the encoded contents, and size.
    logs_byte_array = helpers_file_to_byte_array(file_path)
    checksum = helpers_sha256_checksum(logs_byte_array)
    file_size = helpers_get_file_size(file_path)

    # Completion and generation timestamp from CS (in milliseconds)
    local_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)

    # Start and End timestamps: parse the date from the rotated log filename
    # (suffix format: MM-DD-YYYY) so timestamps match the actual log date,
    # not the upload date — prevents mismatches when uploading backlogged files.
    log_date = None
    name_parts = file_name.rsplit('.', 1)
    if len(name_parts) == 2:
        try:
            log_date = datetime.strptime(name_parts[1], "%m-%d-%Y").replace(tzinfo=timezone.utc)
        except ValueError:
            pass
    if log_date is None:
        log_date = datetime.now(timezone.utc)
    start_of_day = int(log_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000)
    end_of_day = int(log_date.replace(hour=23, minute=59, second=59, microsecond=999999).timestamp() * 1000)

    # Populate JSON object
    start_session = cs_log_json['start_session']
    start_session['reference'] = str(uuid.uuid4())
    start_session['generatedAt'] = local_timestamp
    start_session['dataType'] = 'device-data'
    start_session['serialNumber'] = cs_log_data['serialNumber']
    start_session['macAddress'] = 'device-data'
    start_session['organizationId'] = cs_log_data['organizationId']
    start_session['metadata']['dataType'] = 'cloud-sync-log-category'
    start_session['metadata']['deviceFileName'] = file_name
    start_session['metadata']['startTimestamp'] = start_of_day
    start_session['metadata']['endTimestamp'] = end_of_day
    cs_log_json['end_session']['checksum'] = checksum
    cs_log_json['general']['file_size'] = file_size
    cs_log_json['general']['file_path'] = file_path

    return cs_log_json


def helpers_construct_device_log_json(device_log_data: dict):
    """
    Construct the payload for device log file uploading.

    :param device_log_data: a dictionary with the info related to the current file
    :returns: a json payload to be used for uploading using sessions, or None
              when the file name does not match the expected pattern
    """
    device_log_json = helpers_read_device_log_template(LOG_UPLOAD_TEMPLATE_PATH)

    file_path = device_log_data['path']

    # File contents (base64), checksum over the encoded contents, and size.
    logs_byte_array = helpers_file_to_byte_array(file_path)
    checksum = helpers_sha256_checksum(logs_byte_array)
    file_size = helpers_get_file_size(file_path)

    # Extract metadata from the filename
    file_name = os.path.basename(file_path)
    extracted_metadata = helpers_extract_device_log_metadata(file_name)

    # Timestamp for GeneratedAt (in milliseconds)
    local_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)

    # UTC timestamp based on the local date and time the UI provided,
    # falling back to "now" when the filename lacks them (NEED BETTER SOLUTION HERE)
    if extracted_metadata['local_date'] != 'unknown' and extracted_metadata['local_time'] != 'unknown':
        datetime_obj = datetime.strptime(
            f"{extracted_metadata['local_date']}{extracted_metadata['local_time']}", "%Y%m%d%H%M%S")
        ui_utc_timestamp = int(datetime_obj.timestamp() * 1000)
    else:
        ui_utc_timestamp = local_timestamp

    if extracted_metadata is None:
        g_utils.logger.error('Device log file name does not match the pattern')
        return None

    # Populate JSON object
    start_session = device_log_json['start_session']
    start_session['reference'] = str(uuid.uuid4())
    start_session['generatedAt'] = local_timestamp
    start_session['dataType'] = 'device-data'
    start_session['serialNumber'] = device_log_data['serialNumber']
    start_session['macAddress'] = 'device-data'
    start_session['organizationId'] = device_log_data['organizationId']
    start_session['metadata']['dataType'] = 'device-log-category'
    start_session['metadata']['deviceLogType'] = extracted_metadata['device_log_type']
    start_session['metadata']['deviceSubType'] = extracted_metadata['device_sub_type']
    start_session['metadata']['deviceFileName'] = file_name
    start_session['metadata']['startTimestamp'] = ui_utc_timestamp
    start_session['metadata']['endTimestamp'] = ui_utc_timestamp
    device_log_json['end_session']['checksum'] = checksum
    device_log_json['general']['file_size'] = file_size
    device_log_json['general']['file_path'] = file_path

    return device_log_json


def helpers_should_update_dcs_log_level(g_config: dict) -> bool:
    """
    Return True if the state of the log level should be communicated to the DCS,
    else False. Controlled by the update_dcs_flag in the config.json:
    * 1 --> True
    * 0 --> False
    """
    return int(g_config[CONFIG_LOGS][CONFIG_LOGS_UPDATE_DCS_FLAG]) == 1


def helpers_should_update_cs_log_level(response: dict, g_config: dict) -> bool:
    """Return True when the DCS-requested log level differs from the current one."""
    current_log_level: str = g_config[CONFIG_LOGS][CONFIG_LOGS_CURRENT_LOG_LEVEL].upper()
    requested_log_level: str = response['logLevel'].upper()
    if requested_log_level == current_log_level:
        return False
    g_utils.logger.debug(f"DCS requested to change the log level from {current_log_level} to {requested_log_level}")
    return True


def helpers_calculate_log_level_duration(response):
    """
    Return a tuple (duration_in_seconds, start_timestamp, end_timestamp)
    if a valid log level duration was given, otherwise None.
    """
    raw_duration = response['logLevelDuration']
    duration = int(raw_duration) if raw_duration else 0
    start_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)

    if duration <= 0:
        return None
    return int(duration / 1000), start_timestamp, start_timestamp + duration
Otherwise None + """ + duration = int(response['logLevelDuration']) if response['logLevelDuration'] else 0 + start_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000) + end_timestamp = start_timestamp + duration + + if duration > 0: + duration_in_seconds = int(duration/1000) + return duration_in_seconds, start_timestamp, end_timestamp + return None + + +def helpers_log_retention(retention_pct): + cs_log_pct = retention_pct / 100 + num_files_deleted = 0 + total_deleted_size = 0 + # Total sd stats — use the log path's filesystem (same as channels on device, + # but may be a separate tmpfs in the mock stack for realistic simulation) + sd_total_bytes, sd_used_bytes, sd_free_bytes = shutil.disk_usage(UI2CS_FILE_LOG_PATH) + + if sd_total_bytes == 0: + sd_total_bytes = 1 + # Cloudsync log stats + used_bytes_proc_call = subprocess.check_output(['du', '-bsx', UI2CS_FILE_LOG_PATH]).split() + if len(used_bytes_proc_call) > 0: + cs_used_bytes = int(used_bytes_proc_call[0].decode('utf-8')) + + total_deleted_size = cs_used_bytes + # Retrieve file list + f = [] + for subdir, dir, files in os.walk(UI2CS_FILE_LOG_PATH): + for file in files: + f.append(os.path.join(UI2CS_FILE_LOG_PATH, file)) + + # Sorted with oldest modified first. 
+ f.sort(key=lambda x: os.path.getmtime(x)) + + if len(f) > 1: + while (cs_used_bytes / sd_total_bytes) >= cs_log_pct and len(f) > 1: + file_to_remove = f[0] + os.remove(file_to_remove) + num_files_deleted += 1 + f.pop(0) + used_bytes_proc_call = subprocess.check_output(['du', '-bsx', UI2CS_FILE_LOG_PATH]).split() + if len(used_bytes_proc_call) > 0: + cs_used_bytes = int(used_bytes_proc_call[0].decode('utf-8')) + else: + g_utils.logger.error("Error in disk usage call.") + break + total_deleted_size = total_deleted_size - cs_used_bytes + else: + total_deleted_size = 0 + g_utils.logger.info("No files Removed.") + else: + g_utils.logger.error("Error in disk usage call.") + + return num_files_deleted, total_deleted_size Index: utils/logging.py =================================================================== diff -u --- utils/logging.py (revision 0) +++ utils/logging.py (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,250 @@ +import logging +import os +import threading +from cloudsync.handlers.logs_handler import CustomTimedRotatingFileHandlerHandler +from cloudsync.utils.singleton import SingletonMeta +from cloudsync.utils.globals import * +from cloudsync.utils.helpers import * +from cloudsync.common.enums import * + + +class LoggingConfig(metaclass=SingletonMeta): + """ + Encapsulates logging configuration and setup. 
+ """ + + DEFAULT_LOG_LEVEL = 'ERROR' + LOG_LEVELS = { + 0: "NOTSET", + 10: "DEBUG", + 20: "INFO", + 30: "WARN", + 40: "ERROR", + 50: "CRITICAL" + } + _countdown_thread = None + _stop_countdown_thread_event = None + + def __init__(self): + self.flask_app = None + self.log_level = self.DEFAULT_LOG_LEVEL + self.log_level_name = None + self.log_level_duration = None + self.log_handler = None + self.g_config = None + + def initiate(self, app): + self.flask_app = app + + # Genereate the Logs Dir + if not os.path.exists(CS_LOG_PATH): + os.makedirs(CS_LOG_PATH) + + # Remove existing handlers form the Flask app + for handler in self.flask_app.logger.handlers: + self.flask_app.logger.removeHandler(handler) + + # Initiate the main configuration of the logger(s) + self.__configure_logging() + + def set_log_level(self, log_level) -> None: + try: + self.log_level = log_level if isinstance(log_level, int) else getattr(logging, log_level.upper()) + self.log_level_name = self.LOG_LEVELS[self.log_level] + + # Set the log level to the custom log handler + self.log_handler.setLevel(self.log_level) + # Set the log level the the flask logger + # self.flask_app.logger.setLevel(self.log_level) + # Set the log level ot the root logger + root_logger = logging.getLogger() + root_logger.setLevel(self.log_level) + + # Update the log state file + update_obj = { + 'attribute': CONFIG_LOGS_CURRENT_LOG_LEVEL, + 'value': self.log_level_name + } + self.__update_log_state(update_object=update_obj) + + g_utils.logger.info(f'Log level set at {self.log_level_name} ({self.log_level})') + + except Exception as e: + g_utils.logger.error(f'Could not update the log level: {e}') + + def set_log_handler(self): + # Add the handler to the Flask app logger + # self.flask_app.logger.addHandler(self.log_handler) + # Add the handler to the root logger + root_logger = logging.getLogger() + root_logger.addHandler(self.log_handler) + + def set_dcs_flag(self, flag_state:int): + update_obj = { + 'attribute': 
'update_dcs_flag', + 'value': flag_state + } + self.__update_log_state(update_object=update_obj) + + def set_log_duration(self, duration: Union[int, tuple]): + + if isinstance(duration, tuple): + self.log_level_duration = int(duration[0]) + # Update the duration at log state file + update_obj = { + 'attribute': CONFIG_LOGS_LOG_LEVEL_DURATION, + 'value': duration[0] + } + self.__update_log_state(update_object=update_obj) + # Update the start timestamp at log state file + update_obj = { + 'attribute': CONFIG_LOGS_START_TIMESTAMP, + 'value': duration[1] + } + self.__update_log_state(update_object=update_obj) + # Update the stop timestamp at log state file + update_obj = { + 'attribute': CONFIG_LOGS_STOP_TIMESTAMP, + 'value': duration[2] + } + self.__update_log_state(update_object=update_obj) + else: + self.log_level_duration = int(duration) + # Update the duration at log state file + update_obj = { + 'attribute': CONFIG_LOGS_LOG_LEVEL_DURATION, + 'value': duration + } + self.__update_log_state(update_object=update_obj) + + g_utils.logger.debug(f"Log level duration set at {self.log_level_duration}") + + def get_log_level(self) -> tuple: + """ + Returns an object with both the `numeric_value` and `text_value` + of the log level. + """ + return { "numeric_value": self.log_level, "text_value": self.log_level_name } + + def set_network_provider(self, network_request_handler) -> None: + """ + Passes the network provider to the custom log handler so it can + be able to send uploading log requests to the dcs for CS logs. + """ + self.log_handler.set_network_provider(network_request_handler=network_request_handler) + + def set_error_provider(self, error_handler) -> None: + """ + Passes the error provider to the custom log handler so it can + be able to send uploading log requests to the dcs for CS logs. 
+ """ + self.log_handler.set_error_provider(error_handler=error_handler) + + def set_configuration(self, g_config) -> None: + """ + Passes the configuration to the custom log handler so it can + be able to send uploading log requests to the dcs for CS logs. + Also it stores it locally as indicator that the config file has been read. + """ + self.g_config = g_config + self.log_handler.set_configuration(g_config=g_config) + + def clear_prelogger(self) -> None: + """ + Clears the prelogger after the app has been initiated successfully + """ + #TODO: Implement this logic + + def start_countdown(self) -> None: + if self.log_level and self.log_level_duration: + # Kill the previous running thread + if self._countdown_thread is not None and self._countdown_thread.is_alive(): + self._stop_countdown_thread_event.set() + + try: + self._stop_countdown_thread_event = threading.Event() + new_thread = threading.Thread(target=self.__reset_log_level, args=(self.log_level_duration, self._stop_countdown_thread_event,)) + new_thread.start() + self._countdown_thread = new_thread + except Exception as e: + g_utils.logger.error(f"Failed starting the countdown thread: {e}") + + def revert_to_error(self) -> None: + if self._countdown_thread is not None and self._countdown_thread.is_alive(): + self._stop_countdown_thread_event.set() + + self.set_log_level(self.DEFAULT_LOG_LEVEL) + self.set_dcs_flag(1) + self.set_log_duration((0,0,0)) # clear out the log state file + self._countdown_thread = None + self._stop_countdown_thread_event = None + g_utils.logger.info(f"Logger level reverted to: {self.DEFAULT_LOG_LEVEL}") + + def __configure_logging(cls): + """ + * Sets up the file rotation Handler + * Sets the initial log level to both root and app loggers + * Sets the g_utils logger + """ + + # Create the "prelogger" for handling logs before the main logger is initialized + LOG_FORMAT = ("%(asctime)s [%(levelname)s]: %(message)s in %(pathname)s:%(lineno)d") + prelogger = 
logging.getLogger("prelogger") + prelogger.setLevel('DEBUG') + prelogger_file_handler = logging.FileHandler(os.path.join(CS_LOG_PATH, 'logs.init')) + prelogger_file_handler.setLevel('DEBUG') + prelogger_file_handler.setFormatter(logging.Formatter(LOG_FORMAT)) + prelogger.addHandler(prelogger_file_handler) + + # Create a custom TimedRotatingFileHandler that writes logs to a new file and uploads them to DCS + # every midnight, in UTC time + cls.log_handler = CustomTimedRotatingFileHandlerHandler( + filename=CS_LOG_FILE, + prelogger=prelogger, + when="midnight", + interval=1, + utc=True + ) + cls.log_handler.suffix = "%m-%d-%Y" + + # Add a formatter + default_formatter = logging.Formatter( + '[%(asctime)s] %(levelname)s in %(module)s: %(message)s | {%(pathname)s:%(lineno)d}') + cls.log_handler.setFormatter(default_formatter) + + # Set the custom handler as global handler + cls.set_log_handler() + # Set the g_utils logger + # g_utils.add_logger(cls.flask_app.logger) + g_utils.add_logger(logging.getLogger()) + # Set the log level globally + cls.set_log_level(cls.log_level) + + def __update_log_state(cls, update_object:dict): + + # Update the log state ONLY if the config file has been loaded. + if cls.g_config is not None: + cls.g_config[CONFIG_LOGS][update_object['attribute']] = update_object['value'] + + if cls.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation': + helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, cls.g_config) + else: + helpers_write_config(None, CONFIG_PATH, cls.g_config) + helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, cls.g_config) + + def __reset_log_level(cls, duration:int, stop_event: threading.Event): + """ + Resets the logger level to the specified default level after a given time interval. 
+ """ + try: + g_utils.logger.info(f"Logger level set to: {cls.log_level_name} for {duration} seconds") + while duration > 0: # Loop instead of sleep for interruptibility + duration -= 1 + sleep(1) + if stop_event.is_set(): + return + if not threading.current_thread().is_alive(): + return + cls.revert_to_error() + except InterruptedError: + g_utils.logger.info("Countdown thread interrupted.") Index: utils/reachability.py =================================================================== diff -u --- utils/reachability.py (revision 0) +++ utils/reachability.py (revision 81280cf29ae66ef053d820d68e6cc865817aef30) @@ -0,0 +1,49 @@ +"""Implementation of reachability functionality""" + +from logging import Logger +from threading import Thread +from time import sleep + +from cloudsync.utils.globals import * + +import requests + +# REACHABILITY PARAMETERS +REACHABILITY_CHECK_PAUSE = 5 +REACHABILITY_CYCLES = 3 +REACHABILITY_CYCLE_PAUSE = 10 + + +class ReachabilityProvider: + + def __init__(self, logger: Logger, url_reachability): + self.logger = logger + self.url_reachability = url_reachability + self.reachability = False + self.thread = Thread(target=self.reachability_test, daemon=True) + self.thread.start() + + def reachability_test(self): + """ + Continuously monitors the connection to REACHABILITY_URL + """ + while True: + headers = { + 'User-Agent': USER_AGENT, + } + + try: + response = requests.get(url=self.url_reachability, headers=headers, timeout=10) + status_code = response.status_code + except requests.exceptions.RequestException: + status_code = INTERNAL_SERVER_ERROR + + if status_code == OK: + if not self.reachability: + self.logger.info('Internet UP') + self.reachability = True + else: + if self.reachability: + self.logger.warning('Internet DOWN') + self.reachability = False + sleep(REACHABILITY_CHECK_PAUSE) Index: utils/singleton.py =================================================================== diff -u --- utils/singleton.py (revision 0) +++ 
"""Implementation of generic singleton object"""

from threading import Lock


class SingletonMeta(type):
    """Metaclass that turns its classes into thread-safe singletons.

    The first call to the class constructs the one instance; every later
    call returns that same object, so changes to the ``__init__`` arguments
    do not affect the returned instance.
    """

    # class -> its single instance
    _instances = {}

    # Synchronizes threads during first access to the Singleton.
    _lock = Lock()

    def __call__(cls, *args, **kwargs):
        """Return the single instance, creating it under the lock if needed."""
        # Only one thread at a time may run the creation check.
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


# ===========================================================================
# utils/watchdog.py
# ===========================================================================
"""Internal watchdog thread for monitoring critical daemon threads.

Monitors registered threads via periodic is_alive() checks, attempts restart
on failure (up to a configurable limit), and creates a sentinel file to
request full process restart when recovery is exhausted.
"""

import os
from logging import Logger
from threading import Event, Thread
from time import sleep


# Default sentinel file path — cs.py monitors this to trigger process restart
SENTINEL_PATH = "/tmp/cloudsync_restart_sentinel"


class Watchdog:
    """Monitors registered daemon threads and attempts recovery on failure.

    :param Logger logger: Logger instance for health/status messages.
    :param int check_interval: Seconds between is_alive() sweeps (default 30).
    :param int max_restarts: Max restart attempts per thread before sentinel (default 3).
    :param str sentinel_path: Path to sentinel file for full process restart.
    """

    def __init__(self, logger, check_interval=30, max_restarts=3,
                 sentinel_path=SENTINEL_PATH):
        self._logger = logger
        self._check_interval = check_interval
        self._max_restarts = max_restarts
        self._sentinel_path = sentinel_path
        # name -> {"get_thread": callable, "restart_fn": callable, "failures": int}
        self._entries = {}
        self._stop_event = Event()
        self._thread = Thread(target=self._monitor, daemon=True)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def register(self, name, get_thread, restart_fn):
        """Register a thread for monitoring.

        :param str name: Human-readable name (e.g. "heartbeat").
        :param callable get_thread: Zero-arg callable returning the current
            Thread object (e.g. ``lambda: obj.thread``).
        :param callable restart_fn: Zero-arg callable that creates and starts
            a replacement thread. Use :func:`make_restart_fn` for the common
            case of daemon threads backed by a bound method.
        """
        self._entries[name] = dict(get_thread=get_thread,
                                   restart_fn=restart_fn,
                                   failures=0)

    def start(self):
        """Start the watchdog monitoring loop."""
        self._thread.start()

    def stop(self):
        """Signal the watchdog to stop."""
        self._stop_event.set()

    def is_running(self):
        """Return True if the watchdog thread is alive."""
        return self._thread.is_alive()

    def get_health(self):
        """Return a dict mapping thread name → alive (bool)."""
        return {
            label: ((current := entry["get_thread"]()) is not None
                    and current.is_alive())
            for label, entry in self._entries.items()
        }

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _monitor(self):
        """Main monitoring loop — runs until the stop event fires."""
        while not self._stop_event.is_set():
            self._check_threads()
            self._stop_event.wait(self._check_interval)

    @staticmethod
    def _is_entry_alive(entry):
        """Return True when the entry's current thread exists and is alive."""
        worker = entry["get_thread"]()
        return worker is not None and worker.is_alive()

    def _check_threads(self):
        """Single sweep: check every registered thread."""
        failed = [(label, entry) for label, entry in self._entries.items()
                  if not self._is_entry_alive(entry)]

        for label, entry in failed:
            self._handle_failure(label, entry)

        if not failed:
            self._logger.info("Watchdog: all %d threads healthy",
                              len(self._entries))

    def _handle_failure(self, label, entry):
        """Record one failure, then restart the thread or write the sentinel."""
        entry["failures"] += 1
        failures = entry["failures"]

        self._logger.error(
            "Watchdog: thread '%s' is dead (failure %d/%d)",
            label, failures, self._max_restarts,
        )

        if failures <= self._max_restarts:
            self._try_restart(label, entry)
        else:
            self._logger.error(
                "Watchdog: thread '%s' exceeded max restarts (%d), "
                "creating sentinel for full process restart",
                label, self._max_restarts,
            )
            self._create_sentinel(label)

    def _try_restart(self, name, entry):
        """Attempt to restart a dead thread."""
        try:
            entry["restart_fn"]()
        except Exception as err:
            self._logger.error(
                "Watchdog: failed to restart thread '%s': %s", name, err,
            )
        else:
            self._logger.info("Watchdog: thread '%s' restarted", name)

    def _create_sentinel(self, failed_thread_name):
        """Write a sentinel file so cs.py triggers a full process restart."""
        try:
            with open(self._sentinel_path, "w") as fh:
                fh.write(failed_thread_name)
        except OSError as err:
            self._logger.error(
                "Watchdog: failed to create sentinel file: %s", err,
            )
        else:
            self._logger.info(
                "Watchdog: sentinel file created at %s", self._sentinel_path,
            )


def make_restart_fn(obj, thread_attr, target_fn):
    """Build a restart callable for the common daemon-thread pattern.

    Creates a new ``Thread(target=target_fn, daemon=True)``, assigns it to
    ``obj.<thread_attr>``, and starts it.

    :param obj: Object that owns the thread (e.g. a HeartBeatProvider instance).
    :param str thread_attr: Name of the Thread attribute on *obj* (e.g. "thread").
    :param callable target_fn: Bound method to use as the thread target.
    :return: Zero-arg callable suitable for :meth:`Watchdog.register`.
    """
    def _restart():
        replacement = Thread(target=target_fn, daemon=True)
        setattr(obj, thread_attr, replacement)
        replacement.start()
    return _restart