"""Handler of network requests between CloudSync, Device Registration Tool and Diality Cloud System"""

import json
import urllib.parse
from time import sleep, time
from logging import Logger
from threading import Event, Thread
from collections import deque
from random import seed, randint

from cloudsync.handlers.network_request import NetworkRequest
from cloudsync.handlers.error_handler import ErrorHandler
from cloudsync.handlers.error import Error
from cloudsync.utils.helpers import log_func
from cloudsync.common.enums import *
from cloudsync.utils.globals import *
from cloudsync.handlers.outgoing.handler_cs_to_mft import *
from cloudsync.handlers.outgoing.handler_cs_to_dcs import *
from cloudsync.handlers.incoming.handler_mft_to_cs import *

# Layout of an error message: <message id>,2,<error id>,<details>
ERROR_STRING = "{0},2,{1},{2}"


class NetworkRequestHandler:
    """
    Queues network requests and processes them one at a time on a background
    daemon thread.
    """

    def __init__(self, logger: Logger, max_size, output_channel, reachability_provider, error_handler):
        """
        :param logger: logger used for handler diagnostics
        :param max_size: maximum number of pending requests (None means unbounded)
        :param output_channel: channel whose enqueue_message() receives outgoing messages
        :param reachability_provider: object exposing a `reachability` attribute
        :param error_handler: object whose enqueue_error() receives Error instances
        """
        self.logger = logger
        self.reachability_provider = reachability_provider
        self.logger.info('Created Network Request Handler')
        self.output_channel = output_channel
        self.error_handler = error_handler
        # deque append/popleft are thread safe, so no extra lock is used
        self.queue = deque(maxlen=max_size)
        self.thread = Thread(target=self.scheduler, daemon=True)
        # The event must exist before the worker starts, because the worker waits on it
        self.event = Event()
        self.thread.start()

    def scheduler(self) -> None:
        """
        Worker loop: waits for the event flag and then drains the request queue.

        The flag is cleared BEFORE the queue is drained. A request enqueued
        while draining either gets picked up by the drain loop or leaves the
        flag set for the next iteration, so no wake-up can be lost.

        :return: None (never returns)
        """
        while True:
            self.event.wait()
            self.event.clear()
            while self.queue:
                req = self.queue.popleft()
                try:
                    self.handle_request(req)
                except Exception:
                    # An uncaught exception would terminate the only worker thread
                    # and silently stop all further request processing.
                    self.logger.exception("Unhandled error while processing network request")

    def enqueue_request(self, req: NetworkRequest) -> bool:
        """
        Adds a request to the queue and wakes the worker thread.

        :param req: (NetworkRequest) the request to add to the queue
        :return: True upon success, False if the queue is full
        """
        capacity = self.queue.maxlen
        # maxlen is None for an unbounded deque; such a queue is never full
        if capacity is not None and len(self.queue) >= capacity:
            return False
        self.queue.append(req)
        self.event.set()
        return True

    def handle_request(self, req: NetworkRequest):
        """
        Dispatches a single request to the handler matching its request type.
        Unsupported request types are logged as a warning.

        :param req: (NetworkRequest) the request to process
        :return: None, except for the treatment report request, which returns
                 the last received response when it stops early
        """
        request_type = req.request_type

        # REGISTRATION MODE
        if request_type == NetworkRequestType.CS2MFT_REQ_REGISTRATION:
            return self._handle_registration(req)
        if request_type == NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS:
            return self._handle_set_credentials(req)
        if request_type == NetworkRequestType.MFT2CS_REQ_INIT_CONNECTIVITY_TEST:
            return self._handle_connectivity_test(req)
        if request_type == NetworkRequestType.MFT2CS_REQ_FACTORY_RESET:
            return self._handle_factory_reset(req)

        # OPERATION MODE
        if request_type == NetworkRequestType.CS2DCS_REQ_SET_DEVICE_STATE:
            return self._handle_set_device_state(req)
        if request_type == NetworkRequestType.CS2DCS_REQ_SEND_TREATMENT_REPORT:
            return self._handle_send_treatment_report(req)

        g_utils.logger.warning("Request type {0} not supported".format(req.request_type))
        return None

    def _report_exception(self, error_id, exc) -> None:
        """Enqueues a CS2UI error message built from an error id and an exception."""
        error = Error(ERROR_STRING.format(OutboundMessageIDs.CS2UI_ERROR.value,
                                          error_id.value,
                                          exc))
        self.error_handler.enqueue_error(error=error)

    def _handle_registration(self, req: NetworkRequest) -> None:
        """Sends the device registration request to the Device Registration Tool."""
        try:
            response = cmd_outgoing_register(
                device_name=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_NAME],
                hd_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                dg_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                manf_tool_base_url=req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_MFT_URL],
                data_source_id=req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DIA_ORG_ID],
                headers=req.headers,
                error_handler=self.error_handler)
            self.logger.debug("DRT Request registration resp: {0}".format(response))
        except Exception as e:
            self._report_exception(ErrorIDs.CS_REQ_REGISTRATION_ERROR, e)

    def _handle_set_credentials(self, req: NetworkRequest) -> None:
        """Passes the credentials found in the request payload to cmd_incoming_set_credentials."""
        try:
            certificate = req.payload.get("certificate")
            private_key = req.payload.get("private_key")
            public_key = req.payload.get("public_key")
            cmd_incoming_set_credentials(certificate=certificate,
                                         private_key=private_key,
                                         public_key=public_key,
                                         output_channel=self.output_channel,
                                         error_handler=self.error_handler)
        except Exception as e:
            self._report_exception(ErrorIDs.CS_SAVE_CREDENTIALS_ERROR, e)

    def _handle_connectivity_test(self, req: NetworkRequest) -> None:
        """Starts the connectivity test against the device validation endpoint."""
        try:
            url_validate = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL]
            url_validate = urllib.parse.urljoin(url_validate, "api/device/validate")
            cmd_incoming_initiate_connectivity_test(
                url_token=req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL],
                url_validate=url_validate,
                hd_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                dg_serial=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                g_config=req.g_config,
                error_handler=self.error_handler
            )
        except Exception as e:
            self._report_exception(ErrorIDs.CS_VALIDATE_DEVICE_ERROR, e)

    def _handle_factory_reset(self, req: NetworkRequest) -> None:
        """Sets the device mode to 'operation', writes both config files and triggers the reset."""
        # No error id is reported here; failures propagate to the caller.
        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] = 'operation'
        helpers_write_config(None, CONFIG_PATH, req.g_config)
        helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, req.g_config)
        cmd_incoming_factory_reset(output_channel=self.output_channel)

    def _handle_set_device_state(self, req: NetworkRequest) -> None:
        """Sends the configured device state to DCS and reports failures to the error handler."""
        try:
            device_state = req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_STATE]
            base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL]
            identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL]
            client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET]
            token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION)
            device_state_url = urllib.parse.urljoin(base_url, "/api/device/state/{0}".format(device_state))

            access_token = self.get_valid_token(identity_url=identity_url,
                                                token_verification_url=token_verification_url,
                                                client_secret=client_secret)

            if access_token is None:
                error = Error(
                    "{0},2,{1}, Missing access token".format(OutboundMessageIDs.CS2UI_ERROR.value,
                                                             ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value))
                self.error_handler.enqueue_error(error=error)
                return

            response = cmd_outgoing_set_device_state(url=device_state_url,
                                                     access_token=access_token,
                                                     error_handler=self.error_handler)
            self.logger.debug(
                "DCS Request device state response code: {0} & full response: {1}".format(response.status_code,
                                                                                          response.text))
            if response.status_code == UNASSIGNED:
                error = Error(
                    "{0},2,{1}, Invalid device state transition".format(OutboundMessageIDs.CS2UI_ERROR.value,
                                                                        ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value))
                self.error_handler.enqueue_error(error=error)
        except Exception as e:
            self._report_exception(ErrorIDs.CS_SEND_DEVICE_STATE_ERROR, e)

    def _handle_send_treatment_report(self, req: NetworkRequest):
        """
        Uploads the treatment log carried in the request payload and sends the
        treatment code to the UI.

        :param req: (NetworkRequest) request whose payload is the treatment log
        :return: the last response when the device is invalid or the patient
                 lookup returns an unexpected status code, None otherwise
        """
        try:
            treatment_log_json = req.payload
            patient_emr_id = treatment_log_json['data']['patient']['id']
            treatment_id = treatment_log_json['data']['treatment']['treatmentCode']

            base_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DCS_URL]
            identity_url = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_DEVICE_IDENTITY_URL]
            client_secret = req.g_config[CONFIG_KEBORMED][CONFIG_KEBORMED_IDP_CLIENT_SECRET]
            token_verification_url = urllib.parse.urljoin(base_url, DEVICE_TOKEN_VALIDATION)
            validate_url = urllib.parse.urljoin(base_url, "api/device/validate")
            data_submission_url = urllib.parse.urljoin(base_url, "/api/device/data")

            access_token = self.get_valid_token(identity_url=identity_url,
                                                token_verification_url=token_verification_url,
                                                client_secret=client_secret)

            # Step #1 - get organization id for current device
            response = cmd_outgoing_validate_device(
                access_token=access_token,
                hd_serial_number=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                dg_serial_number=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                sw_version=req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                url=validate_url,
                error_handler=self.error_handler)

            # If the key is absent, len(None) raises and the failure is reported
            # through the except clause below instead of continuing the upload.
            invalid_attributes = response.get("invalidAttributes", None)
            if len(invalid_attributes) > 0:
                self.logger.error(
                    "Device with HD serial number {0}, DG serial number {1}, software version {2} is not a valid "
                    "device for treatment. Invalid fields: {3}".format(
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL],
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL],
                        req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_SW_VERSION],
                        invalid_attributes))
                return response

            organization_id = response.get("associatedOrganizationId", None)
            treatment_log_json['organizationId'] = organization_id

            # Step #2 - Set patient by patient_emr_id
            patient_with_emr_id_exists_url = urllib.parse.urljoin(
                base_url,
                "api/device/organization/{0}/patient/{1}".format(organization_id, patient_emr_id))
            create_temporary_patient_url = urllib.parse.urljoin(
                base_url,
                "api/device/organization/{0}/patient/{1}".format(organization_id, patient_emr_id))

            # Step #2a - Check if patient exists by patient_emr_id
            response = cmd_outgoing_check_if_patient_with_emr_id_exists(
                access_token=access_token,
                url=patient_with_emr_id_exists_url,
                error_handler=self.error_handler)

            # Step #2b - If patient with emr_id doesn't exist, create temporary patient
            if response.status_code == OK:
                patient_id = response.json().get("id", None)
            elif response.status_code == NOT_FOUND:
                response = cmd_outgoing_create_temporary_patient(
                    access_token=access_token,
                    url=create_temporary_patient_url,
                    error_handler=self.error_handler)
                patient_id = response.get("id", None)
            else:
                g_utils.logger.warning("Patient didn't exist and a temporary patient couldn't be created")
                return response

            patient_device_association_url = urllib.parse.urljoin(
                base_url,
                "/api/device/organization/{0}/patient/{1}/association".format(organization_id, patient_id))

            # Step #3 - Check if device is associated to patient and if not associate it
            response = cmd_outgoing_set_patient_device_association(
                url=patient_device_association_url,
                access_token=access_token,
                associate=True,
                error_handler=self.error_handler)

            # Step #4 - Send treatment report
            association_time = int(round(time() * S_MS_CONVERSION_FACTOR))
            # TEMPORARY DELAY TO COMPENSATE FOR DEVICE INTERNAL CLOCK ERROR - TODO REMOVE AFTER FIX
            generated_at_time = association_time + 301000
            # TEMPORARY DELAY TO COMPENSATE FOR DEVICE INTERNAL CLOCK ERROR - TODO REMOVE AFTER FIX
            completed_at_time = association_time + 302000

            treatment_log_json['generatedAt'] = generated_at_time
            treatment_log_json['completedAt'] = completed_at_time
            treatment_log_json = json.dumps(treatment_log_json)

            self.logger.debug("treatment log: {0}".format(treatment_log_json))

            response = cmd_outgoing_send_treatment_report(url=data_submission_url,
                                                          access_token=access_token,
                                                          treatment_log=treatment_log_json,
                                                          error_handler=self.error_handler)

            if response is not None:
                self.logger.debug("Treatment upload response: {0}".format(response.json()))
                # treatment_id = response.json().get("reference", None)

            # NOTE(review): steps 5 and 6 run whether or not the upload returned a
            # response - confirm this is the intended behavior.

            # Step #5 - Remove patient/device association
            response = cmd_outgoing_set_patient_device_association(
                url=patient_device_association_url,
                access_token=access_token,
                associate=False,
                error_handler=self.error_handler)

            # Step #6 - Send TX code to UI app
            message_body = str(OutboundMessageIDs.CS2UI_REQ_TX_CODE_DISPLAY.value) + ',1,' + treatment_id
            self.output_channel.enqueue_message(message_body)
        except Exception as e:
            self._report_exception(ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR, e)
        return None

    def get_valid_token(self, identity_url, token_verification_url, client_secret):
        """
        Returns an access token, requesting a new one when none is stored or
        when verification of the stored token answers UNAUTHORIZED.

        :param identity_url: URL used to request a new token
        :param token_verification_url: URL used to verify the stored token
        :param client_secret: client secret passed along with the token request
        :return: the access token (whatever the token helpers return)
        """
        access_token = helpers_get_stored_token()

        if access_token is None:
            return self._request_new_token(identity_url, client_secret)

        response = cmd_outgoing_verify_token(url=token_verification_url,
                                             access_token=access_token,
                                             error_handler=self.error_handler)
        if response.status_code == UNAUTHORIZED:
            access_token = self._request_new_token(identity_url, client_secret)

        return access_token

    def _request_new_token(self, identity_url, client_secret):
        """Requests a new token using the stored certificate and private key (save=True)."""
        return cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509,
                                                    path_private_key=CREDENTIALS_PRIVATE_KEY,
                                                    save=True,
                                                    url=identity_url,
                                                    client_secret=client_secret,
                                                    error_handler=self.error_handler)

    def wait_for_network(self, wait_time):
        """
        Polls the reachability provider once per second until the network is
        reachable or wait_time polls have elapsed.

        :param wait_time: maximum number of one-second polls
        :return: True if the network is reachable when polling stops, False otherwise
        """
        counter = 0
        while not self.reachability_provider.reachability and counter < wait_time:
            counter += 1
            sleep(1)
        # Report the actual state: comparing counter with wait_time misreports a
        # network that is already reachable when wait_time is 0, or that became
        # reachable during the final poll.
        return bool(self.reachability_provider.reachability)