"""CloudSync service entry point.

Loads the device configuration, wires up the CloudSync buses/handlers,
optionally starts the thread watchdog, and exposes the registration REST
API used by the manufacturing tool.
"""
import werkzeug

# flask_restplus expects ``werkzeug.cached_property``; alias it before
# flask_restplus is imported below.
werkzeug.cached_property = werkzeug.utils.cached_property

from flask import Flask, Response, request
from flask_restplus import Api, Resource, fields, reqparse

from cloudsync.handlers.error import Error
from cloudsync.utils.heartbeat import HeartBeatProvider
from cloudsync.busses.file_input_bus import FileInputBus
from cloudsync.handlers.ui_cs_request_handler import UICSMessageHandler
from cloudsync.handlers.cs_mft_dcs_request_handler import NetworkRequestHandler
from cloudsync.handlers.error_handler import ErrorHandler
from cloudsync.busses.file_output_bus import FileOutputBus
from cloudsync.utils.reachability import ReachabilityProvider
from cloudsync.utils.globals import *
from cloudsync.utils.helpers import *
from cloudsync.utils.logging import LoggingConfig
from cloudsync.utils.watchdog import Watchdog, make_restart_fn

# Kept after the star imports (original order) so these names always refer
# to the standard-library modules.
import hmac
import os
import signal
import sys
import threading

VERSION = "0.5.2"

arguments = sys.argv

app = Flask(__name__)
api = Api(app=app, version=VERSION, title="CloudSync Registration API",
          description="Interface with DIA Manufacturing Tool * DCS")

logconf = LoggingConfig()
logconf.initiate(app=app)

# API key authentication for deployed instances
CS_API_KEY = os.environ.get('CS_API_KEY')


@app.before_request
def check_api_key():
    """Reject requests whose ``X-Api-Key`` header does not match CS_API_KEY.

    Returns:
        None to let the request proceed (no key configured, or the key
        matches), otherwise a ``(body, 401)`` tuple.
    """
    if CS_API_KEY is None:
        return None  # no key configured — allow all requests

    supplied_key = request.headers.get('X-Api-Key', '')
    # hmac.compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters, which would turn a bad header into a 500.
    # Comparing bytes keeps the check constant-time and always yields a 401.
    # Note: an empty CS_API_KEY still matches a missing header.
    if not hmac.compare_digest(supplied_key.encode('utf-8'),
                               CS_API_KEY.encode('utf-8', 'surrogateescape')):
        return {"message": "Unauthorized: invalid or missing API key"}, 401
    return None


sleep(5)  # wait for UI to prepare the configurations partition

# ---------------------------------------------------------------------------
# Configuration bootstrap: pick the config file according to the exec mode.
# NOTE(review): when no branch reads a config, g_config / EXEC_MODE are only
# defined if the star imports above provide defaults — confirm.
# ---------------------------------------------------------------------------
try:
    ok = False
    g_utils.logger.info(SETUP_CONSOLE_LINE)

    if EXEC_MODE_UPGRADE_KEY in arguments:
        # ---------- upgrade
        # Read from $HOME
        # TODO test if this check needed?
        if os.path.isfile(CONFIG_PATH) and os.access(CONFIG_PATH, os.R_OK | os.W_OK):
            EXEC_MODE = EXEC_MODE_UPGRADE
            g_config = helpers_read_config(CONFIG_PATH)
            ok = True
    else:
        if EXEC_MODE_UPDATE_KEY in arguments:
            # ---------- update
            EXEC_MODE = EXEC_MODE_UPDATE
            g_utils.logger.info("CloudSync starting in %s mode", EXEC_MODE)
            g_utils.logger.info("CloudSync update config started...")
            # Update the $HOME from /var/configurations/ and update result
            # in both for later update
            oldConfig = helpers_read_config(OPERATION_CONFIG_FILE_PATH)
            newConfig = helpers_read_config(CONFIG_PATH)
            newConfig.update(oldConfig)
            helpers_write_config(None, CONFIG_PATH, newConfig)
            helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, newConfig)
            g_utils.logger.info("CloudSync update config done.")

        # Read from /var/configurations/
        # ---------- normal
        # TODO test if this check needed?
        if os.path.isfile(OPERATION_CONFIG_FILE_PATH) and os.access(OPERATION_CONFIG_FILE_PATH, os.R_OK):
            g_config = helpers_read_config(OPERATION_CONFIG_FILE_PATH)
            CONFIG_PATH = OPERATION_CONFIG_FILE_PATH
            ok = True

    if ok:
        g_utils.logger.info("CloudSync started in %s mode", EXEC_MODE)
        g_utils.logger.info("Using config: %s", CONFIG_PATH)
    else:
        g_utils.logger.error(f"Error reading config file in {EXEC_MODE}")

    g_utils.logger.info(SETUP_CONSOLE_LINE)

except Exception as e:
    g_utils.logger.error(f"Error reading config file - {e}")
    g_utils.logger.info(SETUP_CONSOLE_LINE)
    # NOTE(review): exits with status 0 even though startup failed; kept
    # as-is because a supervisor may depend on it — confirm.
    sys.exit(0)

# Reachability provider: fall back to the default URL if the configured one
# cannot be used.
try:
    reachability_provider = ReachabilityProvider(
        logger=app.logger,
        url_reachability=g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_REACHABILITY_URL])
except Exception as e:
    g_utils.logger.error(
        "Reachability URL missing from config file. Using Default URL - {0}".format(DEFAULT_REACHABILITY_URL))
    reachability_provider = ReachabilityProvider(
        logger=app.logger, url_reachability=DEFAULT_REACHABILITY_URL)

# Service wiring: buses, handlers, heartbeat, logging and watchdog.
try:
    g_utils.add_reachability_provider(reachability_provider=reachability_provider)

    output_channel = FileOutputBus(logger=app.logger, max_size=100,
                                   file_channels_path=UI2CS_FILE_CHANNELS_PATH)

    error_handler = ErrorHandler(logger=app.logger, max_size=100,
                                 output_channel=output_channel)

    network_request_handler = NetworkRequestHandler(logger=app.logger, max_size=1,
                                                    output_channel=output_channel,
                                                    reachability_provider=reachability_provider,
                                                    error_handler=error_handler)

    message_handler = UICSMessageHandler(logger=app.logger, max_size=20,
                                         network_request_handler=network_request_handler,
                                         output_channel=output_channel,
                                         reachability_provider=reachability_provider,
                                         error_handler=error_handler)

    ui_cs_bus = FileInputBus(logger=app.logger,
                             file_channels_path=UI2CS_FILE_CHANNELS_PATH,
                             input_channel_name="inp.buf",
                             g_config=g_config,
                             message_handler=message_handler)

    heartbeat_provider = HeartBeatProvider(logger=app.logger,
                                           network_request_handler=network_request_handler,
                                           output_channel=output_channel)

    logconf.set_network_provider(network_request_handler=network_request_handler)
    logconf.set_error_provider(error_handler=error_handler)
    logconf.set_configuration(g_config=g_config)
    logconf.set_log_level(g_config[CONFIG_LOGS][CONFIG_LOGS_DEFAULT_LOG_LEVEL])

    # Heartbeats are only sent while the device is in operation mode.
    heartbeat_provider.send_heartbeat = (
        g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation')

    g_utils.logger.debug("Current config path: {0}".format(CONFIG_PATH))

    parser = reqparse.RequestParser()

    # Watchdog — monitors critical daemon threads (operation mode only).
    # During registration the device is semi-managed by the manufacturing
    # tool, so autonomous recovery is unnecessary and could interfere with
    # the registration-to-operation transition.
    watchdog = Watchdog(logger=app.logger)
    if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation':
        watchdog.register("reachability",
                          lambda: reachability_provider.thread,
                          make_restart_fn(reachability_provider, "thread",
                                          reachability_provider.reachability_test))
        watchdog.register("output_bus",
                          lambda: output_channel.thread,
                          make_restart_fn(output_channel, "thread",
                                          output_channel.scheduler))
        watchdog.register("error_handler",
                          lambda: error_handler.thread,
                          make_restart_fn(error_handler, "thread",
                                          error_handler.scheduler))
        watchdog.register("network_request_handler",
                          lambda: network_request_handler.thread,
                          make_restart_fn(network_request_handler, "thread",
                                          network_request_handler.scheduler))
        watchdog.register("message_handler",
                          lambda: message_handler.thread,
                          make_restart_fn(message_handler, "thread",
                                          message_handler.scheduler))
        watchdog.register("file_input_bus",
                          lambda: ui_cs_bus.thread,
                          make_restart_fn(ui_cs_bus, "thread",
                                          ui_cs_bus.input_channel_handler))
        watchdog.register("heartbeat",
                          lambda: heartbeat_provider.thread,
                          make_restart_fn(heartbeat_provider, "thread",
                                          heartbeat_provider.heartbeat))
        watchdog.start()
        g_utils.logger.info("Watchdog started (operation mode)")
    else:
        g_utils.logger.info("Watchdog skipped (registration mode — device is managed)")

except Exception as e:
    g_utils.logger.error("Failed to start CS - {0}".format(e))
    # NOTE(review): exit status 0 on failure, kept as in the original.
    sys.exit(0)

# Signal handlers for graceful shutdown
shutdown_event = threading.Event()


def _graceful_shutdown(signum, _frame):
    """Handle SIGTERM/SIGINT: flag shutdown, stop the watchdog, then exit.

    Args:
        signum: Number of the received signal.
        _frame: Current stack frame (unused).
    """
    g_utils.logger.info("Received signal %d, initiating graceful shutdown...", signum)
    shutdown_event.set()
    try:
        watchdog.stop()
    except Exception as e:
        # Best effort: a watchdog failure must not block the exit, but it
        # should be visible in the logs rather than silently dropped.
        g_utils.logger.warning("Watchdog stop failed during shutdown - %s", e)
    sys.exit(0)


signal.signal(signal.SIGTERM, _graceful_shutdown)
signal.signal(signal.SIGINT, _graceful_shutdown)


@api.route("/version")
class Version(Resource):
    """Report the CloudSync version while the device is in registration mode."""

    @api.response(OK, "Success")
    def get(self):
        """Return the CloudSync version (registration mode only)."""
        app.logger.info("Received /version HTTP request")
        if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'registration':
            return {
                "version": VERSION
            }, OK


# START - REGISTRATION ENDPOINTS

@api.route("/credentials")
class Credentials(Resource):
    """Receive the device certificate and key pair from the manufacturing tool."""

    FIELD_CREDENTIALS = api.model("Credentials", {
        "certificate": fields.String,
        "public_key": fields.String,
        "private_key": fields.String,
    })

    @api.doc(body=FIELD_CREDENTIALS, validate=True)
    @api.response(OK, "Success")
    @api.response(BAD_REQUEST, "Validation Error")
    def put(self):
        """
        Update the device's certificate and keys
        Manufacturing Tool -> Device
        """
        app.logger.info("Received /credentials HTTP request")
        if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'registration':
            payload = request.json
            # A missing body or a JSON value that is not an object has no
            # fields to read; treat every credential as absent instead of
            # failing with AttributeError on ``.get``.
            provided = payload if isinstance(payload, dict) else {}

            invalid_params = [
                name
                for name in ("certificate", "private_key", "public_key")
                if provided.get(name, None) is None
            ]

            if invalid_params:
                error = Error("{0},2,{1}, invalid credentials: {2}".format(
                    OutboundMessageIDs.CS2UI_ERROR.value,
                    ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value,
                    invalid_params))
                error_handler.enqueue_error(error=error)
                return {"invalidAttributes": invalid_params}, BAD_REQUEST

            try:
                helpers_add_to_network_queue(
                    network_request_handler=network_request_handler,
                    request_type=NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS,
                    url=request.url,
                    payload=payload,
                    method=request.method,
                    g_config=g_config,
                    success_message='CS2MFT_REQ_SET_CREDENTIALS request added to network '
                                    'queue')
            except Exception as e:
                # Queueing failures are reported through the error handler;
                # the HTTP response is still OK, as in the original.
                error = Error("{0},2,{1},{2}".format(
                    OutboundMessageIDs.CS2UI_ERROR.value,
                    ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value,
                    e))
                error_handler.enqueue_error(error=error)

            return {
                "invalidAttributes": [],
            }, OK


@api.route("/validate")
class Validate(Resource):
    """Trigger the device connectivity test."""

    @api.response(OK, "Success")
    def head(self):
        """
        Initiate device connectivity test
        Manufacturing Tool -> Device
        """
        app.logger.info("Received /validate HTTP request")
        if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'registration':
            r = NetworkRequest(request_type=NetworkRequestType.MFT2CS_REQ_INIT_CONNECTIVITY_TEST,
                               url=request.url,
                               payload={},
                               method=request.method,
                               g_config=g_config,
                               )
            network_request_handler.enqueue_request(r)
            return {}, OK


@api.route("/reset")
class Reset(Resource):
    """Trigger a factory reset and enable heartbeats."""

    @api.response(OK, "Success")
    def head(self):
        """
        Initiate a factory reset
        Assumes the connectivity test has already been completed
        Manufacturing Tool -> Device
        """
        app.logger.info("Received /reset HTTP request")
        if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'registration':
            r = NetworkRequest(request_type=NetworkRequestType.MFT2CS_REQ_FACTORY_RESET,
                               url=request.url,
                               payload={},
                               method=request.method,
                               g_config=g_config,
                               )
            network_request_handler.enqueue_request(r)
            heartbeat_provider.send_heartbeat = True
            return {}, OK

# END - REGISTRATION ENDPOINTS


def main():
    """Serve the registration API, or idle while background threads work.

    In registration mode the Flask app is run on all interfaces on the
    configured device port; in any other mode the main thread just sleeps.
    """
    if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'registration':
        app.run(debug=False, use_reloader=False, host="0.0.0.0",
                port=g_config[CONFIG_DEVICE][CONFIG_DEVICE_PORT])
    else:
        while True:
            sleep(1)


if __name__ == '__main__':
    main()