########################################################################### # # Copyright (c) 2021-2024 Diality Inc. - All Rights Reserved. # # THIS CODE MAY NOT BE COPIED OR REPRODUCED IN ANY FORM, IN PART OR IN # WHOLE, WITHOUT THE EXPLICIT PERMISSION OF THE COPYRIGHT OWNER. # # @file data_logger.py # # @author (last) Quang Nguyen # @date (last) 07-Jul-2021 # @author (original) Peter Lucia # @date (original) 30-Apr-2021 # ############################################################################ import glob import os import shutil import threading from collections import deque from logging import Logger from time import sleep from typing import Tuple from multiprocessing import Process from pathlib import Path import pandas as pd from ..utils.base import _FauxLogger class DataLogger: MAX_CHUNK_SIZE = 20 def __init__(self, folder: str = "/tmp/", # must be the full path logger: Logger = _FauxLogger()): """ Logs data to xlsx file @param folder: (str) the destination filepath """ super().__init__() self.queue = deque() # Thread safe self.disable_logging = False self.thread = threading.Thread(target=self.logging_scheduler, daemon=True) self.base_folder = folder self.csv_writers = {} self.logger = logger self.thread.start() self.path_disable_logging = "/tmp/DIALIN_DISABLE_LOGGING" def logging_scheduler(self) -> None: """ Called on each timer event. Calls the data logger method @return: None """ while True: self.do_data_logging() sleep(0.05) def add_data(self, data: Tuple): """ @param data: (Tuple) the data to add to the queue @return: None """ # ("module", "timestamp", "header", "value") if not os.path.exists(self.path_disable_logging): self.queue.append(data) # TODO: Remove later - keep for debugging # else: # self.logger.debug("Ignoring {0}".format(data)) def do_data_logging(self) -> None: """ Called on teach timer event. Logs the data currently in deque @return: None """ i = 0 while self.queue: module, timestamp, data_name, data_value = self.queue.pop() filename = module folder = os.path.join(self.base_folder, filename + "Log") if not os.path.isdir(folder): os.mkdir(folder) filepath = os.path.join(folder, data_name + ".csv") if not os.path.exists(filepath): header = ["timestamp", data_name] with open(filepath, 'w') as f: f.write(",".join(header) + "\n") else: with open(filepath, 'a') as f: f.write(timestamp + ", " + data_value + "\n") i += 1 if i >= self.MAX_CHUNK_SIZE: break def export_to_xlsx(self, output_path: str) -> None: """ Called when the user wishes to export all captured logs to a xlsx file @param output_path: (str) the destination output path @return: None """ process = Process(target=self._do_export_to_xlsx, args=(output_path,)) process.start() def _do_export_to_xlsx(self, output_path: str) -> None: """ Performs the actual export to xlsx @param output_path: (str) the destination output path @return: None """ Path(self.path_disable_logging).touch() self.logger.debug("Starting data export to {0}".format(output_path)) log_path = os.path.join(self.base_folder, "*Log") folders = glob.glob(log_path) if len(folders) == 0: self.logger.debug("No folder with data to export") os.remove(self.path_disable_logging) return writer = pd.ExcelWriter(output_path) for folder in folders: files = os.path.join(folder, "*.csv") csv_files = glob.glob(files) module_name = os.path.basename(folder) df = pd.DataFrame() for csv_file in csv_files: df_data = pd.read_csv(csv_file) df_data = df_data.set_index("timestamp") df = df.join(df_data, how="outer") try: df.to_excel(writer, sheet_name=module_name) self.logger.debug("Added {0} to {1}".format(module_name, output_path)) except ValueError as e: self.logger.error("Error during write to excel: {0}".format(e)) writer.save() self.logger.debug("Finished data export to {0}".format(output_path)) os.remove(self.path_disable_logging) def clear_logs(self): """ Called when the user clears the logs @return: None """ Path(self.path_disable_logging).touch() log_path = os.path.join(self.base_folder, "*Log") folders = glob.glob(log_path) for folder in folders: self.logger.debug("Removing {0}".format(folder)) shutil.rmtree(folder) os.remove(self.path_disable_logging)