diff --git a/backup_core.py b/backup_core.py
index 4e95482..758d01f 100644
--- a/backup_core.py
+++ b/backup_core.py
@@ -9,8 +9,38 @@ import sys
 import time
 import urllib.parse
 from datetime import datetime
-from contextlib import redirect_stdout, redirect_stderr
 from pathlib import Path
+import threading
+import builtins
+
+# Thread-local storage for log file path to ensure print() statements are thread-safe and write to the correct log file
+thread_local_log = threading.local()
+
+def print(*args, **kwargs):
+    """Module-level replacement for builtins.print that honours the per-thread log file.
+
+    When the calling thread has set thread_local_log.path, console output
+    (no file=, or file=sys.stdout / sys.stderr) is appended to that file
+    instead. Everything else is passed straight through to builtins.print.
+    """
+    log_path = getattr(thread_local_log, 'path', None)
+    target = kwargs.get('file')
+    # An explicit file object other than the console streams is not ours to redirect
+    if not log_path or target not in (None, sys.stdout, sys.stderr):
+        builtins.print(*args, **kwargs)
+        return
+    # builtins.print treats sep=None / end=None as "use the default"
+    sep = kwargs.get('sep')
+    end = kwargs.get('end')
+    sep = ' ' if sep is None else sep
+    end = '\n' if end is None else end
+    try:
+        with open(log_path, 'a', encoding='utf-8') as f:
+            f.write(sep.join(str(arg) for arg in args) + end)
+    except Exception as e:
+        # Logging must never abort a backup: report and fall back to the console
+        builtins.print(f"Fallback log error: {e}", file=sys.stderr)
+        builtins.print(*args, **kwargs)
 
 import requests
 from pyVim.connect import SmartConnect, Disconnect
@@ -580,22 +610,17 @@ def run_backup(host, user, password, vm_name, dest, compress=False, no_verify_ss
     Falls back to full download if CBT state is unavailable.
     """
     if log_path:
-        logfile = open(log_path, 'a', encoding='utf-8', buffering=1)
-        def _wrap():
-            with redirect_stdout(logfile), redirect_stderr(logfile):
-                return _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ssl,
-                                        sftp_host, sftp_user, sftp_password, sftp_key,
-                                        progress_cb=progress_cb, disk_filter=disk_filter, job_id=job_id,
-                                        is_cancelled_cb=is_cancelled_cb, use_cbt=use_cbt)
-        try:
-            return _wrap()
-        finally:
-            logfile.close()
+        thread_local_log.path = log_path
     else:
+        thread_local_log.path = None
+
+    try:
         return _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ssl,
                                 sftp_host, sftp_user, sftp_password, sftp_key,
                                 progress_cb=progress_cb, disk_filter=disk_filter, job_id=job_id,
                                 is_cancelled_cb=is_cancelled_cb, use_cbt=use_cbt)
+    finally:
+        thread_local_log.path = None
 
 
 def _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ssl,
diff --git a/gui_app.py b/gui_app.py
index 9af6748..0fbf885 100644
--- a/gui_app.py
+++ b/gui_app.py
@@ -47,6 +47,26 @@ JOBS_DB_PATH = BASE_DIR / 'jobs.json'
 DB_PATH = BASE_DIR / 'jobs.db'
 jobs_db_lock = threading.RLock()
 
+# Active job threads tracker: {job_id: thread_object}
+active_job_threads = {}
+active_job_threads_lock = threading.Lock()
+
+def save_job_to_db_direct(jid, info):
+    """Upsert one job row in SQLite, leaving every other row untouched."""
+    try:
+        conn = sqlite3.connect(DB_PATH)
+        try:
+            cursor = conn.cursor()
+            cursor.execute(
+                "INSERT OR REPLACE INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)",
+                (jid, info.get('started', 0), info.get('status', ''), json.dumps(info, ensure_ascii=False))
+            )
+            conn.commit()
+        finally:
+            conn.close()
+    except Exception as e:
+        print(f"ERROR: Failed to save job {jid} directly to SQLite: {e}", file=sys.stderr)
+
 # In-memory job store: {job_id: job_dict}
 jobs: dict = {}
 
@@ -117,7 +137,13 @@ def migrate_old_json_db():
     except Exception as e:
         print(f"WARNING: Migration of jobs.json failed: {e}", file=sys.stderr)
 
-def load_jobs_db():
+def load_jobs_db(is_startup=False, keep_local=False):
+    """Synchronise the in-memory job store with the SQLite database.
+
+    is_startup: also mark jobs left in running/queued state as interrupted.
+    keep_local: only add jobs unknown to this process; never overwrite or
+        evict in-memory entries (used right before saving them).
+    """
     global jobs
     init_db()
     migrate_old_json_db()
@@ -128,43 +154,65 @@ def load_jobs_db():
         rows = cursor.fetchall()
 
         with jobs_db_lock:
-            jobs.clear()
+            # Jobs executing in this process hold the freshest state in memory,
+            # so a reload must neither overwrite nor evict them.
+            with active_job_threads_lock:
+                running_here = {j for j, t in active_job_threads.items() if t.is_alive()}
+            db_jids = set()
             for jid, data_str in rows:
+                db_jids.add(jid)
                 try:
-                    jobs[jid] = json.loads(data_str)
+                    db_info = json.loads(data_str)
+                    if jid not in jobs:
+                        jobs[jid] = db_info
+                    elif not keep_local and jid not in running_here:
+                        # Update in place so references held elsewhere stay valid
+                        jobs[jid].clear()
+                        jobs[jid].update(db_info)
                 except Exception as e:
                     print(f"ERROR: Failed to parse job data for {jid}: {e}", file=sys.stderr)
+
+            # Remove jobs that are no longer in the DB. Skipped for keep_local,
+            # where entries not yet persisted must survive until they are saved.
+            if not keep_local:
+                for jid in list(jobs):
+                    if jid not in db_jids and jid not in running_here:
+                        jobs.pop(jid, None)
 
-        # Clean up any jobs left in running/queued state across restart
-        updated_jobs = []
-        with jobs_db_lock:
-            for jid, info in jobs.items():
-                if info.get('status') in ('running', 'queued'):
-                    info['status'] = 'failed (Interrupted by restart)'
-                    info['progress'] = {
-                        'pct': 100,
-                        'phase': 'failed',
-                        'detail': 'Job was interrupted by server restart.'
-                    }
-                    updated_jobs.append((jid, info))
-
-        if updated_jobs:
-            try:
-                cursor = conn.cursor()
-                for jid, info in updated_jobs:
-                    cursor.execute(
-                        "INSERT OR REPLACE INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)",
-                        (jid, info.get('started', 0), info.get('status', ''), json.dumps(info, ensure_ascii=False))
-                    )
-                conn.commit()
-            except Exception as e:
-                print(f"ERROR: Failed to update interrupted jobs in SQLite: {e}", file=sys.stderr)
+        # Clean up any jobs left in running/queued state across restart (only at startup)
+        if is_startup:
+            updated_jobs = []
+            with jobs_db_lock:
+                for jid, info in jobs.items():
+                    if info.get('status') in ('running', 'queued'):
+                        info['status'] = 'failed (Interrupted by restart)'
+                        info['progress'] = {
+                            'pct': 100,
+                            'phase': 'failed',
+                            'detail': 'Job was interrupted by server restart.'
+                        }
+                        updated_jobs.append((jid, info))
+
+            if updated_jobs:
+                try:
+                    cursor = conn.cursor()
+                    for jid, info in updated_jobs:
+                        cursor.execute(
+                            "INSERT OR REPLACE INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)",
+                            (jid, info.get('started', 0), info.get('status', ''), json.dumps(info, ensure_ascii=False))
+                        )
+                    conn.commit()
+                except Exception as e:
+                    print(f"ERROR: Failed to update interrupted jobs in SQLite: {e}", file=sys.stderr)
         conn.close()
     except Exception as e:
         print(f"ERROR: Failed to load SQLite database: {e}", file=sys.stderr)
 
 def save_jobs_db():
     with jobs_db_lock:
+        # Merge in jobs written by other workers, but let in-memory state win:
+        # a plain reload would discard the very changes this call is about to persist.
+        load_jobs_db(keep_local=True)
         try:
             conn = sqlite3.connect(DB_PATH)
             try:
@@ -326,7 +374,7 @@ def reschedule_active_jobs():
     print(f"Loaded {len(jobs)} jobs and re-scheduled {rescheduled_count} jobs.")
 
 # Load database and reschedule active tasks on startup
-load_jobs_db()
+load_jobs_db(is_startup=True)
 reschedule_active_jobs()
 
 # ── VM list cache ─────────────────────────────────────────────────────────────
@@ -1295,6 +1343,24 @@ def replicate_backup_folder(src_dir, dest_dir, log_path=None):
 
 
 def run_job_thread(jid):
+    """Run the job unless this process already has a live thread executing it."""
+    me = threading.current_thread()
+    with active_job_threads_lock:
+        t = active_job_threads.get(jid)
+        if t is not None and t.is_alive():
+            print(f"WARNING: Job '{jid}' is already running in thread '{t.name}'. Aborting duplicate execution.", file=sys.stderr)
+            return
+        active_job_threads[jid] = me
+    try:
+        run_job_thread_impl(jid)
+    finally:
+        with active_job_threads_lock:
+            # Identity check: only the thread that registered may clear the entry
+            if active_job_threads.get(jid) is me:
+                active_job_threads.pop(jid, None)
+
+
+def run_job_thread_impl(jid):
     """Worker executed in a thread (and by APScheduler)."""
     with jobs_db_lock:
         info = jobs.get(jid)
@@ -1343,6 +1409,7 @@ def run_job_thread(jid):
                             'phase': f'vm {vm_idx+1}/{total} ({vm_n})',
                             'detail': f"[{vm_n}] {prog.get('phase', '')}: {prog.get('detail', '')}"
                         }
+                        save_job_to_db_direct(jid, info)
                 return _cb
 
             try:
@@ -1451,6 +1518,7 @@ def run_job_thread(jid):
             def progress_cb(prog):
                 with jobs_db_lock:
                     info['progress'] = prog
+                    save_job_to_db_direct(jid, info)
 
             try:
                 run_backup(
@@ -2130,6 +2198,7 @@ def reports_data_api():
 @app.route('/jobs')
 @login_required
 def list_jobs():
+    load_jobs_db()
     with jobs_db_lock:
         sorted_items = sorted(jobs.items(), key=lambda x: x[1].get('started', 0), reverse=True)
         job_list = [
@@ -2145,6 +2214,7 @@ def list_jobs():
 @app.route('/job/<jobid>')
 @login_required
 def job_detail(jobid):
+    load_jobs_db()
     with jobs_db_lock:
         info = jobs.get(jobid)
         if not info:
@@ -2156,6 +2226,7 @@ def job_detail(jobid):
 @app.route('/job/<jobid>/log')
 @login_required
 def job_log(jobid):
+    load_jobs_db()
     with jobs_db_lock:
         info = jobs.get(jobid)
         if not info:
@@ -2171,6 +2242,7 @@ def job_log(jobid):
 @app.route('/api/job/<jobid>/status')
 @login_required
 def api_job_status(jobid):
+    load_jobs_db()
     with jobs_db_lock:
         info = jobs.get(jobid)
         if not info: