########################################################################### # # Copyright (c) 2020-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file base.py # # @author (last) Micahel Garthwaite # @date (last) 16-May-2023 # @author (original) Peter Lucia # @date (original) 22-Jun-2020 # ############################################################################ import logging import os from abc import ABC, abstractmethod from datetime import datetime from enum import Enum from threading import Timer INTERVAL_1s = 1 INTERVAL_5s = 5 INTERVAL_10s = 10 INTERVAL_60s = 60 class AbstractObserver(ABC): """ Publicly accessible parent class for all observers. The update method will receive data when data is made available """ @abstractmethod def update(self): """ Attach an observer """ pass class _FauxLogger: def __init__(self, printing_enabled=False): self.printing_enabled = printing_enabled def debug(self, msg): if self.printing_enabled: print("DEBUG: {0}".format(msg)) def info(self, msg): if self.printing_enabled: print("INFO: {0}".format(msg)) def warn(self, msg): if self.printing_enabled: print("WARN: {0}".format(msg)) def warning(self, msg): if self.printing_enabled: print("WARNING: {0}".format(msg)) def error(self, msg): if self.printing_enabled: print("ERROR: {0}".format(msg)) def critical(self, msg): if self.printing_enabled: print("CRITICAL: {0}".format(msg)) class LogManager: LOG_FMT = '%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s' LOG_FMT_NO_METADATA = '%(message)s' LOG_DT_FMT = '%m-%d-%Y:%H:%M:%S' def __init__(self, log_level=None, log_filepath="Dialin.log"): """ @param log_level: (str) or (None) if not set, contains the logging level @param log_filepath: (str) the log filepath """ self.log_level = log_level self.logging_enabled = self.log_level_enables_logging(log_level) self.log_filepath = self.get_available_log_path(log_filepath) self.logger = None self.configure_logging(self.log_filepath) def log_level_enables_logging(self, log_level: str): """ Check if the log level string is a valid logging level @param log_level: (str) the logging level @return: True if the log level is valid, False otherwise """ return log_level is not None and log_level.upper() in self.get_logging_levels() @staticmethod def get_logging_levels(): """ Gets all possible logging levels @return: All possible logging levels """ return ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "CAN_ONLY", "PRINT_ONLY"] def request_new_log_path(self, path: str): """ Clears the logger, gets a new log filepath and configures the logger to use it. @param path: the requested log filepath @return: The new log filepath """ self.clear_logger() self.log_filepath = self.get_available_log_path(path) self.configure_logging(self.log_filepath) return self.log_filepath def clear_logger(self): """ If the logger has been created, clear its handlers @return: True if successful, False otherwise """ # remove existing handlers if self.logger is not None: for handler in self.logger.handlers: self.logger.removeHandler(handler) return True return False def configure_logging(self, log_path): """ Sets up the logger to use the provided log path @param log_path: Path to the log file @return: True if success, False otherwise """ if self.logger is not None: print("Logger already configured. Please clear the logger first.") return False # configure the logging if self.logging_enabled and self.log_level not in ["PRINT_ONLY", "CAN_ONLY"]: numeric_level = getattr(logging, self.log_level.upper(), logging.ERROR) self.logger = logging.getLogger("Dialin") self.logger.setLevel(numeric_level) fh = logging.FileHandler(log_path) fh.setLevel(numeric_level) ch = logging.StreamHandler() ch.setLevel(numeric_level) formatter = logging.Formatter(fmt=self.LOG_FMT, datefmt=self.LOG_DT_FMT) fh.setFormatter(formatter) ch.setFormatter(formatter) self.logger.addHandler(fh) self.logger.addHandler(ch) else: self.logger = _FauxLogger(printing_enabled=self.log_level == "PRINT_ONLY") return True def print_and_log(self, message, log_level=logging.DEBUG): """ Prints a message if its severity is >= the current log level. Also logs the message. @param message: The message to print and log @param log_level: The logging level, indicates the severity of the message @return:: None """ if not self.logging_enabled: return if log_level == logging.DEBUG: self.logger.debug(message) elif log_level == logging.INFO: self.logger.info(message) elif log_level == logging.WARNING: self.logger.warning(message) elif log_level == logging.ERROR: self.logger.error(message) elif log_level == logging.CRITICAL: self.logger.critical(message) @staticmethod def get_available_log_path(filepath: str): """ Gets an available log path from filepath appends integer to the end if file already exists. @param filepath: The full path to the file @return: (str) The available log filepath """ if not os.path.exists(filepath): return filepath path, ext = os.path.splitext(filepath) i = 0 while os.path.exists("{0}{1}{2}".format(path, i, ext)): i += 1 return "{0}{1}{2}".format(path, i, ext) class AbstractSubSystem: @abstractmethod def __init__(self): """ Initialization function for the sub system # The abstract base class requires all abstract methods are overridden by children classes """ self._observers = [] self._datetime_fmt = "%m.%d.%Y_%I.%M.%S.%f" pass def attach(self, observer: AbstractObserver): """ Attach an observer so it is updated upon published events """ self._observers.append(observer) def detach(self, observer: AbstractObserver): """ Detach an observer """ self._observers.remove(observer) def publish(keys): """ Decorator that accepts a list of variable names to publish To be used in any AbstractSubSystem @param keys: The variable names to publish @return: A function that will take a function and return another function """ def _decorator(func): """ @param func: The handler function @return: The function to wrap around _publish """ def _wrapper(self, *args, **kwargs): func(self, *args, **kwargs) result = {} if not self._observers: return None result["datetime"] = datetime.now() result["subsystem"] = self.__class__.__name__ for key in keys: result[key] = getattr(self, key) for observer in self._observers: observer.update(result) return _wrapper return _decorator class DialinEnum(Enum): @classmethod def has_value(cls, value): return value in cls._value2member_map_ class AlarmEnum(Enum): def __init__(self, *args): cls = self.__class__ if any(self.value == member.value for member in cls): raise ValueError("aliases not allowed: %r --> %r" % (self.name, cls(self.value).name)) @classmethod def has_value(cls, value): return value in cls._value2member_map_ class IntervalTimer(object): """ A class object that is used to execute a function on a timed interval. Timed interval auto starts on object creation. Uses to send CAN messages at a specified interval. """ def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self._timer = Timer(self.interval, self._run) self._timer.daemon = True self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = False