"""Handler of commands sent by CloudSync app to the Diality Cloud System""" from typing import List, Tuple import json import requests import urllib.parse from cloudsync.utils.globals import * from cloudsync.common.enums import * from cloudsync.utils.helpers import * from cloudsync.handlers.error_handler import ErrorHandler from cloudsync.handlers.error import Error @log_func def cmd_outgoing_get_new_token_with_cert(path_certificate: str, path_private_key: str, save: bool, url: str, client_secret: str, error_handler: ErrorHandler) -> str: """ Obtains authentication token with device certificate & private key :param path_certificate: The path to the certificate :param path_private_key: The path to the private key :param save: If True, save the token :param url: Identity Provider URL used to request a new token :param client_secret: Identity Provider client secret for CS app :param error_handler: global error handler :return: The new token """ try: payload = { "grant_type": "password", "scope": "openid profile", "client_id": "device-client", "client_secret": client_secret, } headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } cert_paths = (path_certificate, path_private_key) g_utils.logger.debug("Making request: {0}, {1}, {2}, {3}".format(url, headers, payload, cert_paths)) response = requests.post(url=url, headers=headers, data=payload, cert=cert_paths, timeout=5) data = response.json() g_utils.logger.debug("Keycloak response: {0}".format(data)) if save: if not os.path.exists(TOKEN_CACHING_PATH): os.makedirs(TOKEN_CACHING_PATH) with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'w') as f: f.write(json.dumps(data, indent=4)) return data.get("access_token", None) except requests.exceptions.Timeout: error = Error("{0},2,{1},Obtain token request timeout".format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value)) error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value)) error_handler.enqueue_error(error=error) return None except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, str(e))) error_handler.enqueue_error(error=error) return None @log_func def cmd_outgoing_verify_token(url: str, access_token: str, error_handler: ErrorHandler) -> requests.Response: try: headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, "X-OrganizationId": '1', 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } data = { "hdSerialNumber": "token-validation", "dgSerialNumber": "token-validation", "softwareVersion": "token-validation" } resp = requests.post(url=url, data=data, headers=headers) return resp except requests.exceptions.Timeout: error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VERIFY_TOKEN_ERROR.value)) error_handler.enqueue_error(error=error) except requests.exceptions.TooManyRedirects: error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VERIFY_TOKEN_ERROR.value)) error_handler.enqueue_error(error=error) except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VERIFY_TOKEN_ERROR.value, str(e))) error_handler.enqueue_error(error=error) @log_func def cmd_outgoing_validate_device(access_token: str, hd_serial_number: str, dg_serial_number: str, sw_version: str, url: str, error_handler: ErrorHandler) -> dict: """ Step 8. Validate device Step 9. Validate device response (list of invalid fields) :return: The json response """ try: payload = json.dumps({ "hdSerialNumber": hd_serial_number, "dgSerialNumber": dg_serial_number, "softwareVersion": sw_version }) headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } response = requests.post(url=url, headers=headers, data=payload) except requests.exceptions.Timeout: error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value)) error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value)) error_handler.enqueue_error(error=error) return None except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, str(e))) error_handler.enqueue_error(error=error) try: return response.json() except json.decoder.JSONDecodeError as e: error = Error("{0},3,{1},Could not validate device. Received: {2}:{3},Exception: {4}".format( OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, response.status_code, response.reason, str(e))) error_handler.enqueue_error(error=error) return None except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, str(e))) error_handler.enqueue_error(error=error) # Runtime commands @log_func def cmd_outgoing_set_device_state(url: str, access_token: str, device_state_json: dict, error_handler: ErrorHandler) -> requests.Response: """ Updates the backend with the current device state :param url: set device state URL :param device_state_json: device state payload :param access_token: access token :param error_handler: global error handler :return: The response """ try: headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } payload = device_state_json resp = requests.put(url=url, headers=headers, data=payload) return resp except requests.exceptions.Timeout: error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value)) error_handler.enqueue_error(error=error) except requests.exceptions.TooManyRedirects: error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value)) error_handler.enqueue_error(error=error) except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, str(e))) error_handler.enqueue_error(error=error) @log_func def cmd_outgoing_check_if_patient_with_emr_id_exists(access_token: str, url: str, error_handler: ErrorHandler) -> requests.Response: try: headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } response = requests.get(url=url, headers=headers) except requests.exceptions.Timeout: error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value)) error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value)) error_handler.enqueue_error(error=error) return None except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, str(e))) error_handler.enqueue_error(error=error) try: return response except json.decoder.JSONDecodeError as e: error = Error("{0},3,{1},Could not check if the patient exists. Received: {2}:{3},Exception: {4}".format( OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, response.status_code, response.reason, str(e))) error_handler.enqueue_error(error=error) return None except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, str(e))) error_handler.enqueue_error(error=error) @log_func def cmd_outgoing_create_temporary_patient(access_token: str, url: str, error_handler: ErrorHandler): try: headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } payload = {} response = requests.post(url=url, headers=headers, data=payload) except requests.exceptions.Timeout: error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value)) error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value)) error_handler.enqueue_error(error=error) return None except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, str(e))) error_handler.enqueue_error(error=error) try: return response.json() except json.decoder.JSONDecodeError as e: error = Error("{0},3,{1},Could not create temporary patient. Received: {2}:{3},Exception: {4}".format( OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, response.status_code, response.reason, str(e))) error_handler.enqueue_error(error=error) return None except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, str(e))) error_handler.enqueue_error(error=error) @log_func def cmd_outgoing_set_patient_device_association(url: str, access_token: str, associate: bool, error_handler: ErrorHandler) -> requests.Response: """ Sets the status of the device & patient association :param url: set device state URL :param access_token: access token :param associate: status of the device & patient association :param error_handler: global error handler :return: The response """ try: headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } payload = {} if associate: resp = requests.head(url=urllib.parse.urljoin(url, "/exists"), headers=headers, data=payload) if resp.status_code == NOT_FOUND: resp = requests.put(url=url, headers=headers, data=payload) else: resp = requests.delete(url=url, headers=headers, data=payload) return resp except requests.exceptions.Timeout: error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value)) error_handler.enqueue_error(error=error) except requests.exceptions.TooManyRedirects: error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value)) error_handler.enqueue_error(error=error) except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value, str(e))) error_handler.enqueue_error(error=error) @log_func def cmd_outgoing_send_treatment_report(url: str, access_token: str, treatment_log: str, error_handler: ErrorHandler) -> requests.Response: """ Sends a treatment report to DCS :param url: set device state URL :param access_token: access token :param treatment_log: treatment report sent to DCS :param error_handler: global error handler :return: The response """ try: headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } payload = treatment_log resp = requests.post(url=url, headers=headers, data=payload) return resp except requests.exceptions.Timeout: error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value)) error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value)) error_handler.enqueue_error(error=error) return None except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, str(e))) error_handler.enqueue_error(error=error) return None @log_func def cmd_outgoing_upload_file_in_chunks( base_url: str, access_token: str, file_json: dict, error_handler: ErrorHandler, log_file_origin: str, chunk_size: int=2 * 1024 * 1024, retries: int=3 ) -> Union[str, None]: """ Uploads a large file in chunks using sessions and retries. Args: base_url (str): The base URL of the API. access_token (str): The access token used for Authorization. file_json (dict): The populated/constructed `log_upload_template.json` file for a specific file. error_handler (ErrorHandler): Current `ErrorHandler` instance. log_file_origin (str): The origin of the log file to be uploaded. Accepted values `device` or `cs` chunk_size (int, optional): The size of each chunk in bytes. Defaults to 50MB. retries (int, optional): The number of times to retry failed uploads. Defaults to 3. Returns: str | None: The uploaded file name if succeeded, None otherwise. """ origins = ("cs", "device") if log_file_origin not in origins: g_utils.logger.error(f"Wrong log file origin provided.") return None ERROR_ID = ErrorIDs.CS_DEVICE_LOG_ERROR.value if log_file_origin == 'device' else ErrorIDs.CS_LOG_ERROR.value # # Start upload session # start_session_url = os.path.join(base_url, "api/device/data/start-session") start_session_payload = file_json['start_session'] start_session_payload = json.dumps(start_session_payload) headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } g_utils.logger.debug(f"File upload payload (start-session): {start_session_payload}") try: response = requests.post( url=start_session_url, headers=headers, data=start_session_payload) if response.status_code != 200: raise Exception(f"Error while starting upload session: {response.status_code} - {response.text}") except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value,ERROR_ID,str(e))) error_handler.enqueue_error(error=error) return None session_id = response.json().get("sessionId") if not session_id: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value,ERROR_ID,"Missing session ID in response.")) error_handler.enqueue_error(error=error) return None # # Send file in chunks # try: target_file = file_json['general']["file_path"] file_size = file_json['general']["file_size"] upload_chunk_url = os.path.join(base_url, "api/device/data/chunk") upload_chunk_payload = file_json['upload_chunk'] upload_chunk_payload['sessionId'] = session_id upload_chunk_payload['chunkType'] = "device-data" chunk_number = 1 headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } with open(target_file, "rb") as f: file_content = f.read() # Encode the bytes using base64 base64_string = base64.b64encode(file_content).decode("utf8") # Get the total size of the base64 string (in bytes) total_size = len(base64_string) # Calculate the number of chunks num_chunks = total_size // chunk_size + (total_size % chunk_size > 0) for i in range(num_chunks): start_index = i * chunk_size end_index = min(start_index + chunk_size, total_size) chunk = base64_string[start_index:end_index] # Retry logic with counter and backoff time retry_count = 0 while retry_count < retries: try: if type(upload_chunk_payload) is str: upload_chunk_payload = json.loads(upload_chunk_payload) upload_chunk_payload['chunkNo'] = chunk_number upload_chunk_payload['data'] = chunk upload_chunk_payload = json.dumps(upload_chunk_payload) g_utils.logger.debug(f"File upload payload (upload-chunk) - chunk No {chunk_number}: {upload_chunk_payload}") response = requests.post(upload_chunk_url, headers=headers, data=upload_chunk_payload) if response.status_code == 200: chunk_number += 1 g_utils.logger.info(f"Uploaded chunk {chunk_number} of {num_chunks}") break # Successful upload, break retry loop if retry_count < retries: g_utils.logger.info(f"Retrying chunk upload in 5 seconds...") sleep(5) # Implement backoff time between retries retry_count += 1 except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value,ERROR_ID,str(e))) error_handler.enqueue_error(error=error) except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value,ERROR_ID,str(e))) error_handler.enqueue_error(error=error) return None # # End upload session # end_session_url = os.path.join(base_url, "api/device/data/end-session") end_session_payload = file_json['end_session'] end_session_payload['sessionId'] = session_id end_session_payload['completedAt'] = int(datetime.now(timezone.utc).timestamp()*1000) headers = { 'Authorization': BEARER_HOLDER.format(access_token), 'Content-Type': CONTENT_TYPE, 'User-Agent': USER_AGENT, "X-Api-Version": API_VERSION } try: end_session_payload = json.dumps(end_session_payload) g_utils.logger.debug(f"Device log upload payload (end-session): {end_session_payload}") response = requests.post(end_session_url, headers=headers, data=end_session_payload) if response.status_code != 200: raise Exception(f"Error while ending upload session: {response.status_code} - {response.text}") except Exception as e: error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value,ERROR_ID,str(e))) error_handler.enqueue_error(error=error) return None g_utils.logger.info(f"File {file_json['start_session']['metadata']['deviceFileName']} uploaded.") return str(file_json['start_session']['metadata']['deviceFileName'])