import logging
import os
import threading

from cloudsync.handlers.logs_handler import CustomTimedRotatingFileHandlerHandler
from cloudsync.utils.singleton import SingletonMeta
from cloudsync.utils.globals import *
from cloudsync.utils.helpers import *
from cloudsync.common.enums import *


class LoggingConfig(metaclass=SingletonMeta):
    """
    Encapsulates logging configuration and setup.

    Owns the custom timed-rotating file handler, keeps the Flask app and
    root loggers in sync with the configured level, persists the log state
    to the config file, and runs a countdown thread that reverts the level
    back to DEFAULT_LOG_LEVEL after a configured duration.
    """

    DEFAULT_LOG_LEVEL = 'ERROR'
    # Mapping of numeric logging levels to their display names, used when
    # persisting the current level to the log state file.
    LOG_LEVELS = {
        0: "NOTSET",
        10: "DEBUG",
        20: "INFO",
        30: "WARN",
        40: "ERROR",
        50: "CRITICAL"
    }
    # Countdown thread that reverts the log level, and the event used to
    # interrupt it early.
    _countdown_thread = None
    _stop_countdown_thread_event = None

    def __init__(self):
        self.flask_app = None
        self.log_level = self.DEFAULT_LOG_LEVEL
        self.log_level_name = None
        self.log_level_duration = None
        self.log_handler = None
        # Set via set_configuration(); also acts as a flag that the config
        # file has been read (see __update_log_state).
        self.g_config = None

    def initiate(self, app) -> None:
        """
        Attach this logging configuration to the given Flask app.

        Creates the log directory if needed, strips the app logger of any
        pre-existing handlers, and installs the custom rotating handler.
        """
        self.flask_app = app
        # Generate the Logs Dir (exist_ok avoids a check-then-create race)
        os.makedirs(CS_LOG_PATH, exist_ok=True)
        # Remove existing handlers from the Flask app.
        # NOTE: iterate a copy — removing from the live list while iterating
        # it would skip every other handler.
        for handler in list(self.flask_app.logger.handlers):
            self.flask_app.logger.removeHandler(handler)
        # Initiate the main configuration of the logger(s)
        self.__configure_logging()

    def set_log_level(self, log_level) -> None:
        """
        Apply a new log level (int or level name string) to the custom
        handler and the root logger, and persist it to the log state file.
        Errors are logged, never raised.
        """
        try:
            self.log_level = log_level if isinstance(log_level, int) else getattr(logging, log_level.upper())
            self.log_level_name = self.LOG_LEVELS[self.log_level]
            # Set the log level on the custom log handler
            self.log_handler.setLevel(self.log_level)
            # Set the log level on the flask logger
            # self.flask_app.logger.setLevel(self.log_level)
            # Set the log level on the root logger
            root_logger = logging.getLogger()
            root_logger.setLevel(self.log_level)
            # Update the log state file
            update_obj = {
                'attribute': CONFIG_LOGS_CURRENT_LOG_LEVEL,
                'value': self.log_level_name
            }
            self.__update_log_state(update_object=update_obj)
            g_utils.logger.info(f'Log level set at {self.log_level_name} ({self.log_level})')
        except Exception as e:
            g_utils.logger.error(f'Could not update the log level: {e}')

    def set_log_handler(self) -> None:
        """Attach the custom handler to both the Flask app and root loggers."""
        # Add the handler to the Flask app logger
        self.flask_app.logger.addHandler(self.log_handler)
        # Add the handler to the root logger
        root_logger = logging.getLogger()
        root_logger.addHandler(self.log_handler)

    def set_dcs_flag(self, flag_state: int) -> None:
        """Persist the DCS upload flag to the log state file."""
        update_obj = {
            'attribute': 'update_dcs_flag',
            'value': flag_state
        }
        self.__update_log_state(update_object=update_obj)

    def set_log_duration(self, duration: Union[int, tuple]) -> None:
        """
        Store the log-level duration and persist it to the log state file.

        When ``duration`` is a tuple it is ``(seconds, start_ts, stop_ts)``
        and all three values are persisted; a plain int persists only the
        duration.
        """
        if isinstance(duration, tuple):
            self.log_level_duration = int(duration[0])
            # Update the duration at log state file
            update_obj = {
                'attribute': CONFIG_LOGS_LOG_LEVEL_DURATION,
                'value': duration[0]
            }
            self.__update_log_state(update_object=update_obj)
            # Update the start timestamp at log state file
            update_obj = {
                'attribute': CONFIG_LOGS_START_TIMESTAMP,
                'value': duration[1]
            }
            self.__update_log_state(update_object=update_obj)
            # Update the stop timestamp at log state file
            update_obj = {
                'attribute': CONFIG_LOGS_STOP_TIMESTAMP,
                'value': duration[2]
            }
            self.__update_log_state(update_object=update_obj)
        else:
            self.log_level_duration = int(duration)
            # Update the duration at log state file
            update_obj = {
                'attribute': CONFIG_LOGS_LOG_LEVEL_DURATION,
                'value': duration
            }
            self.__update_log_state(update_object=update_obj)
        g_utils.logger.debug(f"Log level duration set at {self.log_level_duration}")

    def get_log_level(self) -> dict:
        """
        Returns an object with both the `numeric_value` and `text_value`
        of the log level.
        """
        return {
            "numeric_value": self.log_level,
            "text_value": self.log_level_name
        }

    def set_network_provider(self, network_request_handler) -> None:
        """
        Passes the network provider to the custom log handler so it can be
        able to send uploading log requests to the dcs for CS logs.
        """
        self.log_handler.set_network_provider(network_request_handler=network_request_handler)

    def set_error_provider(self, error_handler) -> None:
        """
        Passes the error provider to the custom log handler so it can be
        able to send uploading log requests to the dcs for CS logs.
        """
        self.log_handler.set_error_provider(error_handler=error_handler)

    def set_configuration(self, g_config) -> None:
        """
        Passes the configuration to the custom log handler so it can be
        able to send uploading log requests to the dcs for CS logs.
        Also it stores it locally as indicator that the config file has
        been read.
        """
        self.g_config = g_config
        self.log_handler.set_configuration(g_config=g_config)

    def clear_prelogger(self) -> None:
        """
        Clears the prelogger after the app has been initiated successfully
        """
        # TODO: Implement this logic

    def start_countdown(self) -> None:
        """
        Start (or restart) the countdown thread that reverts the log level
        after ``log_level_duration`` seconds. A previously running countdown
        is signalled to stop first.
        """
        if self.log_level and self.log_level_duration:
            # Kill the previous running thread
            if self._countdown_thread is not None and self._countdown_thread.is_alive():
                self._stop_countdown_thread_event.set()
            try:
                self._stop_countdown_thread_event = threading.Event()
                new_thread = threading.Thread(target=self.__reset_log_level,
                                              args=(self.log_level_duration, self._stop_countdown_thread_event,))
                new_thread.start()
                self._countdown_thread = new_thread
            except Exception as e:
                g_utils.logger.error(f"Failed starting the countdown thread: {e}")

    def revert_to_error(self) -> None:
        """
        Revert the logger to DEFAULT_LOG_LEVEL immediately, stopping any
        running countdown and clearing the persisted log state.
        """
        if self._countdown_thread is not None and self._countdown_thread.is_alive():
            self._stop_countdown_thread_event.set()
        self.set_log_level(self.DEFAULT_LOG_LEVEL)
        self.set_dcs_flag(1)
        self.set_log_duration((0, 0, 0))  # clear out the log state file
        self._countdown_thread = None
        self._stop_countdown_thread_event = None
        g_utils.logger.info(f"Logger level reverted to: {self.DEFAULT_LOG_LEVEL}")

    def __configure_logging(self):
        """
        * Sets up the file rotation Handler
        * Sets the initial log level to both root and app loggers
        * Sets the g_utils logger
        """
        # Create the "prelogger" for handling logs before the main logger is initialized
        LOG_FORMAT = ("%(asctime)s [%(levelname)s]: %(message)s in %(pathname)s:%(lineno)d")
        prelogger = logging.getLogger("prelogger")
        prelogger.setLevel('DEBUG')
        prelogger_file_handler = logging.FileHandler(os.path.join(CS_LOG_PATH, 'logs.init'))
        prelogger_file_handler.setLevel('DEBUG')
        prelogger_file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
        prelogger.addHandler(prelogger_file_handler)
        # Create a custom TimedRotatingFileHandler that writes logs to a new file
        # and uploads them to DCS every midnight, in UTC time
        self.log_handler = CustomTimedRotatingFileHandlerHandler(
            filename=CS_LOG_FILE,
            prelogger=prelogger,
            when="midnight",
            interval=1,
            utc=True
        )
        self.log_handler.suffix = "%m-%d-%Y"
        # Add a formatter
        default_formatter = logging.Formatter(
            '[%(asctime)s] %(levelname)s in %(module)s: %(message)s | {%(pathname)s:%(lineno)d}')
        self.log_handler.setFormatter(default_formatter)
        # Set the custom handler as global handler
        self.set_log_handler()
        # Set the g_utils logger
        # g_utils.add_logger(self.flask_app.logger)
        g_utils.add_logger(logging.getLogger())
        # Set the log level globally
        self.set_log_level(self.log_level)

    def __update_log_state(self, update_object: dict):
        """
        Write a single attribute/value pair into the CONFIG_LOGS section of
        the loaded config and persist it to disk.

        No-op until set_configuration() has supplied g_config — writing
        before the config file is loaded would clobber it.
        """
        if self.g_config is not None:
            self.g_config[CONFIG_LOGS][update_object['attribute']] = update_object['value']
            if self.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation':
                helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, self.g_config)
            else:
                # Non-operation mode: keep both config locations in sync.
                helpers_write_config(None, CONFIG_PATH, self.g_config)
                helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, self.g_config)

    def __reset_log_level(self, duration: int, stop_event: threading.Event):
        """
        Resets the logger level to the specified default level after a
        given time interval. Runs in the countdown thread; ``stop_event``
        aborts the countdown early without reverting.
        """
        try:
            g_utils.logger.info(f"Logger level set to: {self.log_level_name} for {duration} seconds")
            # Count down in 1-second ticks so the stop event can interrupt us.
            # Event.wait() returns True immediately when the event is set.
            while duration > 0:
                duration -= 1
                if stop_event.wait(1):
                    return
            self.revert_to_error()
        except InterruptedError:
            g_utils.logger.info("Countdown thread interrupted.")