Index: cloud_sync.py =================================================================== diff -u -rd17748c773b5a38a9ef0e5ffa59406f5c2e023df -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloud_sync.py (.../cloud_sync.py) (revision d17748c773b5a38a9ef0e5ffa59406f5c2e023df) +++ cloud_sync.py (.../cloud_sync.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -15,11 +15,16 @@ from cloudsync.utils.globals import * from cloudsync.utils.helpers import * from cloudsync.utils.logging import LoggingConfig +from cloudsync.utils.watchdog import Watchdog, make_restart_fn + +import hmac import os +import signal import sys +import threading -VERSION = "0.5.0" +VERSION = "0.5.2" arguments = sys.argv @@ -30,11 +35,22 @@ logconf = LoggingConfig() logconf.initiate(app=app) +# API key authentication for deployed instances +CS_API_KEY = os.environ.get('CS_API_KEY') + +@app.before_request +def check_api_key(): + if CS_API_KEY is None: + return # no key configured — allow all requests + key = request.headers.get('X-Api-Key', '') + if not hmac.compare_digest(key, CS_API_KEY): + return {"message": "Unauthorized: invalid or missing API key"}, 401 + sleep(5) # wait for UI to prepare the configurations partition try: ok = False - print(SETUP_CONSOLE_LINE) + g_utils.logger.info(SETUP_CONSOLE_LINE) if EXEC_MODE_UPGRADE_KEY in arguments: # ---------- upgrade # Read from $HOME @@ -45,15 +61,15 @@ else: if EXEC_MODE_UPDATE_KEY in arguments: # ---------- update EXEC_MODE = EXEC_MODE_UPDATE - print(f"CloudSync starting in {EXEC_MODE} mode") - print("CloudSync update config started...") + g_utils.logger.info("CloudSync starting in %s mode", EXEC_MODE) + g_utils.logger.info("CloudSync update config started...") # Update the $HOME from /var/configuraitons/ and update result in both for later update oldConfig = helpers_read_config(OPERATION_CONFIG_FILE_PATH) newConfig = helpers_read_config(CONFIG_PATH) newConfig.update(oldConfig) helpers_write_config(None , CONFIG_PATH , newConfig) helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, newConfig) - print("CloudSync update config done.") + g_utils.logger.info("CloudSync update config done.") # Read from /var/configuraitons/ # ---------- normal if os.path.isfile(OPERATION_CONFIG_FILE_PATH) and os.access(OPERATION_CONFIG_FILE_PATH, os.R_OK): #TODO test if this check needed? @@ -62,16 +78,16 @@ ok = True if ok: - print(f"CloudSync started in {EXEC_MODE} mode") - print(f"Using config: {CONFIG_PATH}") + g_utils.logger.info("CloudSync started in %s mode", EXEC_MODE) + g_utils.logger.info("Using config: %s", CONFIG_PATH) else: g_utils.logger.error(f"Error reading config file in {EXEC_MODE}") - print(SETUP_CONSOLE_LINE) + g_utils.logger.info(SETUP_CONSOLE_LINE) except Exception as e: g_utils.logger.error(f"Error reading config file - {e}") - print(SETUP_CONSOLE_LINE) + g_utils.logger.info(SETUP_CONSOLE_LINE) sys.exit(0) try: @@ -116,11 +132,52 @@ g_utils.logger.debug("Current config path: {0}".format(CONFIG_PATH)) parser = reqparse.RequestParser() + + # Watchdog — monitors critical daemon threads (operation mode only). + # During registration the device is semi-managed by the manufacturing + # tool, so autonomous recovery is unnecessary and could interfere with + # the registration-to-operation transition. + watchdog = Watchdog(logger=app.logger) + if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'operation': + watchdog.register("reachability", lambda: reachability_provider.thread, + make_restart_fn(reachability_provider, "thread", reachability_provider.reachability_test)) + watchdog.register("output_bus", lambda: output_channel.thread, + make_restart_fn(output_channel, "thread", output_channel.scheduler)) + watchdog.register("error_handler", lambda: error_handler.thread, + make_restart_fn(error_handler, "thread", error_handler.scheduler)) + watchdog.register("network_request_handler", lambda: network_request_handler.thread, + make_restart_fn(network_request_handler, "thread", network_request_handler.scheduler)) + watchdog.register("message_handler", lambda: message_handler.thread, + make_restart_fn(message_handler, "thread", message_handler.scheduler)) + watchdog.register("file_input_bus", lambda: ui_cs_bus.thread, + make_restart_fn(ui_cs_bus, "thread", ui_cs_bus.input_channel_handler)) + watchdog.register("heartbeat", lambda: heartbeat_provider.thread, + make_restart_fn(heartbeat_provider, "thread", heartbeat_provider.heartbeat)) + watchdog.start() + g_utils.logger.info("Watchdog started (operation mode)") + else: + g_utils.logger.info("Watchdog skipped (registration mode — device is managed)") + except Exception as e: g_utils.logger.error("Failed to start CS - {0}".format(e)) sys.exit(0) +# Signal handlers for graceful shutdown +shutdown_event = threading.Event() +def _graceful_shutdown(signum, _frame): + g_utils.logger.info("Received signal %d, initiating graceful shutdown...", signum) + shutdown_event.set() + try: + watchdog.stop() + except Exception: + pass + sys.exit(0) + +signal.signal(signal.SIGTERM, _graceful_shutdown) +signal.signal(signal.SIGINT, _graceful_shutdown) + + @api.route("/version") class Version(Resource): @@ -245,10 +302,10 @@ # END - REGISTRATION ENDPOINTS - def main(): if g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] == 'registration': - app.run(debug=False, use_reloader=False, host="0.0.0.0",port=g_config[CONFIG_DEVICE][CONFIG_DEVICE_PORT]) + app.run(debug=False, use_reloader=False, host="0.0.0.0", + port=g_config[CONFIG_DEVICE][CONFIG_DEVICE_PORT]) else: while True: sleep(1) Index: cloudsync/busses/file_input_bus.py =================================================================== diff -u -rae065dc96f33fc1946785ee833356dae959be2d9 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision ae065dc96f33fc1946785ee833356dae959be2d9) +++ cloudsync/busses/file_input_bus.py (.../file_input_bus.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -7,6 +7,7 @@ from cloudsync.handlers.ui_cs_request_handler import UICSMessageHandler from cloudsync.handlers.uics_message import UICSMessage +from cloudsync.utils.filesystem import check_readable class FileInputBus: @@ -36,7 +37,6 @@ self.message_handler = message_handler self.i = inotify.adapters.Inotify() - # print(self.file_channels_path) self.i.add_watch(self.file_channels_path) self.thread = Thread(target=self.input_channel_handler, daemon=True) @@ -53,21 +53,31 @@ if (('IN_MODIFY' in type_names) or ('IN_CLOSE_WRITE' in type_names)) and ( filename.endswith(self.input_channel_name)): + input_file_path = self.file_channels_path + "/" + filename + if not check_readable(input_file_path): + self.logger.warning('Input file not readable: {0}'.format(input_file_path)) + continue + try: - f = open(self.file_channels_path + "/" + filename) + f = open(input_file_path) except IOError as er: self.logger.error('Opening input file error: {0}'.format(' '.join(str(er)))) + continue new_input_messages = [] for line in f.readlines(): message_parameters = line.strip().split(',') - # print("message parameters: {0}".format(message_parameters)) - if int(message_parameters[1]) > self.last_input_message_id: + try: + sequence_id = int(message_parameters[1]) + except (ValueError, IndexError): + self.logger.warning('Skipping message with invalid sequence: {0}'.format(line.strip())) + continue + if sequence_id > self.last_input_message_id: new_message = UICSMessage(line.strip(), self.g_config) message_added_to_queue = self.message_handler.enqueue_message(new_message) if message_added_to_queue: - new_input_messages.append((int(message_parameters[1]), line.strip())) + new_input_messages.append((sequence_id, line.strip())) new_input_messages.sort(key=lambda x: x[0]) # self.logger.debug("New Input messages added to queue: {0}".format(new_input_messages)) Index: cloudsync/busses/file_output_bus.py =================================================================== diff -u -r3736c29ba097740b4e99684625e78bfaaac6b1a1 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision 3736c29ba097740b4e99684625e78bfaaac6b1a1) +++ cloudsync/busses/file_output_bus.py (.../file_output_bus.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -6,6 +6,7 @@ import datetime from cloudsync.utils import helpers +from cloudsync.utils.filesystem import check_writable, check_disk_space_mb class FileOutputBus: @@ -41,7 +42,6 @@ flag = self.event.wait() if flag: while len(self.queue) > 0: - # print('queue size: {0}'.format(len(self.queue))) message_body = self.queue.popleft() self.handle_message(message_body) self.event.clear() @@ -65,19 +65,23 @@ Parses queue messages and send them downstream :param str message_body: the message body """ - # print('Message body: {0}'.format(message_body)) self.logger.debug('Message body: {0}'.format(message_body)) try: - filename = datetime.datetime.utcnow().date().strftime("%Y_%m_%d_out.buf") - message_data = str(round(datetime.datetime.now().timestamp())) + str(self.last_output_message_id) + message_body + if not check_writable(self.file_channels_path): + raise IOError("Output channel path is not writable: {}".format(self.file_channels_path)) + if not check_disk_space_mb(self.file_channels_path, required_mb=1): + raise IOError("Insufficient disk space at: {}".format(self.file_channels_path)) + + now = datetime.datetime.now(datetime.timezone.utc) + filename = now.date().strftime("%Y_%m_%d_out.buf") + timestamp_str = str(round(now.timestamp())) + message_data = timestamp_str + str(self.last_output_message_id) + message_body message_crc8 = helpers.helpers_crc8(message_data.encode('utf-8')) - message_body = str(round(datetime.datetime.now().timestamp())) + ',' + str(self.last_output_message_id) + ',' + str(message_crc8) + ',' + message_body + message_body = timestamp_str + ',' + str(self.last_output_message_id) + ',' + str(message_crc8) + ',' + message_body self.logger.info('CS2UI Message: {0}'.format(message_body)) - # print('Full message: {0}'.format(message_body)) - f = open(self.file_channels_path + "/" + filename, "a") - f.write("{0}\n".format(message_body)) - f.close() + with open(self.file_channels_path + "/" + filename, "a") as f: + f.write("{0}\n".format(message_body)) self.last_output_message_id += 1 except IOError as er: self.logger.error('Opening and/or writing to output file error: {0}'.format(' '.join(str(er)))) Fisheye: Tag 99d35f3d5898d50372e1745b96b393db40cd8394 refers to a dead (removed) revision in file `cloudsync/config/config_STAGING.json'. Fisheye: No comparison available. Pass `N' to diff? Index: cloudsync/config/config_integration.json =================================================================== diff -u -r19f09f19af5ac851ab78be77c33d09af26a36f84 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/config/config_integration.json (.../config_integration.json) (revision 19f09f19af5ac851ab78be77c33d09af26a36f84) +++ cloudsync/config/config_integration.json (.../config_integration.json) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -1,29 +1,29 @@ { "kebormed_paas": { - "idp_client_secret": "ocnnWhkfImztoJShV6GV4uxeAOM0GhwT", - "url_mft": "", - "url_dcs": "https://device-api.diality.integration.kebormed.com", - "url_device_identity": "https://device-identity.diality.integration.kebormed.com/auth/realms/Main/protocol/openid-connect/token", - "url_reachability": "https://healthcheck.diality.integration.kebormed.com/", + "idp_client_secret": "mock-client-secret", + "url_mft": "http://mock-drt:9090", + "url_dcs": "http://mock-dcs:8080", + "url_device_identity": "http://mock-dcs:8080/auth/realms/Main/protocol/openid-connect/token", + "url_reachability": "http://mock-dcs:8080/health", "dia_org_id": 1 }, "device": { - "ip": "", - "port": 80, - "name": "", - "hd_serial": "", - "dg_serial": "", - "sw_version": "", + "ip": "172.18.0.3", + "port": 5000, + "name": "HD_TEST_1770675061356", + "hd_serial": "HD_TEST_1770675061356", + "dg_serial": "DG_TEST_1770675061356", + "sw_version": "0.5.0_test", "mode": "registration", - "device_state": "INACTIVE_NOT_OK" + "device_state": 4 }, "logs": { - "default_log_level": "ERROR", + "default_log_level": "DEBUG", "default_log_level_duration": "86400000", - "current_log_level": "", + "current_log_level": "DEBUG", "log_level_duration": 0, "log_level_start_timestamp": 0, "log_level_stop_timestamp": 0, "update_dcs_flag": 0 } -} +} \ No newline at end of file Index: cloudsync/handlers/cs_mft_dcs_request_handler.py =================================================================== diff -u -re3afee7b49f898e127dbe0330d0e9a63089f84d3 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision e3afee7b49f898e127dbe0330d0e9a63089f84d3) +++ cloudsync/handlers/cs_mft_dcs_request_handler.py (.../cs_mft_dcs_request_handler.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -18,9 +18,6 @@ from cloudsync.handlers.outgoing.handler_cs_to_dcs import * from cloudsync.handlers.incoming.handler_mft_to_cs import * -ERROR_STRING = "{0},2,{1},{2}" - - class NetworkRequestHandler: def __init__(self, logger: Logger, max_size, output_channel, reachability_provider, error_handler): self.logger = logger @@ -41,7 +38,6 @@ """ while True: flag = self.event.wait() - # print('Flag: {0}'.format(flag)) if flag: req = self.queue.popleft() self.handle_request(req) @@ -54,9 +50,7 @@ :return: True upon success, False otherwise """ if len(self.queue) < self.queue.maxlen: - # print('network queue size before: {0}'.format(len(self.queue))) self.queue.append(req) - # print('network queue size after: {0}'.format(len(self.queue))) self.event.set() return True else: @@ -83,9 +77,9 @@ error_handler=self.error_handler) self.logger.debug("DRT Request registration resp: {0}".format(response)) except Exception as e: - error = Error(ERROR_STRING.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) elif req.request_type == NetworkRequestType.MFT2CS_REQ_SET_CREDENTIALS: try: @@ -98,9 +92,9 @@ output_channel=self.output_channel, error_handler=self.error_handler) except Exception as e: - error = Error(ERROR_STRING.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) elif req.request_type == NetworkRequestType.MFT2CS_REQ_INIT_CONNECTIVITY_TEST: try: @@ -116,9 +110,9 @@ error_handler=self.error_handler ) except Exception as e: - error = Error(ERROR_STRING.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) elif req.request_type == NetworkRequestType.MFT2CS_REQ_FACTORY_RESET: try: @@ -127,7 +121,9 @@ helpers_write_config(OPERATION_CONFIG_PATH, OPERATION_CONFIG_FILE_PATH, req.g_config) cmd_incoming_factory_reset(output_channel=self.output_channel) except Exception as e: - error = Error(ERROR_STRING.format(OutboundMessageIDs.CS2UI_ERROR.value, ErrorIDs.CS_FACTORY_RESET_ERROR.value, e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_FACTORY_RESET_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) req.g_config[CONFIG_DEVICE][CONFIG_DEVICE_MODE] = 'registration' helpers_write_config(None, CONFIG_PATH, req.g_config) @@ -164,6 +160,9 @@ device_state_json=device_state_json, error_handler=self.error_handler) + if response is None: + return + if response.status_code == 200: self.logger.debug("CS set Device state: {0}".format(response.json())) @@ -186,18 +185,19 @@ self.logger.debug("DCS Request device state response code: {0} & full response: {1}".format(response.status_code, response.text)) if response.status_code == UNASSIGNED: - error = Error("{0},2,{1}, Invalid device state transition".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, + "Invalid device state transition") self.error_handler.enqueue_error(error=error) else: - error = Error( - "{0},2,{1}, Missing access token".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, + "Missing access token") self.error_handler.enqueue_error(error=error) except Exception as e: - error = Error(ERROR_STRING.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) elif req.request_type == NetworkRequestType.CS2DCS_REQ_SEND_TREATMENT_REPORT: try: @@ -216,6 +216,13 @@ token_verification_url=token_verification_url, client_secret=client_secret) + if access_token is None: + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, + "Missing access token") + self.error_handler.enqueue_error(error=error) + return + # Step #1 - get organization id for current device response = cmd_outgoing_validate_device(access_token=access_token, @@ -227,6 +234,9 @@ url=validate_url, error_handler=self.error_handler) + if response is None: + return + invalid_attributes = response.get("invalidAttributes", None) if len(invalid_attributes) > 0: @@ -259,12 +269,17 @@ # Step #2b - If patient with emr_id doesn't exist, create temporary patient + if response is None: + return + if response.status_code == OK: patient_id = response.json().get("id", None) elif response.status_code == NOT_FOUND: response = cmd_outgoing_create_temporary_patient(access_token=access_token, url=create_temporary_patient_url, error_handler=self.error_handler) + if response is None: + return patient_id = response.get("id", None) else: g_utils.logger.warning("Patient didn't exist and a temporary patient couldn't be created") @@ -301,15 +316,21 @@ self.logger.debug("treatment log: {0}".format(treatment_log_json)) + self.logger.info("Sending treatment report to DCS") response = cmd_outgoing_send_treatment_report(url=data_submission_url, access_token=access_token, treatment_log=treatment_log_json, error_handler=self.error_handler) - if response is not None: - self.logger.debug( - "Treatment upload response: {0}".format(response.json())) + if response is None: + return + self.logger.info(f"Treatment upload response: {response.status_code}") + if response.status_code != OK: + self.logger.error(f"Treatment upload failed: {response.status_code} - {response.text[:500]}") + self.logger.debug( + "Treatment upload response body: {0}".format(response.json())) + if response.status_code == OK: # Send TX code to UI app message_body = str( @@ -329,9 +350,9 @@ error_handler=self.error_handler) except Exception as e: - error = Error(ERROR_STRING.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) elif req.request_type == NetworkRequestType.CS2DCS_REQ_SEND_DEVICE_LOG: try: @@ -363,6 +384,9 @@ url=validate_url, error_handler=self.error_handler) + if response is None: + return + invalid_attributes = response.get("invalidAttributes", None) if len(invalid_attributes) > 0: @@ -386,7 +410,11 @@ access_token=access_token, file_json=device_log_json, error_handler=self.error_handler, - log_file_origin='device') + log_file_origin='device', + token_refresher=lambda: self.get_valid_token( + identity_url=identity_url, + token_verification_url=token_verification_url, + client_secret=client_secret)) if isinstance(upload_result, dict) and not upload_result.get("accepted"): # Rejection (e.g. 409 duplicate) — send 2010 with 3 params: filename, 0, reason_code @@ -403,15 +431,15 @@ upload_result) self.output_channel.enqueue_message(message_body) else: - error = Error( - "{0},2,{1}, Missing access token".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_DEVICE_LOG_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + "Missing access token") self.error_handler.enqueue_error(error=error) except Exception as e: - error = Error(ERROR_STRING.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_DEVICE_LOG_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) elif req.request_type == NetworkRequestType.CS2DCS_REQ_SEND_CS_LOG: try: @@ -443,6 +471,9 @@ url=validate_url, error_handler=self.error_handler) + if response is None: + return + invalid_attributes = response.get("invalidAttributes", None) if len(invalid_attributes) > 0: @@ -458,7 +489,6 @@ organization_id = response.get("associatedOrganizationId", None) cs_log_data['organizationId'] = organization_id - # Step #3 - upload the cs log file cs_log_json = helpers_construct_cs_log_json(cs_log_data) @@ -467,21 +497,25 @@ access_token=access_token, file_json=cs_log_json, error_handler=self.error_handler, - log_file_origin='cs') + log_file_origin='cs', + token_refresher=lambda: self.get_valid_token( + identity_url=identity_url, + token_verification_url=token_verification_url, + client_secret=client_secret)) if cs_log_filename is not None: self.logger.debug("CS log file uploaded: {cs_log_filename}") else: - error = Error( - "{0},2,{1}, Missing access token".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_LOG_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_LOG_ERROR.value, + "Missing access token") self.error_handler.enqueue_error(error=error) except Exception as e: - error = Error(ERROR_STRING.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_LOG_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_LOG_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) else: g_utils.logger.warning("Request type {0} not supported".format(req.request_type)) @@ -490,6 +524,7 @@ access_token = helpers_get_stored_token() if access_token is None: + self.logger.info("No stored token found, requesting new token") access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509, path_private_key=CREDENTIALS_PRIVATE_KEY, save=True, @@ -500,13 +535,16 @@ response = cmd_outgoing_verify_token(url=token_verification_url, access_token=access_token, error_handler=self.error_handler) - if response.status_code == UNAUTHORIZED: + if response is None or response.status_code != OK: + self.logger.warning(f"Token verification failed (status={response.status_code if response else 'None'}), refreshing token") access_token = cmd_outgoing_get_new_token_with_cert(path_certificate=CREDENTIALS_CERTIFICATE_X509, path_private_key=CREDENTIALS_PRIVATE_KEY, save=True, url=identity_url, client_secret=client_secret, error_handler=self.error_handler) + else: + self.logger.info("Token verification succeeded") return access_token Index: cloudsync/handlers/error.py =================================================================== diff -u -r4ebf654ab1b0d436ce7d171bfe06c6bdc46aca47 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/handlers/error.py (.../error.py) (revision 4ebf654ab1b0d436ce7d171bfe06c6bdc46aca47) +++ cloudsync/handlers/error.py (.../error.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -31,3 +31,29 @@ "parameters": self.parameters } return str(data) + + @classmethod + def timeout(cls, message_id, error_id, detail="Registration timeout"): + return cls("{0},2,{1},{2}".format(message_id, error_id, detail)) + + @classmethod + def redirect(cls, message_id, error_id): + return cls("{0},2,{1},Too many redirects".format(message_id, error_id)) + + @classmethod + def general(cls, message_id, error_id, detail): + return cls("{0},2,{1},{2}".format(message_id, error_id, detail)) + + @classmethod + def validation(cls, message_id, error_id, status_code, reason, detail): + return cls("{0},3,{1},{2}:{3},Exception: {4}".format( + message_id, error_id, status_code, reason, detail)) + + @classmethod + def file_not_found(cls, message_id, error_id): + return cls("{0},2,{1},UI logfile not found".format(message_id, error_id)) + + @classmethod + def crc_mismatch(cls, message_id, error_id, sequence, expected_crc, actual_crc): + return cls("{0},4,{1},Bad CRC on message {2},Message CRC: {3},Calculated CRC: {4}".format( + message_id, error_id, sequence, expected_crc, actual_crc)) Index: cloudsync/handlers/error_handler.py =================================================================== diff -u -rcc36b7572e8d4e67ea514e038f9e07665d6e336f -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/handlers/error_handler.py (.../error_handler.py) (revision cc36b7572e8d4e67ea514e038f9e07665d6e336f) +++ cloudsync/handlers/error_handler.py (.../error_handler.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -29,7 +29,6 @@ flag = self.event.wait() if flag: while len(self.queue) > 0: - # print('queue size: {0}'.format(len(self.queue))) error = self.queue.popleft() self.handle_error(error) self.event.clear() Index: cloudsync/handlers/incoming/handler_mft_to_cs.py =================================================================== diff -u -rcc7bbd932684e69aa456c114596625546630903e -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/handlers/incoming/handler_mft_to_cs.py (.../handler_mft_to_cs.py) (revision cc7bbd932684e69aa456c114596625546630903e) +++ cloudsync/handlers/incoming/handler_mft_to_cs.py (.../handler_mft_to_cs.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -5,6 +5,7 @@ from cloudsync.handlers.outgoing import * from cloudsync.utils.helpers import * from cloudsync.utils.globals import * +from cloudsync.utils.filesystem import check_writable, check_disk_space_mb from cloudsync.common.enums import * from cloudsync.handlers.outgoing.handler_cs_to_mft import * from cloudsync.handlers.outgoing.handler_cs_to_dcs import * @@ -29,25 +30,28 @@ if not os.path.exists(CREDENTIALS_PATH): os.makedirs(CREDENTIALS_PATH) - f = open(CREDENTIALS_CERTIFICATE_X509, 'w') - f.write(certificate) - f.close() + if not check_writable(CREDENTIALS_PATH): + raise IOError("Credentials path is not writable: {}".format(CREDENTIALS_PATH)) + if not check_disk_space_mb(CREDENTIALS_PATH, required_mb=1): + raise IOError("Insufficient disk space at: {}".format(CREDENTIALS_PATH)) - f = open(CREDENTIALS_PRIVATE_KEY, 'w') - f.write(private_key) - f.close() + with open(CREDENTIALS_CERTIFICATE_X509, 'w') as f: + f.write(certificate) - f = open(CREDENTIALS_PUBLIC_KEY, 'w') - f.write(public_key) - f.close() + with open(CREDENTIALS_PRIVATE_KEY, 'w') as f: + f.write(private_key) + with open(CREDENTIALS_PUBLIC_KEY, 'w') as f: + f.write(public_key) + message_body = str( OutboundMessageIDs.CS2UI_REQ_SAVE_CREDENTIALS.value) + ',3,' + CREDENTIALS_CERTIFICATE_X509 + ',' + CREDENTIALS_PRIVATE_KEY + ',' + CREDENTIALS_PUBLIC_KEY output_channel.enqueue_message(message_body) except IOError as er: - error = Error("{0},2,{1},Error writing device credentials".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SAVE_CREDENTIALS_ERROR.value, + "Error writing device credentials") error_handler.enqueue_error(error=error) g_utils.logger.error('Error writing device credentials: {0}'.format(' '.join(er.args))) @@ -99,27 +103,25 @@ json_resp = cmd_outgoing_validation_result(invalid_attributes, hd_serial, g_config, error_handler) g_utils.logger.debug("Validation result request response (DRT --> CS): {0}".format(json_resp)) else: - error = Error( - "{0},2,{1},Validation failed due to invalid DCS response format".format( - OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, + "Validation failed due to invalid DCS response format") error_handler.enqueue_error(error=error) else: - error = Error( - "{0},2,{1},Validation failed due to missing response from DCS".format( - OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, + "Validation failed due to missing response from DCS") error_handler.enqueue_error(error=error) else: - error = Error( - "{0},2,{1},Validation failed due to missing token".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, + "Validation failed due to missing token") error_handler.enqueue_error(error=error) except Exception as e: - error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) Index: cloudsync/handlers/logs_handler.py =================================================================== diff -u -r34a18ebe349364c4e0462d876d4d191bf3df8939 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/handlers/logs_handler.py (.../logs_handler.py) (revision 34a18ebe349364c4e0462d876d4d191bf3df8939) +++ cloudsync/handlers/logs_handler.py (.../logs_handler.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -68,7 +68,7 @@ g_config=cls.g_config, success_message='CS2DCS_REQ_SEND_CS_LOG request added to network queue') except Exception as e: - error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_LOG_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_LOG_ERROR.value, + str(e)) cls.error_handler.enqueue_error(error=error) \ No newline at end of file Index: cloudsync/handlers/outgoing/handler_cs_to_dcs.py =================================================================== diff -u -re3afee7b49f898e127dbe0330d0e9a63089f84d3 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision e3afee7b49f898e127dbe0330d0e9a63089f84d3) +++ cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -65,19 +65,20 @@ return data.get("access_token", None) except requests.exceptions.Timeout: - error = Error("{0},2,{1},Obtain token request timeout".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value)) + error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, + "Obtain token request timeout") error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: - error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value)) + error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value) error_handler.enqueue_error(error=error) return None except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_GET_NEW_TOKEN_WITH_CERT_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) return None @@ -102,22 +103,27 @@ } resp = requests.post(url=url, - data=data, - headers=headers) + data=json.dumps(data), + headers=headers, + timeout=(30, 60)) + g_utils.logger.info(f"Token verification response: {resp.status_code}") return resp except requests.exceptions.Timeout: - error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VERIFY_TOKEN_ERROR.value)) + error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VERIFY_TOKEN_ERROR.value) error_handler.enqueue_error(error=error) + return None except requests.exceptions.TooManyRedirects: - error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VERIFY_TOKEN_ERROR.value)) + error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VERIFY_TOKEN_ERROR.value) error_handler.enqueue_error(error=error) + return None except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VERIFY_TOKEN_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VERIFY_TOKEN_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) + return None @log_func @@ -148,39 +154,39 @@ response = requests.post(url=url, headers=headers, - data=payload) + data=payload, + timeout=(30, 60)) except requests.exceptions.Timeout: - error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value)) + error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value) error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: - error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value)) + error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value) error_handler.enqueue_error(error=error) return None except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) + return None try: return response.json() except json.decoder.JSONDecodeError as e: - error = Error("{0},3,{1},Could not validate device. Received: {2}:{3},Exception: {4}".format( - OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, - response.status_code, - response.reason, - str(e))) + error = Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, + response.status_code, response.reason, str(e)) error_handler.enqueue_error(error=error) return None except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_VALIDATE_DEVICE_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) + return None # Runtime commands @@ -208,21 +214,25 @@ payload = device_state_json resp = requests.put(url=url, headers=headers, - data=payload) + data=payload, + timeout=(30, 60)) return resp except requests.exceptions.Timeout: - error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value)) + error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value) error_handler.enqueue_error(error=error) + return None except requests.exceptions.TooManyRedirects: - error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value)) + error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value) error_handler.enqueue_error(error=error) + return None except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) + return None @log_func @@ -238,39 +248,39 @@ } response = requests.get(url=url, - headers=headers) + headers=headers, + timeout=(30, 60)) except requests.exceptions.Timeout: - error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value)) + error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value) error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: - error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value)) + error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value) error_handler.enqueue_error(error=error) return None except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) + return None try: return response except json.decoder.JSONDecodeError as e: - error = Error("{0},3,{1},Could not check if the patient exists. Received: {2}:{3},Exception: {4}".format( - OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, - response.status_code, - response.reason, - str(e))) + error = Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, + response.status_code, response.reason, str(e)) error_handler.enqueue_error(error=error) return None except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CHECK_IF_PATIENT_WITH_EMR_ID_EXISTS_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) + return None @log_func @@ -289,39 +299,39 @@ response = requests.post(url=url, headers=headers, - data=payload) + data=payload, + timeout=(30, 60)) except requests.exceptions.Timeout: - error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value)) + error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value) error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: - error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value)) + error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value) error_handler.enqueue_error(error=error) return None except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) + return None try: return response.json() except json.decoder.JSONDecodeError as e: - error = Error("{0},3,{1},Could not create temporary patient. Received: {2}:{3},Exception: {4}".format( - OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, - response.status_code, - response.reason, - str(e))) + error = Error.validation(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, + response.status_code, response.reason, str(e)) error_handler.enqueue_error(error=error) return None except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_CREATE_TEMPORARY_PATIENT_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) + return None @log_func @@ -349,30 +359,36 @@ if associate: resp = requests.head(url=urllib.parse.urljoin(url, "/exists"), headers=headers, - data=payload) + data=payload, + timeout=(30, 60)) if resp.status_code == NOT_FOUND: resp = requests.put(url=url, headers=headers, - data=payload) + data=payload, + timeout=(30, 60)) else: resp = requests.delete(url=url, headers=headers, - data=payload) + data=payload, + timeout=(30, 60)) return resp except requests.exceptions.Timeout: - error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value)) + error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value) error_handler.enqueue_error(error=error) + return None except requests.exceptions.TooManyRedirects: - error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value)) + error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value) error_handler.enqueue_error(error=error) + return None except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SET_PATIENT_DEVICE_ASSOCIATION_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) + return None @log_func @@ -400,22 +416,23 @@ resp = requests.post(url=url, headers=headers, - data=payload) + data=payload, + timeout=(30, 60)) return resp except requests.exceptions.Timeout: - error = Error(REGISTRATION_TIMEOUT_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value)) + error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value) error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: - error = Error(TOO_MANY_REDIRECTS_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value)) + error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value) error_handler.enqueue_error(error=error) return None except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) return None @@ -426,7 +443,8 @@ error_handler: ErrorHandler, log_file_origin: str, chunk_size: int=2 * 1024 * 1024, - retries: int=3 ) -> Union[str, None]: + retries: int=3, + token_refresher: callable=None ) -> Union[str, None]: """ Uploads a large file in chunks using sessions and retries. @@ -454,7 +472,7 @@ # Start upload session # - start_session_url = os.path.join(base_url, "api/device/data/start-session") + start_session_url = base_url.rstrip("/") + "/api/device/data/start-session" start_session_payload = file_json['start_session'] start_session_payload = json.dumps(start_session_payload) headers = { @@ -464,25 +482,29 @@ "X-Api-Version": API_VERSION } + g_utils.logger.info(f"Starting upload session for {log_file_origin} log") g_utils.logger.debug(f"File upload payload (start-session): {start_session_payload}") try: response = requests.post( url=start_session_url, headers=headers, - data=start_session_payload) + data=start_session_payload, + timeout=(30, 60)) + g_utils.logger.info(f"Start-session response: {response.status_code}") if response.status_code != 200: + g_utils.logger.error(f"Start-session failed: {response.status_code} - {response.text[:500]}") raise Exception(f"Error while starting upload session: {response.status_code} - {response.text}") except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value,ERROR_ID,str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) error_handler.enqueue_error(error=error) return None session_id = response.json().get("sessionId") if not session_id: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value,ERROR_ID,"Missing session ID in response.")) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, "Missing session ID in response.") error_handler.enqueue_error(error=error) return None @@ -493,7 +515,7 @@ try: target_file = file_json['general']["file_path"] file_size = file_json['general']["file_size"] - upload_chunk_url = os.path.join(base_url, "api/device/data/chunk") + upload_chunk_url = base_url.rstrip("/") + "/api/device/data/chunk" upload_chunk_payload = file_json['upload_chunk'] upload_chunk_payload['sessionId'] = session_id upload_chunk_payload['chunkType'] = "device-data" @@ -538,32 +560,44 @@ response = requests.post(upload_chunk_url, headers=headers, - data=upload_chunk_payload) + data=upload_chunk_payload, + timeout=(30, 60)) if response.status_code == 200: chunk_number += 1 g_utils.logger.info(f"Uploaded chunk {chunk_number} of {num_chunks}") break # Successful upload, break retry loop + g_utils.logger.warning(f"Chunk {chunk_number}/{num_chunks} upload failed: {response.status_code} - {response.text[:500]}") if retry_count < retries: - g_utils.logger.info(f"Retrying chunk upload in 5 seconds...") + g_utils.logger.info(f"Retrying chunk upload in 5 seconds (attempt {retry_count + 1}/{retries})...") sleep(5) # Implement backoff time between retries retry_count += 1 except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value,ERROR_ID,str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) error_handler.enqueue_error(error=error) except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value,ERROR_ID,str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) error_handler.enqueue_error(error=error) return None # # End upload session # - end_session_url = os.path.join(base_url, "api/device/data/end-session") + # Refresh token before end-session to prevent expiry after long chunk uploads + if token_refresher is not None: + g_utils.logger.info("Refreshing token before end-session phase") + refreshed_token = token_refresher() + if refreshed_token is not None: + access_token = refreshed_token + g_utils.logger.info("Token refreshed successfully before end-session") + else: + g_utils.logger.warning("Token refresh returned None, using existing token for end-session") + + end_session_url = base_url.rstrip("/") + "/api/device/data/end-session" end_session_payload = file_json['end_session'] end_session_payload['sessionId'] = session_id end_session_payload['completedAt'] = int(datetime.now(timezone.utc).timestamp()*1000) @@ -575,21 +609,25 @@ } try: end_session_payload = json.dumps(end_session_payload) + g_utils.logger.info(f"Ending upload session (sessionId={session_id})") g_utils.logger.debug(f"Device log upload payload (end-session): {end_session_payload}") response = requests.post(end_session_url, headers=headers, - data=end_session_payload) + data=end_session_payload, + timeout=(30, 60)) if response.status_code == CONFLICT: device_file_name = str(file_json['start_session']['metadata']['deviceFileName']) g_utils.logger.info(f"File {device_file_name} rejected as duplicate (409 Conflict).") return {"accepted": False, "filename": device_file_name, "reason_code": LogUploadReasonCode.DUPLICATE.value} + g_utils.logger.info(f"End-session response: {response.status_code}") if response.status_code != 200: + g_utils.logger.error(f"End-session failed: {response.status_code} - {response.text[:500]}") raise Exception(f"Error while ending upload session: {response.status_code} - {response.text}") except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value,ERROR_ID,str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, ERROR_ID, str(e)) error_handler.enqueue_error(error=error) return None Index: cloudsync/handlers/outgoing/handler_cs_to_mft.py =================================================================== diff -u -re2b5bba1ace2613e8cd3ca6d997756b1b61d77a4 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/handlers/outgoing/handler_cs_to_mft.py (.../handler_cs_to_mft.py) (revision e2b5bba1ace2613e8cd3ca6d997756b1b61d77a4) +++ cloudsync/handlers/outgoing/handler_cs_to_mft.py (.../handler_cs_to_mft.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -48,22 +48,23 @@ } resp = requests.post(url=url, data=data, - headers=headers) + headers=headers, + timeout=(30, 60)) return resp except requests.exceptions.Timeout: - error = Error("{0},2,{1},Registration timeout".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_REQ_REGISTRATION_ERROR.value)) + error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_REQ_REGISTRATION_ERROR.value) error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: - error = Error("{0},2,{1},Too many redirects".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_REQ_REGISTRATION_ERROR.value)) + error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_REQ_REGISTRATION_ERROR.value) error_handler.enqueue_error(error=error) return None except Exception as e: - error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) return None @@ -84,8 +85,9 @@ if (CONFIG_KEBORMED not in g_config.keys() or CONFIG_KEBORMED_MFT_URL not in g_config[CONFIG_KEBORMED]): - error = Error("{0},2,{1},Manufacturing tool url not found".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value, + "Manufacturing tool url not found") error_handler.enqueue_error(error=error) return None @@ -107,21 +109,22 @@ g_utils.logger.debug("data: {0}".format(data)) resp = requests.post(url=url, data=data, - headers=headers) + headers=headers, + timeout=(30, 60)) return resp.json() except requests.exceptions.Timeout: - error = Error("{0},2,{1},Registration timeout".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value)) + error = Error.timeout(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value) error_handler.enqueue_error(error=error) return None except requests.exceptions.TooManyRedirects: - error = Error("{0},2,{1},Too many redirects".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value)) + error = Error.redirect(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value) error_handler.enqueue_error(error=error) return None except Exception as e: - error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value, - str(e))) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_VALIDATION_RESULT_ERROR.value, + str(e)) error_handler.enqueue_error(error=error) return None Index: cloudsync/handlers/ui_cs_request_handler.py =================================================================== diff -u -r3d21b2c336868d75ec738e8cfeddc24710d35106 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision 3d21b2c336868d75ec738e8cfeddc24710d35106) +++ cloudsync/handlers/ui_cs_request_handler.py (.../ui_cs_request_handler.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -40,7 +40,6 @@ flag = self.event.wait() if flag: while len(self.queue) > 0: - # print('queue size: {0}'.format(len(self.queue))) message = self.queue.popleft() self.handle_message(message) self.event.clear() @@ -70,12 +69,11 @@ message_calculated_crc8 = helpers_crc8(message_data.encode('utf-8')) if message.CRC != str(message_calculated_crc8): - error = Error("{0},4,{1},Bad CRC on message {2}, Message CRC: {3}, Calculated CRC: {4}".format( - OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_BAD_CRC_ERROR.value, - message.sequence, - message.CRC, - str(message_calculated_crc8))) + error = Error.crc_mismatch(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_BAD_CRC_ERROR.value, + message.sequence, + message.CRC, + str(message_calculated_crc8)) self.error_handler.enqueue_error(error=error) else: # REGISTRATION MODE @@ -87,10 +85,9 @@ self.logger.debug('Applicable config: {0}'.format(message.g_config)) if (len(message.parameters) != 3) or (message.parameters[0] is None) or (message.parameters[2] is None): - error = Error( - "{0},2,{1},invalid # of parameters for registration".format( - OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_REQ_REGISTRATION_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, + "invalid # of parameters for registration") self.error_handler.enqueue_error(error=error) else: try: @@ -118,14 +115,14 @@ success_message='CS2MFT_REQ_REGISTRATION request added to network ' 'queue') except IOError: - error = Error( - "{0},2,{1},Error updating device config file".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SAVE_CONFIG_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SAVE_CONFIG_ERROR.value, + "Error updating device config file") self.error_handler.enqueue_error(error=error) except Exception as e: - error = Error(GENERAL_EXCEPTION_HOLDER.format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_REQ_REGISTRATION_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) # OPERATION MODE @@ -143,9 +140,9 @@ HDOpSubModes.mapped_str_value(message.parameters[1])) except Exception: device_state = DeviceStates.UNKNOWN_STATE - error = Error( - "{0},2,{1},Unknown device state received from UI".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_UNKNOWN_DEVICE_STATE_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_UNKNOWN_DEVICE_STATE_ERROR.value, + "Unknown device state received from UI") self.error_handler.enqueue_error(error=error) message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_STATE] = device_state.value @@ -162,9 +159,9 @@ success_message='CS2DCS_REQ_SET_DEVICE_STATE request added to network ' 'queue') except Exception as e: - error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_DEVICE_STATE_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) # SEND TREATMENT REPORT REQUEST @@ -196,9 +193,9 @@ 'queue') except Exception as e: - error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_SEND_TREATMENT_REPORT_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) # DECOMMISSIONING REQUEST @@ -212,9 +209,9 @@ OutboundMessageIDs.CS2UI_REQ_DEVICE_DECOMMISSIONED.value) + ',0' self.output_channel.enqueue_message(message_body) except Exception as e: - error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_REQ_DECOMMISSION_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_REQ_DECOMMISSION_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) # CHECK-IN REQUEST @@ -227,9 +224,9 @@ OutboundMessageIDs.CS2UI_REQ_CHECKIN.value) + ',0' self.output_channel.enqueue_message(message_body) except Exception as e: - error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_REQ_CHECKIN_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_REQ_CHECKIN_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) # CS LOG RETENTION @@ -243,14 +240,14 @@ OutboundMessageIDs.CS2UI_REQ_LOG_RETENTION.value) + f',{num_of_files}' + f',{del_size_mb}' self.output_channel.enqueue_message(message_body) except Exception as e: - error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_LOG_RETENTION_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_LOG_RETENTION_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) else: - error = Error("{0},2,{1},invalid # of parameters for log retention".format( - OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_LOG_RETENTION_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_LOG_RETENTION_ERROR.value, + "invalid # of parameters for log retention") self.error_handler.enqueue_error(error=error) # ERROR MESSAGE RECEIVED FROM UI @@ -269,16 +266,16 @@ self.logger.info("UI2CS_UPLOAD_DEVICE_LOG request received") if (len(message.parameters) != 2) or (message.parameters[0] is None) or (message.parameters[1] is None): - error = Error( - "{0},2,{1},invalid # of parameters for file upload request".format( - OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_DEVICE_LOG_ERROR.value)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + "invalid # of parameters for file upload request") self.error_handler.enqueue_error(error=error) else: try: local_checksum = helpers_sha256_checksum(message.parameters[0]) - assert (local_checksum == message.parameters[1]), 'No valid sha256 value.' + if local_checksum != message.parameters[1]: + raise ValueError('No valid sha256 value.') hd_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_HD_SERIAL] dg_serial_number = message.g_config[CONFIG_DEVICE][CONFIG_DEVICE_DG_SERIAL] @@ -303,7 +300,7 @@ success_message='CS2DCS_REQ_SEND_DEVICE_LOG request added to network queue') except Exception as e: - error = Error("{0},2,{1},{2}".format(OutboundMessageIDs.CS2UI_ERROR.value, - ErrorIDs.CS_DEVICE_LOG_ERROR.value, - e)) + error = Error.general(OutboundMessageIDs.CS2UI_ERROR.value, + ErrorIDs.CS_DEVICE_LOG_ERROR.value, + str(e)) self.error_handler.enqueue_error(error=error) Index: cloudsync/utils/globals.py =================================================================== diff -u -re3afee7b49f898e127dbe0330d0e9a63089f84d3 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/utils/globals.py (.../globals.py) (revision e3afee7b49f898e127dbe0330d0e9a63089f84d3) +++ cloudsync/utils/globals.py (.../globals.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -3,6 +3,7 @@ # PATHS PATH_HOME = os.getcwd() + '/' +PATH_CLOUDSYNC = PATH_HOME # Configuration groups CONFIG_DEVICE = 'device' @@ -50,24 +51,29 @@ # CONFIG CONFIG_PATH = os.path.join(PATH_HOME, "cloudsync/config/config.json") -DECOMMISSION_CS_PATH = "/var/configurations/CloudSync/" +# DECOMMISSION_CS_PATH = "/var/configurations/CloudSync/" +DECOMMISSION_CS_PATH = "data/cloudsync/" DECOMMISSION_FOLDERS = ['config', 'jwt', 'credentials'] -OPERATION_CONFIG_PATH = "/var/configurations/CloudSync/config/" +# OPERATION_CONFIG_PATH = "/var/configurations/CloudSync/config/" +OPERATION_CONFIG_PATH = "data/cloudsync/config/" OPERATION_CONFIG_FILE_PATH = os.path.join(OPERATION_CONFIG_PATH, "config.json") # LOGS -CS_LOG_PATH = "/media/sd-card/cloudsync/log" +# CS_LOG_PATH = "/media/sd-card/cloudsync/log" +CS_LOG_PATH = "data/log/" CS_LOG_FILE = os.path.join(CS_LOG_PATH, "cloudsync.log") # DEVICE TOKEN -TOKEN_CACHING_PATH = "/var/configurations/CloudSync/jwt/" +# TOKEN_CACHING_PATH = "/var/configurations/CloudSync/jwt/" +TOKEN_CACHING_PATH = "data/cloudsync/jwt/" DEVICE_KEBORMED_ACCESS_TOKEN_PATH = os.path.join(TOKEN_CACHING_PATH, "access_token.json") DEVICE_TOKEN_VALIDATION = "/api/device/validate" # CREDENTIALS -CREDENTIALS_PATH = "/var/configurations/CloudSync/credentials/" +# CREDENTIALS_PATH = "/var/configurations/CloudSync/credentials/" +CREDENTIALS_PATH = "data/cloudsync/credentials/" CERTIFICATE_X509_FILE_NAME = "client_certificate.pem" PRIVATE_KEY_FILE_NAME = "client_private_key.pem" @@ -78,10 +84,17 @@ CREDENTIALS_PUBLIC_KEY = os.path.join(CREDENTIALS_PATH, PUBLIC_KEY_FILE_NAME) # UI2CS VALUES -UI2CS_FILE_CHANNELS_PATH = "/media/sd-card/cloudsync" -UI2CS_FILE_LOG_PATH = "/media/sd-card/cloudsync/log" -# PATH for running off device -# UI2CS_FILE_CHANNELS_PATH = "data/busses" +# On-device paths (for reference): +# UI2CS_FILE_CHANNELS_PATH = "/media/sd-card/cloudsync" +# UI2CS_FILE_LOG_PATH = "/media/sd-card/cloudsync/log" +# Paths for running off device (local / Docker mock stack) +UI2CS_FILE_CHANNELS_PATH = os.path.join(PATH_HOME, "data/busses") +# Log retention paths. On-device, logs are on the SD card alongside bus files. +# SD_CARD_PATH env var lets the mock stack point these at a small tmpfs so +# shutil.disk_usage() returns a realistic SD-card-sized total, enabling +# log retention to trigger actual file deletion. +_sd_card = os.environ.get("SD_CARD_PATH", UI2CS_FILE_CHANNELS_PATH) +UI2CS_FILE_LOG_PATH = os.path.join(_sd_card, "log") # TREATMENT REPORT SECTIONS Index: cloudsync/utils/helpers.py =================================================================== diff -u -rcc36b7572e8d4e67ea514e038f9e07665d6e336f -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/utils/helpers.py (.../helpers.py) (revision cc36b7572e8d4e67ea514e038f9e07665d6e336f) +++ cloudsync/utils/helpers.py (.../helpers.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -9,6 +9,7 @@ import base64 import uuid import subprocess +import math from datetime import * from time import time, sleep @@ -81,7 +82,7 @@ """ try: float(val) - except ValueError: + except (ValueError, TypeError): return False return True @@ -98,7 +99,15 @@ if helpers_is_int(val): return int(val) elif helpers_is_float(val): - return float(val) + try: + f = float(val) + except Exception: + g_utils.logger.warning(f"Unexpected conversion failure for: '{val}' — replacing with 0") + return 0 + if math.isinf(f) or math.isnan(f): + g_utils.logger.warning(f"Non-finite float value encountered: '{val}' — replacing with 0") + return 0 + return f else: return val @@ -113,7 +122,7 @@ :return: the parsed treatment log file """ if not os.path.exists(path): - print("Path does not exist.") + g_utils.logger.warning("Path does not exist: %s", path) return {} result = {} @@ -122,7 +131,7 @@ group = "NoGroup" for line_ in lines: line = line_.replace("\n", "") - if "[" and "]" in line: + if "[" in line and "]" in line: group = line.split("[")[1].split("]")[0] if group not in result: result[group] = {} @@ -178,7 +187,6 @@ :return: The token if found, otherwise returns None """ data = None - # print('token path: {0}'.format(DEVICE_KEBORMED_ACCESS_TOKEN_PATH)) if not os.path.exists(DEVICE_KEBORMED_ACCESS_TOKEN_PATH): return None with open(DEVICE_KEBORMED_ACCESS_TOKEN_PATH, 'r') as f: @@ -271,7 +279,7 @@ return _wrapper -def helpers_file_to_byte_array(file_path: str) -> bytearray: +def helpers_file_to_byte_array(file_path: str) -> Union[str, None]: try: with open(file_path, "rb") as f: # Read the entire file in binary mode @@ -285,12 +293,12 @@ return None -def helpers_get_file_size(file_path: str) -> int: +def helpers_get_file_size(file_path: str) -> Union[int, None]: try: return os.path.getsize(file_path) except OSError as e: g_utils.logger.error(f'Error getting file size: {e}') - return False + return None def helpers_read_treatment_log_file(path: str): @@ -302,8 +310,6 @@ counter = 0 treatment_log_lines = f.readlines() - # print("log_lines: {0}".format(treatment_log_lines)) - while counter < len(treatment_log_lines): line = treatment_log_lines[counter].strip() @@ -325,14 +331,14 @@ data_components = data_line.split(',') data_record = { "time": int(data_components[0]) * S_MS_CONVERSION_FACTOR, - "bloodFlowRate": float(data_components[1]), - "dialysateFlowRate": float(data_components[2]), - "ultrafiltrationRate": float(data_components[3]), - "arterialPressure": float(data_components[4]), - "venousPressure": float(data_components[5]), - "systolic": float(data_components[6]), - "diastolic": float(data_components[7]), - "heartRate": float(data_components[8]) + "bloodFlowRate": helpers_try_numeric(data_components[1]), + "dialysateFlowRate": helpers_try_numeric(data_components[2]), + "ultrafiltrationRate": helpers_try_numeric(data_components[3]), + "arterialPressure": helpers_try_numeric(data_components[4]), + "venousPressure": helpers_try_numeric(data_components[5]), + "systolic": helpers_try_numeric(data_components[6]), + "diastolic": helpers_try_numeric(data_components[7]), + "heartRate": helpers_try_numeric(data_components[8]) } treatment_data['data']['treatment']['data'].append(data_record) elif section == TREATMENT_ALARMS: @@ -363,7 +369,6 @@ counter = section_start_counter else: - # print('regular line') if line.startswith(TREATMENT_CODE): treatment_data['data']['treatment']['treatmentCode'] = line.split(',')[1] treatment_data['reference'] = line.split(',')[1] @@ -483,20 +488,36 @@ method=method, g_config=g_config) request_added_to_queue = False - while not request_added_to_queue: + max_attempts = 300 + attempt = 0 + while not request_added_to_queue and attempt < max_attempts: request_added_to_queue = network_request_handler.enqueue_request(r) + if not request_added_to_queue: + sleep(0.1) + attempt += 1 - g_utils.logger.info(success_message) + if request_added_to_queue: + g_utils.logger.info(success_message) + else: + g_utils.logger.error("Failed to enqueue network request after {0} attempts: {1}".format(max_attempts, request_type.name)) else: g_utils.logger.warning("Internet DOWN: Network request {0} couldn't be processed".format(request_type.name)) def helpers_add_to_output_channel(output_channel, message_body, success_message): message_added_to_queue = False - while not message_added_to_queue: + max_attempts = 300 + attempt = 0 + while not message_added_to_queue and attempt < max_attempts: message_added_to_queue = output_channel.enqueue_message(message_body) + if not message_added_to_queue: + sleep(0.1) + attempt += 1 - g_utils.logger.info(success_message) + if message_added_to_queue: + g_utils.logger.info(success_message) + else: + g_utils.logger.error("Failed to enqueue output message after {0} attempts".format(max_attempts)) def helpers_sha256_checksum(data: str) -> str: @@ -630,9 +651,20 @@ local_timestamp = int(datetime.now(timezone.utc).timestamp()*1000) # Start and End timestamps calculation - today = datetime.now(timezone.utc) - start_of_day = int(today.replace(hour=0, minute=0, second=0, microsecond=0).timestamp()*1000) - end_of_day = int(today.replace(hour=23, minute=59, second=59, microsecond=999999).timestamp()*1000) + # Parse the date from the rotated log filename (suffix format: MM-DD-YYYY) + # to ensure timestamps match the actual log date, not the upload date. + # This prevents mismatched timestamps when uploading backlogged files. + log_date = None + parts = file_name.rsplit('.', 1) + if len(parts) == 2: + try: + log_date = datetime.strptime(parts[1], "%m-%d-%Y").replace(tzinfo=timezone.utc) + except ValueError: + pass + if log_date is None: + log_date = datetime.now(timezone.utc) + start_of_day = int(log_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp()*1000) + end_of_day = int(log_date.replace(hour=23, minute=59, second=59, microsecond=999999).timestamp()*1000) # Populate JSON object cs_log_json['start_session']['reference'] = str(uuid.uuid4()) @@ -675,7 +707,7 @@ local_timestamp = int(datetime.now(timezone.utc).timestamp()*1000) # Calculate the UTC timestamp based on the local date and time the UI provided (in milliseconds) (NEED BETTER SOLUTION HERE) - if extracted_metadata['local_date'] != 'unknown' or extracted_metadata['local_time'] != 'unknown': + if extracted_metadata['local_date'] != 'unknown' and extracted_metadata['local_time'] != 'unknown': datetime_obj = datetime.strptime(f"{extracted_metadata['local_date']}{extracted_metadata['local_time']}", "%Y%m%d%H%M%S") # Convert to UTC timestamp ui_utc_timestamp = int(datetime_obj.timestamp()*1000) @@ -749,13 +781,14 @@ cs_log_pct = retention_pct / 100 num_files_deleted = 0 total_deleted_size = 0 - # Total sd stats - sd_total_bytes, sd_used_bytes, sd_free_bytes = shutil.disk_usage(UI2CS_FILE_CHANNELS_PATH) + # Total sd stats — use the log path's filesystem (same as channels on device, + # but may be a separate tmpfs in the mock stack for realistic simulation) + sd_total_bytes, sd_used_bytes, sd_free_bytes = shutil.disk_usage(UI2CS_FILE_LOG_PATH) if sd_total_bytes == 0: sd_total_bytes = 1 # Cloudsync log stats - used_bytes_proc_call = subprocess.check_output(['du', '-bsx', UI2CS_FILE_CHANNELS_PATH]).split() + used_bytes_proc_call = subprocess.check_output(['du', '-bsx', UI2CS_FILE_LOG_PATH]).split() if len(used_bytes_proc_call) > 0: cs_used_bytes = int(used_bytes_proc_call[0].decode('utf-8')) @@ -775,7 +808,7 @@ os.remove(file_to_remove) num_files_deleted += 1 f.pop(0) - used_bytes_proc_call = subprocess.check_output(['du', '-bsx', UI2CS_FILE_CHANNELS_PATH]).split() + used_bytes_proc_call = subprocess.check_output(['du', '-bsx', UI2CS_FILE_LOG_PATH]).split() if len(used_bytes_proc_call) > 0: cs_used_bytes = int(used_bytes_proc_call[0].decode('utf-8')) else: Index: cloudsync/utils/reachability.py =================================================================== diff -u -r373fd8d0c900ff129152e13d896e7de2e222b0e7 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cloudsync/utils/reachability.py (.../reachability.py) (revision 373fd8d0c900ff129152e13d896e7de2e222b0e7) +++ cloudsync/utils/reachability.py (.../reachability.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -33,7 +33,7 @@ } try: - response = requests.get(url=self.url_reachability, headers=headers) + response = requests.get(url=self.url_reachability, headers=headers, timeout=10) status_code = response.status_code except requests.exceptions.RequestException: status_code = INTERNAL_SERVER_ERROR Index: cs.py =================================================================== diff -u -r029aec0d4746d8a2897e46516cf9d39629d684b5 -r99d35f3d5898d50372e1745b96b393db40cd8394 --- cs.py (.../cs.py) (revision 029aec0d4746d8a2897e46516cf9d39629d684b5) +++ cs.py (.../cs.py) (revision 99d35f3d5898d50372e1745b96b393db40cd8394) @@ -10,8 +10,13 @@ from subprocess import Popen from cloudsync.utils.globals import EXEC_MODE_UPGRADE, EXEC_MODE_UPDATE, EXEC_MODE_NORMAL, EXEC_MODE +logging.basicConfig(format='%(message)s', level=logging.INFO) +logger = logging.getLogger(__name__) + DELAY = 0.5 -USAGE_FORMAT = "Usage: ./cs.py [debug|info|warning|error] [upgrade|update|]" +SENTINEL_FILE = "/tmp/cloudsync_restart_sentinel" +SENTINEL_CHECK_INTERVAL = 5 +USAGE_FORMAT = "Usage: ./cs.py [debug|info|warning|error] [upgrade|update|]" arguments = sys.argv logging_level = logging.INFO @@ -28,34 +33,51 @@ def start(): cs_proc = get_pid() if cs_proc: - print("CloudSync app already running") + logger.info("CloudSync app already running") else: - print("Starting CloudSync app with logging level {0} in {1} mode".format(logging_level,EXEC_MODE)) + logger.info("Starting CloudSync app with logging level %s in %s mode", logging_level, EXEC_MODE) time.sleep(DELAY) Popen(['python3', 'cloud_sync.py', str(logging_level),EXEC_MODE]) def stop(): cs_proc_pid = get_pid() if cs_proc_pid: - print("Stopping CloudSync app...") + logger.info("Stopping CloudSync app...") time.sleep(DELAY) - os.kill(int(cs_proc_pid), signal.SIGKILL) + os.kill(int(cs_proc_pid), signal.SIGTERM) else: - print("CloudSync app is not running.") + logger.info("CloudSync app is not running.") +def monitor(): + """Monitor sentinel file and restart CloudSync when detected.""" + logger.info("Monitoring sentinel file: %s (interval %ds)", SENTINEL_FILE, SENTINEL_CHECK_INTERVAL) + start() + while True: + if os.path.isfile(SENTINEL_FILE): + logger.info("Sentinel file detected — restarting CloudSync") + try: + os.remove(SENTINEL_FILE) + except OSError: + pass + stop() + time.sleep(1) + start() + time.sleep(SENTINEL_CHECK_INTERVAL) + + if len(arguments) <= 1: - print(USAGE_FORMAT) + logger.error(USAGE_FORMAT) sys.exit(101) if len(arguments) >= 4: argument=arguments[3] if argument in ( EXEC_MODE_UPGRADE, EXEC_MODE_UPDATE, EXEC_MODE_NORMAL ): EXEC_MODE=argument else: - print("incorrect {0} argument".format(argument)) - print(USAGE_FORMAT) + logger.error("incorrect %s argument", argument) + logger.error(USAGE_FORMAT) sys.exit(104) if len(arguments) >= 3: @@ -69,8 +91,8 @@ elif argument== "error": logging_level = logging.ERROR else: - print("incorrect {0} argument".format(argument)) - print(USAGE_FORMAT) + logger.error("incorrect %s argument", argument) + logger.error(USAGE_FORMAT) sys.exit(103) if len(arguments) >= 2: @@ -80,17 +102,19 @@ elif argument == "stop": stop() elif argument == "restart": - print("Restarting CloudSync app...") + logger.info("Restarting CloudSync app...") time.sleep(DELAY) stop() start() elif argument == "status": cs_proc = get_pid() if cs_proc: - print("CloudSync app IS running") + logger.info("CloudSync app IS running") else: - print("CloudSync app IS NOT running") + logger.info("CloudSync app IS NOT running") + elif argument == "monitor": + monitor() else: - print("incorrect {0} argument".format(argument)) - print(USAGE_FORMAT) + logger.error("incorrect %s argument", argument) + logger.error(USAGE_FORMAT) sys.exit(102) \ No newline at end of file