"""Handler of commands received by CloudSync app from the Device Registration Tool""" import os from cloudsync.handlers.outgoing import * from cloudsync.utils.helpers import * from cloudsync.utils.globals import * from cloudsync.utils.filesystem import check_writable, check_disk_space_mb from cloudsync.common.enums import * from cloudsync.handlers.outgoing.handler_cs_to_mft import * from cloudsync.handlers.outgoing.handler_cs_to_dcs import * @log_func def cmd_incoming_set_credentials(certificate: str, private_key: str, public_key: str, output_channel, error_handler: ErrorHandler) -> None: """ Sets the credentials on the device :param certificate: The X.509 certificate :param private_key: The private key :param public_key: The public key :param output_channel: CS_UI output channel :param error_handler: global error handler :return: None """ try: if not os.path.exists(CREDENTIALS_PATH): os.makedirs(CREDENTIALS_PATH) if not check_writable(CREDENTIALS_PATH): raise IOError("Credentials path is not writable: {}".format(CREDENTIALS_PATH)) if not check_disk_space_mb(CREDENTIALS_PATH, required_mb=1): raise IOError("Insufficient disk space at: {}".format(CREDENTIALS_PATH)) with open(CREDENTIALS_CERTIFICATE_X509, 'w') as f: f.write(certificate) with open(CREDENTIALS_PRIVATE_KEY, 'w') as f: f.write(private_key) with open(CREDENTIALS_PUBLIC_KEY, 'w') as f: f.write(public_key) message_body = str( OutboundMessageIDs.CS2UI_REQ_SAVE_CREDENTIALS.value) + ',3,' + CREDENTIALS_CERTIFICATE_X509 + ',' + CREDENTIALS_PRIVATE_KEY + ',' + CREDENTIALS_PUBLIC_KEY output_channel.enqueue_message(message_body) except IOError as er: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value, "Error writing device credentials") error_handler.enqueue_error(error=error) g_utils.logger.error('Error writing device credentials: {0}'.format(' '.join(er.args))) @log_func def cmd_incoming_initiate_connectivity_test(url_token: str, url_validate: str, hd_serial: str, dg_serial: str, sw_version: str, g_config: dict, error_handler: ErrorHandler) -> None: """ Initiates the connectivity test on the device: - verifies if device can connect to DCS - verifies if device can obtain a valid token - verifies if device information (hd serial, dg serial & software version) matches the cloud values :param url_token: Identity Provider URL :param url_validate: Device connectivity test URL :param hd_serial: The hd serial number :param dg_serial: The dg serial number :param sw_version: The software version :param g_config: The configuration dictionary :param error_handler: global error handler :return: None """ try: client_secret = g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET] access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509, path_private_key=CREDENTIALS_PRIVATE_KEY, save=True, url=url_token, client_secret=client_secret, error_handler=error_handler) if access_token is not None: g_utils.logger.debug("Access token: {0}".format(access_token)) response = cmd_outgoing_validate_device(access_token=access_token, hd_serial_number=hd_serial, dg_serial_number=dg_serial, sw_version=sw_version, url=url_validate, error_handler=error_handler) if response is not None: g_utils.logger.debug("Response: {0}".format(response)) invalid_attributes = response.get("invalidAttributes", None) g_utils.logger.info("Invalid fields: {0}".format(invalid_attributes)) if invalid_attributes is not None: json_resp = cmd_outgoing_validation_result(invalid_attributes, hd_serial, g_config, error_handler) g_utils.logger.debug("Validation result request response (DRT --> CS): {0}".format(json_resp)) else: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, "Validation failed due to invalid DCS response format") error_handler.enqueue_error(error=error) else: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, "Validation failed due to missing response from DCS") error_handler.enqueue_error(error=error) else: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, "Validation failed due to missing token") error_handler.enqueue_error(error=error) except Exception as e: error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, str(e)) error_handler.enqueue_error(error=error) @log_func def cmd_incoming_factory_reset(output_channel) -> None: """ Initiates a factory reset on the device :param output_channel: CS2UI output channel :return: None """ message_body = str(OutboundMessageIDs.CS2UI_REQ_FACTORY_RESET.value) + ',0' output_channel.enqueue_message(message_body)