refactor: implement thread-local logging and improve job state synchronization across threads and database

This commit is contained in:
Rizqi 2026-06-27 02:10:06 +07:00
parent 473fc35069
commit bbedde19df
2 changed files with 116 additions and 40 deletions

View File

@ -9,8 +9,28 @@ import sys
import time import time
import urllib.parse import urllib.parse
from datetime import datetime from datetime import datetime
from contextlib import redirect_stdout, redirect_stderr
from pathlib import Path from pathlib import Path
import threading
import builtins
# Thread-local storage for log file path to ensure print() statements are thread-safe and write to the correct log file
thread_local_log = threading.local()
def print(*args, **kwargs):
# Check if this thread has a thread-local log path
log_path = getattr(thread_local_log, 'path', None)
if log_path:
try:
sep = kwargs.get('sep', ' ')
end = kwargs.get('end', '\n')
msg = sep.join(str(arg) for arg in args) + end
with open(log_path, 'a', encoding='utf-8') as f:
f.write(msg)
except Exception as e:
builtins.print(f"Fallback log error: {e}", file=sys.stderr)
builtins.print(*args, **kwargs)
else:
builtins.print(*args, **kwargs)
import requests import requests
from pyVim.connect import SmartConnect, Disconnect from pyVim.connect import SmartConnect, Disconnect
@ -580,22 +600,17 @@ def run_backup(host, user, password, vm_name, dest, compress=False, no_verify_ss
Falls back to full download if CBT state is unavailable. Falls back to full download if CBT state is unavailable.
""" """
if log_path: if log_path:
logfile = open(log_path, 'a', encoding='utf-8', buffering=1) thread_local_log.path = log_path
def _wrap():
with redirect_stdout(logfile), redirect_stderr(logfile):
return _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ssl,
sftp_host, sftp_user, sftp_password, sftp_key,
progress_cb=progress_cb, disk_filter=disk_filter, job_id=job_id,
is_cancelled_cb=is_cancelled_cb, use_cbt=use_cbt)
try:
return _wrap()
finally:
logfile.close()
else: else:
thread_local_log.path = None
try:
return _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ssl, return _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ssl,
sftp_host, sftp_user, sftp_password, sftp_key, sftp_host, sftp_user, sftp_password, sftp_key,
progress_cb=progress_cb, disk_filter=disk_filter, job_id=job_id, progress_cb=progress_cb, disk_filter=disk_filter, job_id=job_id,
is_cancelled_cb=is_cancelled_cb, use_cbt=use_cbt) is_cancelled_cb=is_cancelled_cb, use_cbt=use_cbt)
finally:
thread_local_log.path = None
def _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ssl, def _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ssl,

View File

@ -47,6 +47,25 @@ JOBS_DB_PATH = BASE_DIR / 'jobs.json'
DB_PATH = BASE_DIR / 'jobs.db' DB_PATH = BASE_DIR / 'jobs.db'
jobs_db_lock = threading.RLock() jobs_db_lock = threading.RLock()
# Active job threads tracker: {job_id: thread_object}
active_job_threads = {}
active_job_threads_lock = threading.Lock()
def save_job_to_db_direct(jid, info):
try:
conn = sqlite3.connect(DB_PATH)
try:
cursor = conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)",
(jid, info.get('started', 0), info.get('status', ''), json.dumps(info, ensure_ascii=False))
)
conn.commit()
finally:
conn.close()
except Exception as e:
print(f"ERROR: Failed to save job {jid} directly to SQLite: {e}", file=sys.stderr)
# In-memory job store: {job_id: job_dict} # In-memory job store: {job_id: job_dict}
jobs: dict = {} jobs: dict = {}
@ -117,7 +136,7 @@ def migrate_old_json_db():
except Exception as e: except Exception as e:
print(f"WARNING: Migration of jobs.json failed: {e}", file=sys.stderr) print(f"WARNING: Migration of jobs.json failed: {e}", file=sys.stderr)
def load_jobs_db(): def load_jobs_db(is_startup=False):
global jobs global jobs
init_db() init_db()
migrate_old_json_db() migrate_old_json_db()
@ -128,43 +147,64 @@ def load_jobs_db():
rows = cursor.fetchall() rows = cursor.fetchall()
with jobs_db_lock: with jobs_db_lock:
jobs.clear() db_jids = set()
for jid, data_str in rows: for jid, data_str in rows:
db_jids.add(jid)
try: try:
jobs[jid] = json.loads(data_str) db_info = json.loads(data_str)
if jid in jobs:
# Only update if the job is not running in this process's background threads
is_running_here = False
with active_job_threads_lock:
t = active_job_threads.get(jid)
if t and t.is_alive():
is_running_here = True
if not is_running_here:
jobs[jid].clear()
jobs[jid].update(db_info)
else:
jobs[jid] = db_info
except Exception as e: except Exception as e:
print(f"ERROR: Failed to parse job data for {jid}: {e}", file=sys.stderr) print(f"ERROR: Failed to parse job data for {jid}: {e}", file=sys.stderr)
# Remove jobs that are no longer in the DB
for jid in list(jobs.keys()):
if jid not in db_jids:
jobs.pop(jid, None)
# Clean up any jobs left in running/queued state across restart # Clean up any jobs left in running/queued state across restart (only at startup)
updated_jobs = [] if is_startup:
with jobs_db_lock: updated_jobs = []
for jid, info in jobs.items(): with jobs_db_lock:
if info.get('status') in ('running', 'queued'): for jid, info in jobs.items():
info['status'] = 'failed (Interrupted by restart)' if info.get('status') in ('running', 'queued'):
info['progress'] = { info['status'] = 'failed (Interrupted by restart)'
'pct': 100, info['progress'] = {
'phase': 'failed', 'pct': 100,
'detail': 'Job was interrupted by server restart.' 'phase': 'failed',
} 'detail': 'Job was interrupted by server restart.'
updated_jobs.append((jid, info)) }
updated_jobs.append((jid, info))
if updated_jobs:
try: if updated_jobs:
cursor = conn.cursor() try:
for jid, info in updated_jobs: cursor = conn.cursor()
cursor.execute( for jid, info in updated_jobs:
"INSERT OR REPLACE INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)", cursor.execute(
(jid, info.get('started', 0), info.get('status', ''), json.dumps(info, ensure_ascii=False)) "INSERT OR REPLACE INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)",
) (jid, info.get('started', 0), info.get('status', ''), json.dumps(info, ensure_ascii=False))
conn.commit() )
except Exception as e: conn.commit()
print(f"ERROR: Failed to update interrupted jobs in SQLite: {e}", file=sys.stderr) except Exception as e:
print(f"ERROR: Failed to update interrupted jobs in SQLite: {e}", file=sys.stderr)
conn.close() conn.close()
except Exception as e: except Exception as e:
print(f"ERROR: Failed to load SQLite database: {e}", file=sys.stderr) print(f"ERROR: Failed to load SQLite database: {e}", file=sys.stderr)
def save_jobs_db(): def save_jobs_db():
with jobs_db_lock: with jobs_db_lock:
load_jobs_db() # Reload other workers' jobs first to avoid overwriting them
try: try:
conn = sqlite3.connect(DB_PATH) conn = sqlite3.connect(DB_PATH)
try: try:
@ -326,7 +366,7 @@ def reschedule_active_jobs():
print(f"Loaded {len(jobs)} jobs and re-scheduled {rescheduled_count} jobs.") print(f"Loaded {len(jobs)} jobs and re-scheduled {rescheduled_count} jobs.")
# Load database and reschedule active tasks on startup # Load database and reschedule active tasks on startup
load_jobs_db() load_jobs_db(is_startup=True)
reschedule_active_jobs() reschedule_active_jobs()
# ── VM list cache ───────────────────────────────────────────────────────────── # ── VM list cache ─────────────────────────────────────────────────────────────
@ -1295,6 +1335,21 @@ def replicate_backup_folder(src_dir, dest_dir, log_path=None):
def run_job_thread(jid): def run_job_thread(jid):
with active_job_threads_lock:
t = active_job_threads.get(jid)
if t and t.is_alive():
print(f"WARNING: Job '{jid}' is already running in thread '{t.name}'. Aborting duplicate execution.", file=sys.stderr)
return
active_job_threads[jid] = threading.current_thread()
try:
run_job_thread_impl(jid)
finally:
with active_job_threads_lock:
if active_job_threads.get(jid) == threading.current_thread():
active_job_threads.pop(jid, None)
def run_job_thread_impl(jid):
"""Worker executed in a thread (and by APScheduler).""" """Worker executed in a thread (and by APScheduler)."""
with jobs_db_lock: with jobs_db_lock:
info = jobs.get(jid) info = jobs.get(jid)
@ -1343,6 +1398,7 @@ def run_job_thread(jid):
'phase': f'vm {vm_idx+1}/{total} ({vm_n})', 'phase': f'vm {vm_idx+1}/{total} ({vm_n})',
'detail': f"[{vm_n}] {prog.get('phase', '')}: {prog.get('detail', '')}" 'detail': f"[{vm_n}] {prog.get('phase', '')}: {prog.get('detail', '')}"
} }
save_job_to_db_direct(jid, info)
return _cb return _cb
try: try:
@ -1451,6 +1507,7 @@ def run_job_thread(jid):
def progress_cb(prog): def progress_cb(prog):
with jobs_db_lock: with jobs_db_lock:
info['progress'] = prog info['progress'] = prog
save_job_to_db_direct(jid, info)
try: try:
run_backup( run_backup(
@ -2130,6 +2187,7 @@ def reports_data_api():
@app.route('/jobs') @app.route('/jobs')
@login_required @login_required
def list_jobs(): def list_jobs():
load_jobs_db()
with jobs_db_lock: with jobs_db_lock:
sorted_items = sorted(jobs.items(), key=lambda x: x[1].get('started', 0), reverse=True) sorted_items = sorted(jobs.items(), key=lambda x: x[1].get('started', 0), reverse=True)
job_list = [ job_list = [
@ -2145,6 +2203,7 @@ def list_jobs():
@app.route('/job/<jobid>') @app.route('/job/<jobid>')
@login_required @login_required
def job_detail(jobid): def job_detail(jobid):
load_jobs_db()
with jobs_db_lock: with jobs_db_lock:
info = jobs.get(jobid) info = jobs.get(jobid)
if not info: if not info:
@ -2156,6 +2215,7 @@ def job_detail(jobid):
@app.route('/job/<jobid>/log') @app.route('/job/<jobid>/log')
@login_required @login_required
def job_log(jobid): def job_log(jobid):
load_jobs_db()
with jobs_db_lock: with jobs_db_lock:
info = jobs.get(jobid) info = jobs.get(jobid)
if not info: if not info:
@ -2171,6 +2231,7 @@ def job_log(jobid):
@app.route('/api/job/<jobid>/status') @app.route('/api/job/<jobid>/status')
@login_required @login_required
def api_job_status(jobid): def api_job_status(jobid):
load_jobs_db()
with jobs_db_lock: with jobs_db_lock:
info = jobs.get(jobid) info = jobs.get(jobid)
if not info: if not info: