"""Handler for messages sent from the UI application to CloudSync (UI2CS).

Messages are queued by ``UICSMessageHandler.enqueue_message`` and processed
one at a time on a background daemon thread.
"""
from logging import Logger
from threading import Event, Thread
from collections import deque

from cloudsync.handlers.uics_message import UICSMessage
from cloudsync.handlers.error_handler import ErrorHandler
from cloudsync.handlers.error import Error
from cloudsync.common.enums import *
from cloudsync.utils.globals import *
from cloudsync.utils.helpers import *

# NOTE: these stay after the star imports (original order) so that the plain
# module names always win over anything the star imports may re-export.
import json  # used by the treatment report handler; previously only implicit
import urllib.parse
import os
import shutil
import time


class UICSMessageHandler:
    """Queue and process UI2CS messages on a dedicated worker thread."""

    def __init__(self, logger: Logger, max_size, network_request_handler, output_channel, reachability_provider,
                 error_handler):
        """
        Store the collaborators, create the bounded queue and start the worker thread.

        :param logger: logger used for all handler output
        :param max_size: maximum number of pending messages (``None`` means unbounded)
        :param network_request_handler: passed to ``helpers_add_to_network_queue`` for cloud requests
        :param output_channel: object whose ``enqueue_message`` receives replies for the UI
        :param reachability_provider: stored on the instance; not used by this class directly
        :param error_handler: object whose ``enqueue_error`` receives ``Error`` instances
        """
        self.logger = logger
        self.reachability_provider = reachability_provider
        self.network_request_handler = network_request_handler
        self.output_channel = output_channel
        self.error_handler = error_handler

        self.queue = deque(maxlen=max_size)  # deque append/popleft are thread safe
        # The event must exist before the worker starts, since the worker waits on it.
        self.event = Event()
        self.thread = Thread(target=self.scheduler, daemon=True)
        self.thread.start()

        self.logger.info('Created UI_CS Handler')

    def scheduler(self) -> None:
        """
        Wait for the event flag and process every queued message.

        Runs forever on the worker thread created in ``__init__``.

        :return: None
        """
        while True:
            self.event.wait()
            # Clear BEFORE draining: a message enqueued while we drain re-sets
            # the flag, so the next wait() returns immediately. Clearing after
            # the drain could erase that signal and strand the message.
            self.event.clear()
            while self.queue:
                message = self.queue.popleft()
                try:
                    self.handle_message(message)
                except Exception:
                    # Thread boundary: one malformed message must not kill the
                    # only worker and silently stop all further processing.
                    self.logger.exception('Unhandled error while processing UI2CS message: %s', message)

    def enqueue_message(self, message: UICSMessage) -> bool:
        """
        Add a message to the queue and wake the worker thread.

        :param message: the message to add to the message queue
        :return: True upon success, False when the queue is full
        """
        max_len = self.queue.maxlen
        # maxlen is None for an unbounded deque; comparing an int to None would raise.
        if max_len is not None and len(self.queue) >= max_len:
            return False
        self.queue.append(message)
        self.event.set()
        return True

    def handle_message(self, message: UICSMessage) -> None:
        """
        Validate the CRC of a UI2CS message and dispatch it to its handler.

        A CRC mismatch is reported to the error handler and the message is
        dropped. Messages whose ID / device mode combination matches no
        branch are logged and ignored.

        :param message: the message to process
        :return: None
        """
        self.logger.info('UI2CS Message: {0}'.format(message))

        if not message.parameters:
            self.logger.debug("UI2CS message has 0 parameters: {0}".format(message))
        header = message.timestamp + message.sequence + message.ID + message.size
        message_data = header + ''.join(message.parameters)

        calculated_crc = str(helpers_crc8(message_data.encode('utf-8')))
        if message.CRC != calculated_crc:
            error = Error.crc_mismatch(OutboundMessageIDs.CS2UI_ERROR.value,
                                       ErrorIDs.CS_BAD_CRC_ERROR.value,
                                       message.sequence,
                                       message.CRC,
                                       calculated_crc)
            self.error_handler.enqueue_error(error=error)
            return

        message_id = InboundMessageIDs.mapped_str_value(message.ID)

        # The device mode is only read (lazily, via `and`) for the message IDs
        # that depend on it, exactly as before.
        # REGISTRATION MODE
        if message_id == InboundMessageIDs.UI2CS_REQ_REGISTRATION and \
                self._device_mode(message) == 'registration':
            self._handle_registration(message)
        # OPERATION MODE
        elif message_id == InboundMessageIDs.UI2CS_SEND_DEVICE_STATE and \
                self._device_mode(message) == 'operation':
            self._handle_device_state(message)
        elif message_id == InboundMessageIDs.UI2CS_SEND_TREATMENT_REPORT and \
                self._device_mode(message) == 'operation':
            self._handle_treatment_report(message)
        elif message_id == InboundMessageIDs.UI2CS_REQ_DECOMMISSION and \
                self._device_mode(message) == 'operation':
            self._handle_decommission()
        # EITHER MODE
        elif message_id == InboundMessageIDs.UI2CS_SEND_CHECKIN:
            self._handle_checkin()
        elif message_id == InboundMessageIDs.UI2CS_REQ_LOG_RETENTION:
            self._handle_log_retention(message)
        elif message_id == InboundMessageIDs.UI2CS_ERROR:
            self._handle_ui_error(message)
        # OPERATION MODE
        elif message_id == InboundMessageIDs.UI2CS_UPLOAD_DEVICE_LOG and \
                self._device_mode(message) == 'operation':
            self._handle_upload_device_log(message)
        else:
            self.logger.debug("UI2CS message not handled (unknown ID or wrong device mode): {0}".format(message))

    @staticmethod
    def _device_mode(message: UICSMessage):
        """Return the device mode stored in the message's config."""
        return message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE]

    def _enqueue_general_error(self, error_id, description) -> None:
        """Build a general CS2UI error and hand it to the error handler."""
        error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, error_id, description)
        self.error_handler.enqueue_error(error=error)

    def _queue_network_request(self, request_type, g_config, payload, success_message) -> None:
        """Add a request to the network queue with empty url/method, as every handler here does."""
        helpers_add_to_network_queue(network_request_handler=self.network_request_handler,
                                     request_type=request_type,
                                     url='',
                                     payload=payload,
                                     method='',
                                     g_config=g_config,
                                     success_message=success_message)

    def _handle_registration(self, message: UICSMessage) -> None:
        """Store the device identity in the config files and queue a registration request."""
        self.logger.info('UI2CS_REQ_REGISTRATION request received')
        self.logger.debug('Applicable config: {0}'.format(message.g_config))

        params = message.parameters
        if (len(params) != 3) or (params[0] is None) or (params[2] is None):
            self._enqueue_general_error(ErrorIDs.CS_REQ_REGISTRATION_ERROR.value,
                                        "invalid # of parameters for registration")
            return

        try:
            device_config = message.g_config[CONFIG_DEVICE]
            device_config[CONFIG_DEVICE_HD_SERIAL] = params[0]
            if params[1]:
                device_config[CONFIG_DEVICE_DG_SERIAL] = params[1]
            else:
                # No DG serial supplied: fall back to the HD serial.
                self.logger.debug("Used HD Serial as value for DG Serial")
                device_config[CONFIG_DEVICE_DG_SERIAL] = params[0]
            device_config[CONFIG_DEVICE_SW_VERSION] = params[2]
            device_config[CONFIG_DEVICE_NAME] = params[0]
            device_config[CONFIG_DEVICE_IP] = helpers_get_ip_address()

            helpers_write_config(None, CONFIG_PATH, message.g_config)
            helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, message.g_config)

            self._queue_network_request(request_type=NetworkRequestType.CS2MFT_REQ_REGISTRATION,
                                        g_config=message.g_config,
                                        payload={},
                                        success_message='CS2MFT_REQ_REGISTRATION request added to network '
                                                        'queue')
        except IOError:
            self._enqueue_general_error(ErrorIDs.CS_SAVE_CONFIG_ERROR.value,
                                        "Error updating device config file")
        except Exception as e:
            self._enqueue_general_error(ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, str(e))

    def _handle_device_state(self, message: UICSMessage) -> None:
        """Translate the reported HD mode/sub-mode to a cloud state, persist it and queue the update."""
        self.logger.info("UI2CS_SEND_DEVICE_STATE request received")

        try:
            # Parameter access and mapping live inside the try so that missing
            # or unmappable values are reported as an unknown state instead of
            # raising out of the handler.
            hd_mode = HDOpModes.mapped_str_value(message.parameters[0])
            self.logger.debug("HD Mode: {0}".format(hd_mode))
            hd_sub_mode = HDOpSubModes.mapped_str_value(message.parameters[1])
            self.logger.debug("HD Sub-Mode: {0}".format(hd_sub_mode))
            device_state = helpers_device_state_to_cloud_state(hd_mode, hd_sub_mode)
        except Exception:
            device_state = DeviceStates.UNKNOWN_STATE
            self._enqueue_general_error(ErrorIDs.CS_UNKNOWN_DEVICE_STATE_ERROR.value,
                                        "Unknown device state received from UI")

        # The (possibly unknown) state is still written and sent to the cloud.
        message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_STATE] = device_state.value
        self.logger.info("Device state: {0}".format(device_state.name))

        try:
            helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, message.g_config)
            self._queue_network_request(request_type=NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE,
                                        g_config=message.g_config,
                                        payload={},
                                        success_message='CS2DCS_REQ_SET_DEVICE_STATE request added to network '
                                                        'queue')
        except Exception as e:
            self._enqueue_general_error(ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, str(e))

    def _handle_treatment_report(self, message: UICSMessage) -> None:
        """Read the treatment log named by the first parameter and queue it for upload."""
        self.logger.info("UI2CS_SEND_TREATMENT_REPORT request received")

        try:
            device_config = message.g_config[CONFIG_DEVICE]
            hd_serial_number = device_config[CONFIG_DEVICE_HD_SERIAL]
            dg_serial_number = device_config[CONFIG_DEVICE_DG_SERIAL]
            sw_version = device_config[CONFIG_DEVICE_SW_VERSION]
            self.logger.debug('hd: {0},dg: {1},sw: {2}'.format(hd_serial_number, dg_serial_number, sw_version))

            treatment_log_json = helpers_read_treatment_log_file(message.parameters[0])
            if not treatment_log_json:
                # Previously dropped without a trace; make the skip visible.
                self.logger.warning("Treatment log is empty, nothing queued for upload")
                return

            treatment_log_json['checksum'] = helpers_sha256_checksum(json.dumps(treatment_log_json['data']))
            treatment_log_json['serialNumber'] = hd_serial_number

            # NOTE(review): uses the global g_utils logger rather than
            # self.logger - kept as-is; confirm whether that is intentional.
            g_utils.logger.debug("Treatment log {0}".format(treatment_log_json))

            self._queue_network_request(request_type=NetworkRequestType.CS2DCS_REQ_SEND_TREATMENT_REPORT,
                                        g_config=message.g_config,
                                        payload=treatment_log_json,
                                        # Fixed: the two literals used to join as "networkqueue".
                                        success_message='CS2DCS_REQ_SEND_TREATMENT_REPORT request added to network '
                                                        'queue')
        except Exception as e:
            self._enqueue_general_error(ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, str(e))

    def _handle_decommission(self) -> None:
        """Decommission the device and notify the UI."""
        self.logger.info("UI2CS_REQ_DECOMMISSION request received")
        try:
            helpers_decommission_device()
            message_body = str(OutboundMessageIDs.CS2UI_REQ_DEVICE_DECOMMISSIONED.value) + ',0'
            self.output_channel.enqueue_message(message_body)
        except Exception as e:
            self._enqueue_general_error(ErrorIDs.CS_REQ_DECOMMISSION_ERROR.value, str(e))

    def _handle_checkin(self) -> None:
        """Reply to a check-in; sent in either registration or operation mode."""
        self.logger.info("UI2CS_SEND_CHECKIN request received")
        try:
            message_body = str(OutboundMessageIDs.CS2UI_REQ_CHECKIN.value) + ',0'
            self.output_channel.enqueue_message(message_body)
        except Exception as e:
            self._enqueue_general_error(ErrorIDs.CS_REQ_CHECKIN_ERROR.value, str(e))

    def _handle_log_retention(self, message: UICSMessage) -> None:
        """Run log retention with the percentage given in the first parameter and report the result."""
        self.logger.info("UI2CS_REQ_LOG_RETENTION request received")

        # A missing, non-numeric or non-positive value is reported as invalid
        # instead of letting int() raise out of the handler.
        try:
            used_percentage = int(message.parameters[0]) if message.parameters else 0
        except (TypeError, ValueError):
            used_percentage = 0

        if used_percentage <= 0:
            self._enqueue_general_error(ErrorIDs.CS_LOG_RETENTION_ERROR.value,
                                        "invalid # of parameters for log retention")
            return

        try:
            self.logger.info("Used Percentage: {0}".format(message.parameters[0]))
            num_of_files, del_size_mb = helpers_log_retention(used_percentage)
            message_body = str(
                OutboundMessageIDs.CS2UI_REQ_LOG_RETENTION.value) + f',{num_of_files}' + f',{del_size_mb}'
            self.output_channel.enqueue_message(message_body)
        except Exception as e:
            self._enqueue_general_error(ErrorIDs.CS_LOG_RETENTION_ERROR.value, str(e))

    def _handle_ui_error(self, message: UICSMessage) -> None:
        """Forward an error reported by the UI to the error handler."""
        if not message.parameters:
            # The forwarded body needs at least one parameter after the size.
            self.logger.error("UI2CS_ERROR received without parameters: {0}".format(message))
            return

        fields = [InboundMessageIDs.UI2CS_ERROR.value, message.size, *message.parameters]
        error_body = ",".join("{0}".format(field) for field in fields)
        self.error_handler.enqueue_error(error=Error(error_body=error_body))

    def _handle_upload_device_log(self, message: UICSMessage) -> None:
        """Verify the supplied checksum and queue a device log upload request."""
        self.logger.info("UI2CS_UPLOAD_DEVICE_LOG request received")

        params = message.parameters
        if (len(params) != 2) or (params[0] is None) or (params[1] is None):
            self._enqueue_general_error(ErrorIDs.CS_DEVICE_LOG_ERROR.value,
                                        "invalid # of parameters for file upload request")
            return

        try:
            # NOTE(review): the checksum is computed over parameters[0], which
            # is also sent as "path" below - confirm the helper hashes what the
            # UI hashed (file contents vs. the path string).
            local_checksum = helpers_sha256_checksum(params[0])
            if local_checksum != params[1]:
                raise ValueError('No valid sha256 value.')

            device_config = message.g_config[CONFIG_DEVICE]
            hd_serial_number = device_config[CONFIG_DEVICE_HD_SERIAL]
            dg_serial_number = device_config[CONFIG_DEVICE_DG_SERIAL]
            sw_version = device_config[CONFIG_DEVICE_SW_VERSION]
            self.logger.debug('hd: {0},dg: {1},sw: {2}'.format(hd_serial_number, dg_serial_number, sw_version))

            device_log_data = {
                "path": params[0],
                "serialNumber": hd_serial_number,
                "checksum": local_checksum
            }

            # NOTE(review): global g_utils logger kept as in the original.
            g_utils.logger.debug("Device log data {0}".format(device_log_data))

            self._queue_network_request(request_type=NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG,
                                        g_config=message.g_config,
                                        payload=device_log_data,
                                        success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to network queue')
        except Exception as e:
            self._enqueue_general_error(ErrorIDs.CS_DEVICE_LOG_ERROR.value, str(e))