"""Implementation of an Output Bus based on the Linux file system""" from logging import Logger from collections import deque from threading import Event, Thread import datetime from cloudsync.utils import helpers from cloudsync.utils.filesystem import check_writable, check_disk_space_mb class FileOutputBus: """File Output Bus class that receives, parses and sends downstream all outbound messages""" def __init__(self, logger: Logger, max_size, file_channels_path: str): """ Initialize the File Input Bus. :param Logger logger: Logger object :param max_size: the maximum size of the message queue for the Output Bus :type max_size: int :param str file_channels_path: the path where the bus files are located """ self.last_output_message_id = 1 self.logger = logger self.file_channels_path = file_channels_path self.queue = deque(maxlen=max_size) self.thread = Thread(target=self.scheduler, daemon=True) self.event = Event() self.thread.start() def scheduler(self) -> None: """ Continuously monitors the event flag to check for new messages :return: None """ while True: flag = self.event.wait() if flag: while len(self.queue) > 0: message_body = self.queue.popleft() self.handle_message(message_body) self.event.clear() def enqueue_message(self, message_body: str) -> bool: """ Adds messages to the queue :param str message_body: the data to add to the queue :return: True upon success, False otherwise """ if len(self.queue) < self.queue.maxlen: self.queue.append(message_body) self.event.set() return True else: return False def handle_message(self, message_body: str): """ Parses queue messages and send them downstream :param str message_body: the message body """ self.logger.debug('Message body: {0}'.format(message_body)) try: if not check_writable(self.file_channels_path): raise IOError("Output channel path is not writable: {}".format(self.file_channels_path)) if not check_disk_space_mb(self.file_channels_path, required_mb=1): raise IOError("Insufficient disk space at: {}".format(self.file_channels_path)) now = datetime.datetime.now(datetime.timezone.utc) filename = now.date().strftime("%Y_%m_%d_out.buf") timestamp_str = str(round(now.timestamp())) message_data = timestamp_str + str(self.last_output_message_id) + message_body message_crc8 = helpers.helpers_crc8(message_data.encode('utf-8')) message_body = timestamp_str + ',' + str(self.last_output_message_id) + ',' + str(message_crc8) + ',' + message_body self.logger.info('CS2UI Message: {0}'.format(message_body)) with open(self.file_channels_path + "/" + filename, "a") as f: f.write("{0}\n".format(message_body)) self.last_output_message_id += 1 except IOError as er: self.logger.error('Opening and/or writing to output file error: {0}'.format(' '.join(str(er))))