"""Handler of network requests between CloudSync, Device Registration Tool and Diality Cloud System"""

import json
import urllib.parse
from time import sleep, time
from logging import Logger
from threading import Event, Thread
from collections import deque
from random import seed, randint

from cloudsync.handlers.network_request import NetworkRequest
from cloudsync.handlers.error_handler import ErrorHandler
from cloudsync.handlers.error import Error
from cloudsync.utils.helpers import log_func
from cloudsync.utils.logging import LoggingConfig
from cloudsync.utils.globals import *
from cloudsync.common.enums import *
from cloudsync.handlers.outgoing.handler_cs_to_mft import *
from cloudsync.handlers.outgoing.handler_cs_to_dcs import *
from cloudsync.handlers.incoming.handler_mft_to_cs import *


class NetworkRequestHandler:
    """Queue network requests and process them one at a time on a worker thread.

    Requests are added with ``enqueue_request`` and consumed by ``scheduler``,
    which runs on a daemon thread started in ``__init__`` and hands every
    request to ``handle_request``.
    """

    def __init__(self, logger: Logger, max_size, output_channel, reachability_provider, error_handler):
        """
        Store the collaborators and start the worker thread.

        :param logger: logger used for this handler's messages
        :param max_size: maximum number of pending requests (deque ``maxlen``)
        :param output_channel: object whose ``enqueue_message`` receives outbound messages
        :param reachability_provider: object exposing a ``reachability`` attribute
        :param error_handler: object whose ``enqueue_error`` receives reported errors
        """
        self.logger = logger
        self.logconf = LoggingConfig()
        self.reachability_provider = reachability_provider
        self.logger.info('Created Network Request Handler')
        self.output_channel = output_channel
        self.error_handler = error_handler

        self.queue = deque(maxlen=max_size)  # Thread safe
        # The event must exist before the worker starts, since the worker waits on it.
        self.event = Event()
        self.thread = Thread(target=self.scheduler, daemon=True)
        self.thread.start()

    def scheduler(self) -> None:
        """
        Worker loop: wait for the event flag, then process every queued request.

        The flag is cleared *before* the queue is drained. A request enqueued
        while draining is either picked up by the drain loop or leaves the flag
        set for the next iteration, so a wake-up cannot be lost.

        :return: None (loops forever)
        """
        while True:
            self.event.wait()
            self.event.clear()
            while self.queue:
                req = self.queue.popleft()
                try:
                    self.handle_request(req)
                except Exception:
                    # One failing request must not terminate the worker thread.
                    self.logger.exception("Unhandled error while processing network request")

    def enqueue_request(self, req: NetworkRequest) -> bool:
        """
        Add a request to the queue and wake the worker.

        :param req: the network request to add to the queue
        :return: True upon success, False if the queue is full
        """
        max_len = self.queue.maxlen
        # maxlen is None for an unbounded deque (max_size=None): never "full".
        if max_len is not None and len(self.queue) >= max_len:
            return False
        self.queue.append(req)
        self.event.set()
        return True

    def handle_request(self, req: NetworkRequest) -> None:
        """
        Dispatch a request to the handler matching ``req.request_type``.

        Each handler reports failures through ``self.error_handler`` instead of
        raising. A few early-exit paths return the response object they
        received; ``scheduler`` ignores the return value.

        :param req: the request to process
        :return: None
        """
        request_type = req.request_type

        # REGISTRATION MODE
        if request_type == NetworkRequestType.CS2MFT_REQ_REGISTRATION:
            return self._handle_registration(req)
        elif request_type == NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS:
            return self._handle_set_credentials(req)
        elif request_type == NetworkRequestType.MFT2CS_REQ_INIT_CONNECTIVITY_TEST:
            return self._handle_connectivity_test(req)
        elif request_type == NetworkRequestType.MFT2CS_REQ_FACTORY_RESET:
            return self._handle_factory_reset(req)

        # OPERATION MODE
        elif request_type == NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE:
            return self._handle_set_device_state(req)
        elif request_type == NetworkRequestType.CS2DCS_REQ_SEND_TREATMENT_REPORT:
            return self._handle_send_treatment_report(req)
        elif request_type == NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG:
            return self._handle_send_device_log(req)
        elif request_type == NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG:
            return self._handle_send_cs_log(req)
        else:
            g_utils.logger.warning("Request type {0} not supported".format(req.request_type))

    def _report_error(self, error_id, message) -> None:
        """Enqueue a CS2UI general error with the given error id and message."""
        error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value,
                              error_id,
                              message)
        self.error_handler.enqueue_error(error=error)

    def _handle_registration(self, req: NetworkRequest) -> None:
        """Send the device registration request built from the device configuration."""
        try:
            response = cmd_outgoing_register(
                device_name=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_NAME],
                hd_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                dg_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                manf_tool_base_url=req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_MFT_URL],
                data_source_id=req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DIA_ORG_ID],
                headers=req.headers,
                error_handler=self.error_handler)
            self.logger.debug("DRT Request registration resp: {0}".format(response))
        except Exception as e:
            self._report_error(ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, str(e))

    def _handle_set_credentials(self, req: NetworkRequest) -> None:
        """Pass the certificate and key pair from the request payload to the credentials command."""
        try:
            certificate = req.payload.get("certificate")
            private_key = req.payload.get("private_key")
            public_key = req.payload.get("public_key")
            cmd_incoming_set_credentials(certificate=certificate,
                                         private_key=private_key,
                                         public_key=public_key,
                                         output_channel=self.output_channel,
                                         error_handler=self.error_handler)
        except Exception as e:
            self._report_error(ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value, str(e))

    def _handle_connectivity_test(self, req: NetworkRequest) -> None:
        """Start the connectivity test against the identity and device-validation URLs."""
        try:
            url_validate = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL]
            url_validate = urllib.parse.urljoin(url_validate, "api/device/validate")
            cmd_incoming_initiate_connectivity_test(
                url_token=req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL],
                url_validate=url_validate,
                hd_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                dg_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                g_config=req.g_config,
                error_handler=self.error_handler
            )
        except Exception as e:
            self._report_error(ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, str(e))

    def _handle_factory_reset(self, req: NetworkRequest) -> None:
        """Switch the configuration to 'operation' mode and run the factory-reset command.

        On failure the error is reported and the mode is written back as
        'registration'.
        """
        try:
            req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] = 'operation'
            helpers_write_config(None, CONFIG_PATH, req.g_config)
            helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, req.g_config)
            cmd_incoming_factory_reset(output_channel=self.output_channel)
        except Exception as e:
            self._report_error(ErrorIDs.CS_FACTORY_RESET_ERROR.value, str(e))
            # Roll the persisted mode back so the device stays in registration mode.
            req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] = 'registration'
            helpers_write_config(None, CONFIG_PATH, req.g_config)

    def _handle_set_device_state(self, req: NetworkRequest) -> None:
        """Send the current device state to DCS and apply any log-level change from the reply."""
        try:
            device_state_json = req.payload  # empty {}
            device_state = req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_STATE]

            base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL]
            identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL]
            client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET]
            token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION)
            device_state_url = urllib.parse.urljoin(base_url, "/api/device")

            access_token = self.get_valid_token(identity_url=identity_url,
                                                token_verification_url=token_verification_url,
                                                client_secret=client_secret)
            if access_token is None:
                self._report_error(ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value,
                                   "Missing access token")
                return

            # Decide whether to inform DCS for the new log level
            if helpers_should_update_dcs_log_level(req.g_config):
                device_state_json['logLevel'] = self.logconf.get_log_level()['text_value'].lower()
                self.logconf.set_dcs_flag(0)  # Reset the update_dcs_flag

            device_state_json['state'] = DeviceStates.mapped_int_value(device_state).value
            device_state_json = json.dumps(device_state_json)

            response = cmd_outgoing_set_device_state(url=device_state_url,
                                                     access_token=access_token,
                                                     device_state_json=device_state_json,
                                                     error_handler=self.error_handler)
            if response is None:
                return

            if response.status_code == 200:
                response_body = response.json()
                self.logger.debug("CS set Device state: {0}".format(response_body))

                if helpers_should_update_cs_log_level(response_body, req.g_config):
                    new_log_level = response_body['logLevel']
                    if new_log_level != "error":
                        # Set the DCS flag
                        self.logconf.set_dcs_flag(1)
                        # Calculate the duration of the new log level,
                        # falling back to the configured default
                        log_level_duration = (
                            helpers_calculate_log_level_duration(response_body)
                            or req.g_config[CONFIG_DEVICE][CONFIG_LOGS_LOG_LEVEL_DURATION])
                        self.logconf.set_log_duration(log_level_duration)
                        # Update the log level
                        self.logconf.set_log_level(new_log_level)
                        # Start the timer
                        self.logconf.start_countdown()
                    else:
                        self.logconf.revert_to_error()

            self.logger.debug("DCS Request device state response code: {0} & full response: {1}".format(
                response.status_code, response.text))

            if response.status_code == UNASSIGNED:
                self._report_error(ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value,
                                   "Invalid device state transition")
        except Exception as e:
            self._report_error(ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, str(e))

    def _handle_send_treatment_report(self, req: NetworkRequest):
        """Validate the device, resolve the patient, then upload the treatment report to DCS.

        Returns the response object on two early-exit paths (invalid device,
        unexpected patient-lookup status) and None otherwise.
        """
        try:
            treatment_log_json = req.payload
            patient_emr_id = treatment_log_json['data']['patient']['id']
            treatment_id = treatment_log_json['data']['treatment']['treatmentCode']

            base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL]
            identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL]
            client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET]
            token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION)
            validate_url = urllib.parse.urljoin(base_url, "api/device/validate")
            data_submission_url = urllib.parse.urljoin(base_url, "/api/device/data")

            access_token = self.get_valid_token(identity_url=identity_url,
                                                token_verification_url=token_verification_url,
                                                client_secret=client_secret)
            if access_token is None:
                self._report_error(ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value,
                                   "Missing access token")
                return

            # Step #1 - get organization id for current device
            response = cmd_outgoing_validate_device(
                access_token=access_token,
                hd_serial_number=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                dg_serial_number=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                url=validate_url,
                error_handler=self.error_handler)
            if response is None:
                return

            invalid_attributes = response.get("invalidAttributes", None)
            if len(invalid_attributes) > 0:
                self.logger.error(
                    "Device with HD serial number {0}, DG serial number {1}, software version {2} is not a valid "
                    "device for treatment. Invalid fields: {3}".format(
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                        invalid_attributes))
                return response

            organization_id = response.get("associatedOrganizationId", None)
            treatment_log_json['organizationId'] = organization_id

            # Step #2 - Set patient by patient_emr_id
            patient_with_emr_id_exists_url = urllib.parse.urljoin(
                base_url,
                "api/device/organization/{0}/patient/{1}".format(organization_id, patient_emr_id))
            create_temporary_patient_url = urllib.parse.urljoin(
                base_url,
                "api/device/organization/{0}/patient/{1}".format(organization_id, patient_emr_id))

            # Step #2a - Check if patient exists by patient_emr_id
            response = cmd_outgoing_check_if_patient_with_emr_id_exists(
                access_token=access_token,
                url=patient_with_emr_id_exists_url,
                error_handler=self.error_handler)
            if response is None:
                return

            # Step #2b - If patient with emr_id doesn't exist, create temporary patient
            if response.status_code == OK:
                patient_id = response.json().get("id", None)
            elif response.status_code == NOT_FOUND:
                response = cmd_outgoing_create_temporary_patient(access_token=access_token,
                                                                 url=create_temporary_patient_url,
                                                                 error_handler=self.error_handler)
                if response is None:
                    return
                patient_id = response.get("id", None)
            else:
                g_utils.logger.warning("Patient didn't exist and a temporary patient couldn't be created")
                return response

            patient_device_association_url = urllib.parse.urljoin(
                base_url,
                "/api/device/organization/{0}/patient/{1}/association".format(organization_id, patient_id))

            # Step #3 - Check if device is associated to patient and if not associate it
            response = cmd_outgoing_set_patient_device_association(url=patient_device_association_url,
                                                                   access_token=access_token,
                                                                   associate=True,
                                                                   error_handler=self.error_handler)

            # Step #4 - Send treatment report
            association_time = int(round(time() * S_MS_CONVERSION_FACTOR))
            # NOTE: the timestamps were previously meant to come from the
            # association response's 'X-Timestamp' header (+1000 / +2000).
            # TEMPORARY DELAY TO COMPENSATE FOR DEVICE INTERNAL CLOCK ERROR - TODO REMOVE AFTER FIX
            generated_at_time = association_time + 301000
            # TEMPORARY DELAY TO COMPENSATE FOR DEVICE INTERNAL CLOCK ERROR - TODO REMOVE AFTER FIX
            completed_at_time = association_time + 302000

            treatment_log_json['organizationUserId'] = patient_id
            treatment_log_json['generatedAt'] = generated_at_time
            treatment_log_json['completedAt'] = completed_at_time
            treatment_log_json = json.dumps(treatment_log_json)

            self.logger.debug("treatment log: {0}".format(treatment_log_json))
            self.logger.info("Sending treatment report to DCS")

            response = cmd_outgoing_send_treatment_report(url=data_submission_url,
                                                          access_token=access_token,
                                                          treatment_log=treatment_log_json,
                                                          error_handler=self.error_handler)
            if response is None:
                return

            self.logger.info(f"Treatment upload response: {response.status_code}")
            if response.status_code != OK:
                self.logger.error(f"Treatment upload failed: {response.status_code} - {response.text[:500]}")
            self.logger.debug(
                "Treatment upload response body: {0}".format(response.json()))

            if response.status_code == OK:
                # Send TX code to UI app
                message_body = str(
                    OutboundMessageIDs.CS2UI_REQ_TX_CODE_DISPLAY.value) + ',1,' + treatment_id
                self.output_channel.enqueue_message(message_body)
            else:
                g_utils.logger.warning("Treatment {0} upload failed: {1}".format(
                    treatment_id, response.json()))

            # Step #5 - Remove patient/device association
            response = cmd_outgoing_set_patient_device_association(url=patient_device_association_url,
                                                                   access_token=access_token,
                                                                   associate=False,
                                                                   error_handler=self.error_handler)
        except Exception as e:
            self._report_error(ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, str(e))

    def _handle_send_device_log(self, req: NetworkRequest):
        """Validate the device, upload the device log in chunks and report the outcome.

        Returns the validation response when the device is reported invalid,
        None otherwise.
        """
        try:
            device_log_data = req.payload

            base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL]
            identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL]
            client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET]
            token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION)
            validate_url = urllib.parse.urljoin(base_url, "api/device/validate")

            # Step #1 - get access token
            access_token = self.get_valid_token(identity_url=identity_url,
                                                token_verification_url=token_verification_url,
                                                client_secret=client_secret)
            if access_token is None:
                self._report_error(ErrorIDs.CS_DEVICE_LOG_ERROR.value, "Missing access token")
                return

            # Step #2 - get organization id for current device
            response = cmd_outgoing_validate_device(
                access_token=access_token,
                hd_serial_number=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                dg_serial_number=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                url=validate_url,
                error_handler=self.error_handler)
            if response is None:
                return

            invalid_attributes = response.get("invalidAttributes", None)
            if len(invalid_attributes) > 0:
                self.logger.error(
                    "Device with HD serial number {0}, DG serial number {1}, software version {2} is not a valid "
                    "device for log upload. Invalid fields: {3}".format(
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                        invalid_attributes))
                return response

            organization_id = response.get("associatedOrganizationId", None)
            device_log_data['organizationId'] = organization_id

            # Step #3 - upload the device log file in chunks
            device_log_json = helpers_construct_device_log_json(device_log_data)
            upload_result = cmd_outgoing_upload_file_in_chunks(
                base_url=base_url,
                access_token=access_token,
                file_json=device_log_json,
                error_handler=self.error_handler,
                log_file_origin='device',
                token_refresher=lambda: self.get_valid_token(
                    identity_url=identity_url,
                    token_verification_url=token_verification_url,
                    client_secret=client_secret))

            if isinstance(upload_result, dict) and not upload_result.get("accepted"):
                # Rejection (e.g. 409 duplicate) — send 2010 with 3 params: filename, 0, reason_code
                message_body = "{0},3,{1},0,{2}".format(
                    OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value,
                    upload_result["filename"],
                    upload_result["reason_code"])
                self.output_channel.enqueue_message(message_body)
            elif isinstance(upload_result, str):
                # Success — send 2010 with 2 params: filename, 1
                self.logger.debug("Device log file uploaded: {0}".format(upload_result))
                message_body = "{0},2,{1},1".format(
                    OutboundMessageIDs.CS2UI_DEVICE_LOG_UPLOADED.value,
                    upload_result)
                self.output_channel.enqueue_message(message_body)
        except Exception as e:
            self._report_error(ErrorIDs.CS_DEVICE_LOG_ERROR.value, str(e))

    def _handle_send_cs_log(self, req: NetworkRequest):
        """Validate the device and upload the CloudSync log in chunks.

        Returns the validation response when the device is reported invalid,
        None otherwise.
        """
        try:
            cs_log_data = req.payload

            base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL]
            identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL]
            client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET]
            token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION)
            validate_url = urllib.parse.urljoin(base_url, "api/device/validate")

            # Step #1 - get access token
            access_token = self.get_valid_token(identity_url=identity_url,
                                                token_verification_url=token_verification_url,
                                                client_secret=client_secret)
            if access_token is None:
                self._report_error(ErrorIDs.CS_LOG_ERROR.value, "Missing access token")
                return

            # Step #2 - get organization id for current device
            response = cmd_outgoing_validate_device(
                access_token=access_token,
                hd_serial_number=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                dg_serial_number=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                url=validate_url,
                error_handler=self.error_handler)
            if response is None:
                return

            invalid_attributes = response.get("invalidAttributes", None)
            if len(invalid_attributes) > 0:
                self.logger.error(
                    "Device with HD serial number {0}, DG serial number {1}, software version {2} is not a valid "
                    "device for log upload. Invalid fields: {3}".format(
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                        invalid_attributes))
                return response

            organization_id = response.get("associatedOrganizationId", None)
            cs_log_data['organizationId'] = organization_id

            # Step #3 - upload the cs log file
            cs_log_json = helpers_construct_cs_log_json(cs_log_data)
            cs_log_filename = cmd_outgoing_upload_file_in_chunks(
                base_url=base_url,
                access_token=access_token,
                file_json=cs_log_json,
                error_handler=self.error_handler,
                log_file_origin='cs',
                token_refresher=lambda: self.get_valid_token(
                    identity_url=identity_url,
                    token_verification_url=token_verification_url,
                    client_secret=client_secret))

            if cs_log_filename is not None:
                self.logger.debug("CS log file uploaded: {0}".format(cs_log_filename))
        except Exception as e:
            self._report_error(ErrorIDs.CS_LOG_ERROR.value, str(e))

    def get_valid_token(self, identity_url, token_verification_url, client_secret):
        """
        Return an access token, requesting a new one when none is stored or
        the stored one fails verification.

        :param identity_url: URL used to request a new token
        :param token_verification_url: URL used to verify the stored token
        :param client_secret: client secret passed to the new-token request
        :return: the stored token if it verified, otherwise whatever the
                 new-token command returned (None is treated by callers as
                 "missing access token")
        """
        access_token = helpers_get_stored_token()

        if access_token is None:
            self.logger.info("No stored token found, requesting new token")
            needs_new_token = True
        else:
            response = cmd_outgoing_verify_token(url=token_verification_url,
                                                 access_token=access_token,
                                                 error_handler=self.error_handler)
            needs_new_token = response is None or response.status_code != OK
            if needs_new_token:
                # Compare against None explicitly: a response object may be
                # falsy for error statuses, which would hide the real code.
                status = response.status_code if response is not None else 'None'
                self.logger.warning(f"Token verification failed (status={status}), refreshing token")
            else:
                self.logger.info("Token verification succeeded")

        if needs_new_token:
            access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509,
                                                                path_private_key=CREDENTIALS_PRIVATE_KEY,
                                                                save=True,
                                                                url=identity_url,
                                                                client_secret=client_secret,
                                                                error_handler=self.error_handler)
        return access_token

    def wait_for_network(self, wait_time):
        """
        Poll the reachability provider once per second until the network is
        reachable or ``wait_time`` polls have elapsed.

        :param wait_time: maximum number of one-second waits
        :return: True if the network is reachable when polling stops, False otherwise
        """
        counter = 0
        while not self.reachability_provider.reachability and counter < wait_time:
            counter += 1
            sleep(1)
        # Report the actual reachability rather than inferring it from the
        # counter, so a connection that comes up during the final wait (or a
        # zero/negative wait_time) is reported correctly.
        return bool(self.reachability_provider.reachability)