"""Implementation of an Input Bus based on the Linux file system and iNotify library""" from logging import Logger from threading import Event, Thread from time import time, sleep import inotify.adapters from cloudsync.handlers.ui_cs_request_handler import UICSMessageHandler from cloudsync.handlers.uics_message import UICSMessage from cloudsync.utils.filesystem import check_readable class FileInputBus: """File Input Bus class that monitors, parses and sends upstream all inbound messages""" def __init__(self, logger: Logger, file_channels_path: str, input_channel_name: str, g_config: dict, message_handler: UICSMessageHandler): """ Initialize the File Input Bus. :param Logger logger: Logger object :param str file_channels_path: the path where the bus files are located :param str input_channel_name: the name of the input channel file :param dict g_config: the system configuration object :param UICSMessageHandler message_handler: the upstream message handler """ self.last_input_message_id = 0 self.logger = logger self.file_channels_path = file_channels_path self.input_channel_name = input_channel_name self.g_config = g_config self.message_handler = message_handler self.i = inotify.adapters.Inotify() self.i.add_watch(self.file_channels_path) self.thread = Thread(target=self.input_channel_handler, daemon=True) self.event = Event() self.thread.start() def input_channel_handler(self): """ The Input Channel Handler - it parses and sends upstream all messages arriving on the File Input Bus """ for event in self.i.event_gen(yield_nones=False): (_, type_names, path, filename) = event # self.logger.debug("PATH=[{}] FILENAME=[{}] EVENT_TYPES={}".format(path, filename, type_names)) if (('IN_MODIFY' in type_names) or ('IN_CLOSE_WRITE' in type_names)) and ( filename.endswith(self.input_channel_name)): input_file_path = self.file_channels_path + "/" + filename if not check_readable(input_file_path): self.logger.warning('Input file not readable: {0}'.format(input_file_path)) continue try: f = open(input_file_path) except IOError as er: self.logger.error('Opening input file error: {0}'.format(' '.join(str(er)))) continue new_input_messages = [] for line in f.readlines(): message_parameters = line.strip().split(',') try: sequence_id = int(message_parameters[1]) except (ValueError, IndexError): self.logger.warning('Skipping message with invalid sequence: {0}'.format(line.strip())) continue if sequence_id > self.last_input_message_id: new_message = UICSMessage(line.strip(), self.g_config) message_added_to_queue = self.message_handler.enqueue_message(new_message) if message_added_to_queue: new_input_messages.append((sequence_id, line.strip())) new_input_messages.sort(key=lambda x: x[0]) # self.logger.debug("New Input messages added to queue: {0}".format(new_input_messages)) if len(new_input_messages) > 0: last_new_message_id = new_input_messages[len(new_input_messages) - 1][0] self.last_input_message_id = last_new_message_id else: last_new_message_id = self.last_input_message_id