from time import sleep from logging import Logger from threading import Event, Thread from collections import deque from cloudsync.handlers.error import Error from cloudsync.common.enums import * from cloudsync.utils.globals import * from cloudsync.utils.helpers import * class ErrorHandler: def __init__(self, logger: Logger, max_size, output_channel): self.logger = logger self.output_channel = output_channel self.queue = deque(maxlen=max_size) # Thread safe self.thread = Thread(target=self.scheduler, daemon=True) self.event = Event() self.thread.start() self.logger.info('Created Error Handler') def scheduler(self) -> None: """ Continuously monitors the event flag to check for new errors :return: None """ while True: flag = self.event.wait() if flag: while len(self.queue) > 0: error = self.queue.popleft() self.handle_error(error) self.event.clear() def enqueue_error(self, error: Error) -> bool: """ :param error: the error to add to the queue :return: True upon success, False otherwise """ if len(self.queue) < self.queue.maxlen: self.queue.append(error) self.event.set() return True else: return False def handle_error(self, error: Error): if InboundMessageIDs.mapped_str_value(error.ID) == InboundMessageIDs.UI2CS_ERROR: self.logger.error('UI App Error {0}: {1}'.format(error.code, error.parameters)) return # TODO Add specific UI error message handling when necessary if OutboundMessageIDs.mapped_str_value(error.ID) == OutboundMessageIDs.CS2UI_ERROR: self.logger.error('CS App Error {0} - {1}: {2}'.format(error.code, ErrorIDs.mapped_str_value(error.code), error.parameters)) parameter_string = "" for parameter in error.parameters: parameter_string += "," parameter_string += parameter message_body = str(OutboundMessageIDs.CS2UI_ERROR.value) + ',' + error.size + ',' + error.code + parameter_string self.output_channel.enqueue_message(message_body)