Index: cloudsync/handlers/outgoing/handler_cs_to_dcs.py =================================================================== diff -u -rfc55dd5d0ee3e2382081ff67612185e21bc5dfc4 -r60739e0cda40872319044ed2d7def16fb17d37e9 --- cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision fc55dd5d0ee3e2382081ff67612185e21bc5dfc4) +++ cloudsync/handlers/outgoing/handler_cs_to_dcs.py (.../handler_cs_to_dcs.py) (revision 60739e0cda40872319044ed2d7def16fb17d37e9) @@ -104,12 +104,8 @@ resp = requests.post(url=url, data=json.dumps(data), -<<<<<<< HEAD - headers=headers) -======= headers=headers, timeout=(30, 60)) ->>>>>>> bugfix/DENBUG-331-duplicate-device-logs-uploade g_utils.logger.info(f"Token verification response: {resp.status_code}") return resp except requests.exceptions.Timeout: @@ -601,11 +597,7 @@ else: g_utils.logger.warning("Token refresh returned None, using existing token for end-session") -<<<<<<< HEAD - end_session_url = os.path.join(base_url, "api/device/data/end-session") -======= end_session_url = base_url.rstrip("/") + "/api/device/data/end-session" ->>>>>>> bugfix/DENBUG-331-duplicate-device-logs-uploade end_session_payload = file_json['end_session'] end_session_payload['sessionId'] = session_id end_session_payload['completedAt'] = int(datetime.now(timezone.utc).timestamp()*1000) Index: cloudsync/utils/filesystem.py =================================================================== diff -u --- cloudsync/utils/filesystem.py (revision 0) +++ cloudsync/utils/filesystem.py (revision 60739e0cda40872319044ed2d7def16fb17d37e9) @@ -0,0 +1,45 @@ +"""Filesystem health check utilities for pre-flight I/O validation.""" + +import os +import shutil + + +def check_readable(path): + """Check if a file or directory is readable. + + :param str path: Path to check + :return: True if readable, False otherwise + :rtype: bool + """ + return os.path.exists(path) and os.access(path, os.R_OK) + + +def check_writable(path): + """Check if a path is writable. 
For files, checks the file. For + non-existent paths, checks the parent directory. + + :param str path: Path to check + :return: True if writable, False otherwise + :rtype: bool + """ + if os.path.exists(path): + return os.access(path, os.W_OK) + parent = os.path.dirname(path) or "." + return os.path.exists(parent) and os.access(parent, os.W_OK) + + +def check_disk_space_mb(path, required_mb=10): + """Check if sufficient disk space is available at the given path. + + :param str path: Path to check (file or directory) + :param int required_mb: Minimum required free space in MB + :return: True if sufficient space available, False otherwise + :rtype: bool + """ + try: + check_path = path if os.path.isdir(path) else os.path.dirname(path) or "." + usage = shutil.disk_usage(check_path) + free_mb = usage.free / (1024 * 1024) + return free_mb >= required_mb + except OSError: + return False Index: cloudsync/utils/watchdog.py =================================================================== diff -u --- cloudsync/utils/watchdog.py (revision 0) +++ cloudsync/utils/watchdog.py (revision 60739e0cda40872319044ed2d7def16fb17d37e9) @@ -0,0 +1,157 @@ +"""Internal watchdog thread for monitoring critical daemon threads. + +Monitors registered threads via periodic is_alive() checks, attempts restart +on failure (up to a configurable limit), and creates a sentinel file to +request full process restart when recovery is exhausted. +""" + +import os +from logging import Logger +from threading import Event, Thread +from time import sleep + + +# Default sentinel file path — cs.py monitors this to trigger process restart +SENTINEL_PATH = "/tmp/cloudsync_restart_sentinel" + + +class Watchdog: + """Monitors registered daemon threads and attempts recovery on failure. + + :param Logger logger: Logger instance for health/status messages. + :param int check_interval: Seconds between is_alive() sweeps (default 30). 
+ :param int max_restarts: Max restart attempts per thread before sentinel (default 3). + :param str sentinel_path: Path to sentinel file for full process restart. + """ + + def __init__(self, logger, check_interval=30, max_restarts=3, + sentinel_path=SENTINEL_PATH): + self._logger = logger + self._check_interval = check_interval + self._max_restarts = max_restarts + self._sentinel_path = sentinel_path + self._entries = {} + self._stop_event = Event() + self._thread = Thread(target=self._monitor, daemon=True) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def register(self, name, get_thread, restart_fn): + """Register a thread for monitoring. + + :param str name: Human-readable name (e.g. "heartbeat"). + :param callable get_thread: Zero-arg callable returning the current + Thread object (e.g. ``lambda: obj.thread``). + :param callable restart_fn: Zero-arg callable that creates and starts + a replacement thread. Use :func:`make_restart_fn` for the common + case of daemon threads backed by a bound method. 
+ """ + self._entries[name] = { + "get_thread": get_thread, + "restart_fn": restart_fn, + "failures": 0, + } + + def start(self): + """Start the watchdog monitoring loop.""" + self._thread.start() + + def stop(self): + """Signal the watchdog to stop.""" + self._stop_event.set() + + def is_running(self): + """Return True if the watchdog thread is alive.""" + return self._thread.is_alive() + + def get_health(self): + """Return a dict mapping thread name → alive (bool).""" + result = {} + for name, entry in self._entries.items(): + thread = entry["get_thread"]() + result[name] = thread is not None and thread.is_alive() + return result + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _monitor(self): + """Main monitoring loop — runs until stop_event is set.""" + while not self._stop_event.is_set(): + self._check_threads() + self._stop_event.wait(self._check_interval) + + def _check_threads(self): + """Single sweep: check every registered thread.""" + all_healthy = True + for name, entry in self._entries.items(): + thread = entry["get_thread"]() + if thread is not None and thread.is_alive(): + continue + + all_healthy = False + entry["failures"] += 1 + failures = entry["failures"] + + self._logger.error( + "Watchdog: thread '%s' is dead (failure %d/%d)", + name, failures, self._max_restarts, + ) + + if failures <= self._max_restarts: + self._try_restart(name, entry) + else: + self._logger.error( + "Watchdog: thread '%s' exceeded max restarts (%d), " + "creating sentinel for full process restart", + name, self._max_restarts, + ) + self._create_sentinel(name) + + if all_healthy: + self._logger.info("Watchdog: all %d threads healthy", + len(self._entries)) + + def _try_restart(self, name, entry): + """Attempt to restart a dead thread.""" + try: + entry["restart_fn"]() + self._logger.info("Watchdog: thread '%s' restarted", name) + except Exception as exc: + 
self._logger.error( + "Watchdog: failed to restart thread '%s': %s", name, exc, + ) + + def _create_sentinel(self, failed_thread_name): + """Write a sentinel file so cs.py triggers a full process restart.""" + try: + with open(self._sentinel_path, "w") as fh: + fh.write(failed_thread_name) + self._logger.info( + "Watchdog: sentinel file created at %s", self._sentinel_path, + ) + except OSError as exc: + self._logger.error( + "Watchdog: failed to create sentinel file: %s", exc, + ) + + + def make_restart_fn(obj, thread_attr, target_fn): + """Build a restart callable for the common daemon-thread pattern. + + Creates a new ``Thread(target=target_fn, daemon=True)``, assigns it to + ``obj.<thread_attr>``, and starts it. + + :param obj: Object that owns the thread (e.g. a HeartBeatProvider instance). + :param str thread_attr: Name of the Thread attribute on *obj* (e.g. "thread"). + :param callable target_fn: Bound method to use as the thread target. + :return: Zero-arg callable suitable for :meth:`Watchdog.register`. + """ + def _restart(): + new_thread = Thread(target=target_fn, daemon=True) + setattr(obj, thread_attr, new_thread) + new_thread.start() + return _restart