refactor: migrate jobs storage from JSON file to SQLite and add thread-safe access to job and cache states

This commit is contained in:
Rizqi 2026-06-23 00:31:59 +07:00
parent 046689bd09
commit b99755771e
3 changed files with 307 additions and 163 deletions

View File

@ -115,14 +115,19 @@ def list_vms(host, user, password, no_verify_ssl=False):
def wait_for_task(task, action_name='job'):
    """Block until a task reaches a terminal state and return its result.

    Polls ``task.info`` once per second until its state is
    ``vim.TaskInfo.State.success`` or ``vim.TaskInfo.State.error``.

    Args:
        task: Object exposing an ``info`` attribute with ``state``,
            ``result`` and ``error`` fields.
        action_name: Label used as the prefix of the failure message.

    Returns:
        ``info.result`` of the finished task.

    Raises:
        Exception: If the task ends in the error state. The message carries
            the fault class name and the fault message.
    """
    terminal_states = (vim.TaskInfo.State.success, vim.TaskInfo.State.error)
    while True:
        # Tolerate a task whose ``info`` is not available (missing or None).
        info = getattr(task, 'info', None)
        if info is not None and info.state in terminal_states:
            break
        time.sleep(1)

    # Keep using the snapshot that ended the loop instead of reading
    # task.info again, so state, result and error come from the same read.
    if info.state == vim.TaskInfo.State.success:
        return info.result

    err = info.error
    if err is None:
        # Avoid formatting the literal string "None" as the error message.
        fault_name = "UnknownFault"
        err_msg = "no fault details were reported"
    else:
        fault_name = err.__class__.__name__
        err_msg = getattr(err, 'msg', None) or str(err)
    raise Exception(f"{action_name} did not complete successfully: {fault_name}: {err_msg}")
def create_snapshot(vm, snap_name, desc="backup snapshot", memory=False, quiesce=False): def create_snapshot(vm, snap_name, desc="backup snapshot", memory=False, quiesce=False):
@ -152,7 +157,7 @@ def download_datastore_file(host, dc_name, datastore_name, ds_path, local_path,
print(f"Downloading {ds_path} from datastore {datastore_name} to {local_path}") print(f"Downloading {ds_path} from datastore {datastore_name} to {local_path}")
print(f" URL: {url}") print(f" URL: {url}")
sha256 = hashlib.sha256() sha256 = hashlib.sha256()
with requests.get(url, headers=headers, stream=True, verify=verify_ssl, proxies={"http": None, "https": None}) as r: with requests.get(url, headers=headers, stream=True, verify=verify_ssl, proxies={"http": None, "https": None}, timeout=30) as r:
r.raise_for_status() r.raise_for_status()
total_bytes = int(r.headers.get('Content-Length', 0)) total_bytes = int(r.headers.get('Content-Length', 0))
print(f" HTTP {r.status_code}, Content-Length: {total_bytes} bytes") print(f" HTTP {r.status_code}, Content-Length: {total_bytes} bytes")
@ -370,7 +375,8 @@ def download_disk_changed_ranges(host, dc_name, ds_name, ds_path, extents,
with requests.get(url, headers=req_headers, stream=True, with requests.get(url, headers=req_headers, stream=True,
verify=verify_ssl, verify=verify_ssl,
proxies={"http": None, "https": None}) as r: proxies={"http": None, "https": None},
timeout=30) as r:
if r.status_code not in (200, 206): if r.status_code not in (200, 206):
raise Exception(f"HTTP {r.status_code} for Range {range_header}") raise Exception(f"HTTP {r.status_code} for Range {range_header}")
@ -636,7 +642,7 @@ def _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ss
# Get VMDK paths and normalize them (strip snapshot suffixes like -000001) # Get VMDK paths and normalize them (strip snapshot suffixes like -000001)
# so we always request the base VMDKs which vCenter streams as the full data disk # so we always request the base VMDKs which vCenter streams as the full data disk
raw_vmdk_refs = vm_disk_vmdk_paths(vm) raw_vmdk_refs = vm_disk_vmdk_paths(vm)
vmdk_refs = [re.sub(r'-\d+\.vmdk$', '.vmdk', r, flags=re.IGNORECASE) for r in raw_vmdk_refs] vmdk_refs = [re.sub(r'-\d{6}\.vmdk$', '.vmdk', r, flags=re.IGNORECASE) for r in raw_vmdk_refs]
vmx_ref = vm_config_vmx_path(vm) vmx_ref = vm_config_vmx_path(vm)
# Build a map of normalized vmdk_ref -> VirtualDisk device for CBT # Build a map of normalized vmdk_ref -> VirtualDisk device for CBT
@ -645,7 +651,7 @@ def _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ss
if isinstance(dev, vim.vm.device.VirtualDisk): if isinstance(dev, vim.vm.device.VirtualDisk):
fn = getattr(dev.backing, 'fileName', None) fn = getattr(dev.backing, 'fileName', None)
if fn: if fn:
norm = re.sub(r'-\d+\.vmdk$', '.vmdk', fn, flags=re.IGNORECASE) norm = re.sub(r'-\d{6}\.vmdk$', '.vmdk', fn, flags=re.IGNORECASE)
disk_devices[norm] = dev disk_devices[norm] = dev
# Locate the backup snapshot object for CBT queries # Locate the backup snapshot object for CBT queries
@ -664,7 +670,7 @@ def _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ss
# Apply disk filter — only download selected VMDKs # Apply disk filter — only download selected VMDKs
if disk_filter is not None: if disk_filter is not None:
disk_filter_set = {re.sub(r'-\d+\.vmdk$', '.vmdk', f, flags=re.IGNORECASE) for f in disk_filter} disk_filter_set = {re.sub(r'-\d{6}\.vmdk$', '.vmdk', f, flags=re.IGNORECASE) for f in disk_filter}
skipped = [] skipped = []
filtered_vmdk_refs = [] filtered_vmdk_refs = []
for raw_ref, norm_ref in zip(raw_vmdk_refs, vmdk_refs): for raw_ref, norm_ref in zip(raw_vmdk_refs, vmdk_refs):

View File

@ -10,6 +10,7 @@ import time
import platform import platform
import subprocess import subprocess
import json import json
import sqlite3
from datetime import datetime from datetime import datetime
from functools import wraps from functools import wraps
from pathlib import Path from pathlib import Path
@ -43,32 +44,129 @@ JOBS_DIR = BASE_DIR / 'jobs'
JOBS_DIR.mkdir(exist_ok=True) JOBS_DIR.mkdir(exist_ok=True)
JOBS_DB_PATH = BASE_DIR / 'jobs.json' JOBS_DB_PATH = BASE_DIR / 'jobs.json'
jobs_db_lock = threading.Lock() DB_PATH = BASE_DIR / 'jobs.db'
jobs_db_lock = threading.RLock()
# In-memory job store: {job_id: job_dict} # In-memory job store: {job_id: job_dict}
jobs: dict = {} jobs: dict = {}
def init_db():
    """Create the ``jobs`` table in the SQLite file at ``DB_PATH`` if it is missing.

    The connection is always closed, even when creating the table fails.
    """
    db = sqlite3.connect(DB_PATH)
    try:
        cur = db.cursor()
        cur.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                id TEXT PRIMARY KEY,
                started REAL,
                status TEXT,
                data TEXT
            )
        ''')
        db.commit()
    finally:
        db.close()
def migrate_old_json_db():
    """Import jobs from the legacy ``jobs.json`` file into the SQLite database.

    Does nothing when the legacy file is absent. Jobs whose id already exists
    in SQLite are left untouched, so running the migration twice is harmless.
    Entries that are not JSON objects are skipped with a warning instead of
    aborting the whole migration. Once the file has been processed it is
    renamed to ``jobs.json.bak`` so it is not imported again on the next start.

    Failures are reported on stderr and never raised.
    """
    if not JOBS_DB_PATH.exists():
        return
    try:
        with open(JOBS_DB_PATH, 'r', encoding='utf-8') as f:
            old_jobs = json.load(f)

        if old_jobs and isinstance(old_jobs, dict):
            rows = []
            for jid, info in old_jobs.items():
                if not isinstance(info, dict):
                    print(f"WARNING: Skipping malformed job entry {jid} in jobs.json",
                          file=sys.stderr)
                    continue
                rows.append((
                    jid,
                    info.get('started', 0),
                    info.get('status', ''),
                    json.dumps(info, ensure_ascii=False),
                ))

            init_db()
            conn = sqlite3.connect(DB_PATH)
            try:
                # OR IGNORE leaves rows whose primary key already exists alone,
                # replacing a separate SELECT per job.
                conn.executemany(
                    "INSERT OR IGNORE INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)",
                    rows,
                )
                conn.commit()
                print("MIGRATION: Successfully migrated jobs from jobs.json to SQLite database.")
            finally:
                conn.close()

        bak_path = BASE_DIR / 'jobs.json.bak'
        try:
            # Path.replace overwrites an existing backup in a single step.
            JOBS_DB_PATH.replace(bak_path)
        except OSError as e:
            # Not fatal: the next start re-runs the migration, which skips
            # jobs that are already stored.
            print(f"WARNING: Could not rename jobs.json to {bak_path.name}: {e}",
                  file=sys.stderr)
    except Exception as e:
        print(f"WARNING: Migration of jobs.json failed: {e}", file=sys.stderr)
def load_jobs_db():
    """Populate the in-memory ``jobs`` dict from the SQLite database.

    Ensures the table exists, runs the legacy JSON migration, then replaces
    the contents of ``jobs`` with the stored rows. Rows whose ``data`` column
    cannot be parsed into a JSON object are reported on stderr and skipped.

    Jobs stored as ``running`` or ``queued`` are marked as failed, because no
    worker survives a restart, and the corrected records are written back.

    Errors while reading or updating the database are reported on stderr and
    not raised. The connection is closed on every path.
    """
    init_db()
    migrate_old_json_db()
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            rows = conn.execute("SELECT id, data FROM jobs").fetchall()

            interrupted = []
            with jobs_db_lock:
                jobs.clear()
                for jid, data_str in rows:
                    try:
                        info = json.loads(data_str)
                    except (TypeError, ValueError) as e:
                        print(f"ERROR: Failed to parse job data for {jid}: {e}", file=sys.stderr)
                        continue
                    if not isinstance(info, dict):
                        print(f"ERROR: Failed to parse job data for {jid}: not a JSON object",
                              file=sys.stderr)
                        continue
                    jobs[jid] = info

                    # Clean up any jobs left in running/queued state across restart.
                    # NOTE(review): 'cancelling' is left untouched here; confirm
                    # whether it should be treated as interrupted too.
                    if info.get('status') in ('running', 'queued'):
                        info['status'] = 'failed (Interrupted by restart)'
                        info['progress'] = {
                            'pct': 100,
                            'phase': 'failed',
                            'detail': 'Job was interrupted by server restart.'
                        }
                        interrupted.append((
                            jid,
                            info.get('started', 0),
                            info.get('status', ''),
                            json.dumps(info, ensure_ascii=False),
                        ))

            if interrupted:
                try:
                    conn.executemany(
                        "INSERT OR REPLACE INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)",
                        interrupted,
                    )
                    conn.commit()
                except Exception as e:
                    print(f"ERROR: Failed to update interrupted jobs in SQLite: {e}", file=sys.stderr)
        finally:
            conn.close()
    except Exception as e:
        print(f"ERROR: Failed to load SQLite database: {e}", file=sys.stderr)
def save_jobs_db():
    """Mirror the in-memory ``jobs`` dict into the SQLite ``jobs`` table.

    Rows whose id is no longer present in ``jobs`` are deleted and every
    current job is inserted or replaced. Nothing is committed until all
    statements have succeeded, so a failure leaves the stored data unchanged.

    Holds ``jobs_db_lock`` for the whole operation. Errors are reported on
    stderr and not raised.
    """
    with jobs_db_lock:
        try:
            # Serialise first so a non-serialisable job fails before any SQL runs.
            rows = [
                (jid, info.get('started', 0), info.get('status', ''),
                 json.dumps(info, ensure_ascii=False))
                for jid, info in jobs.items()
            ]
            conn = sqlite3.connect(DB_PATH)
            try:
                # Remove deleted jobs from SQLite. The stale ids are computed in
                # Python rather than with "NOT IN (?, ?, ...)": one bound
                # parameter per job exceeds SQLite's host-parameter limit
                # (999 by default before SQLite 3.32) once enough jobs exist.
                stored_ids = {row[0] for row in conn.execute("SELECT id FROM jobs")}
                stale = [(jid,) for jid in stored_ids if jid not in jobs]
                if stale:
                    conn.executemany("DELETE FROM jobs WHERE id = ?", stale)

                # Insert or replace active jobs
                conn.executemany(
                    "INSERT OR REPLACE INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)",
                    rows,
                )
                conn.commit()
            finally:
                conn.close()
        except Exception as e:
            print(f"ERROR: Failed to save jobs database to SQLite: {e}", file=sys.stderr)
# APScheduler instance # APScheduler instance
scheduler = None scheduler = None
@ -216,15 +314,17 @@ _bg_refresh_running: set = set()
def _start_bg_refresh(host, user, password, no_verify_ssl):
    """Kick off a background thread to refresh the cache if not already running.

    The cache key for ``(host, user)`` is recorded in ``_bg_refresh_running``
    under ``_vm_cache_lock`` before the thread starts and removed when the
    worker finishes, so at most one refresh per key is in flight.
    """
    key = _cache_key(host, user)
    with _vm_cache_lock:
        if key in _bg_refresh_running:
            return
        _bg_refresh_running.add(key)

    def _worker():
        try:
            _fetch_and_cache(host, user, password, no_verify_ssl)
        finally:
            with _vm_cache_lock:
                _bg_refresh_running.discard(key)

    try:
        threading.Thread(target=_worker, daemon=True).start()
    except Exception:
        # The worker never ran, so its ``finally`` will not clear the marker;
        # without this the key would block every later refresh.
        with _vm_cache_lock:
            _bg_refresh_running.discard(key)
        raise
@ -431,14 +531,18 @@ def enforce_retention_policy(info, log_path=None):
def run_job_thread(jid): def run_job_thread(jid):
"""Worker executed in a thread (and by APScheduler).""" """Worker executed in a thread (and by APScheduler)."""
info = jobs.get(jid) with jobs_db_lock:
if not info: info = jobs.get(jid)
return if not info:
info['status'] = 'running' return
info['started'] = time.time() info['status'] = 'running'
info['progress'] = {'pct': 0, 'phase': 'starting', 'detail': 'Initializing…'} info['started'] = time.time()
info['progress'] = {'pct': 0, 'phase': 'starting', 'detail': 'Initializing…'}
save_jobs_db()
is_cancelled = lambda: jobs.get(jid, {}).get('status') == 'cancelling' def is_cancelled():
with jobs_db_lock:
return jobs.get(jid, {}).get('status') == 'cancelling'
vm_names = info.get('vm_names') vm_names = info.get('vm_names')
log_path = str(JOBS_DIR / jid / 'backup.log') log_path = str(JOBS_DIR / jid / 'backup.log')
@ -446,15 +550,17 @@ def run_job_thread(jid):
if vm_names: if vm_names:
# Grouped/Batch VM backup run # Grouped/Batch VM backup run
total_vms = len(vm_names) total_vms = len(vm_names)
info['run_dest'] = os.path.join(info['dest'], f"batch-{datetime.fromtimestamp(info['started']).strftime('%Y%m%d%H%M%S')}") with jobs_db_lock:
save_jobs_db() info['run_dest'] = os.path.join(info['dest'], f"batch-{datetime.fromtimestamp(info['started']).strftime('%Y%m%d%H%M%S')}")
save_jobs_db()
success_vms = [] success_vms = []
failed_vms = [] failed_vms = []
for idx, vm in enumerate(vm_names): for idx, vm in enumerate(vm_names):
if is_cancelled(): if is_cancelled():
failed_vms.append((vm, "Cancelled by user")) with jobs_db_lock:
failed_vms.append((vm, "Cancelled by user"))
with open(log_path, 'a', encoding='utf-8') as f: with open(log_path, 'a', encoding='utf-8') as f:
f.write(f"\nSkipping VM {idx+1}/{total_vms} ({vm}): Backup cancelled by user\n") f.write(f"\nSkipping VM {idx+1}/{total_vms} ({vm}): Backup cancelled by user\n")
continue continue
@ -466,11 +572,12 @@ def run_job_thread(jid):
def _cb(prog): def _cb(prog):
prog_pct = prog.get('pct', 0) prog_pct = prog.get('pct', 0)
overall_pct = start_p + int((prog_pct / 100) * (end_p - start_p)) overall_pct = start_p + int((prog_pct / 100) * (end_p - start_p))
info['progress'] = { with jobs_db_lock:
'pct': overall_pct, info['progress'] = {
'phase': f'vm {vm_idx+1}/{total} ({vm_n})', 'pct': overall_pct,
'detail': f"[{vm_n}] {prog.get('phase', '')}: {prog.get('detail', '')}" 'phase': f'vm {vm_idx+1}/{total} ({vm_n})',
} 'detail': f"[{vm_n}] {prog.get('phase', '')}: {prog.get('detail', '')}"
}
return _cb return _cb
try: try:
@ -506,16 +613,20 @@ def run_job_thread(jid):
is_cancelled_cb=is_cancelled, is_cancelled_cb=is_cancelled,
use_cbt=info.get('use_cbt', False), use_cbt=info.get('use_cbt', False),
) )
success_vms.append(vm) with jobs_db_lock:
success_vms.append(vm)
except Exception as e: except Exception as e:
if "cancelled by user" in str(e).lower(): is_cancel_err = "cancelled by user" in str(e).lower()
failed_vms.append((vm, "Cancelled by user")) if is_cancel_err:
info['status'] = 'failed (Cancelled)' with jobs_db_lock:
info['progress'] = {'pct': 100, 'phase': 'failed', 'detail': 'Backup cancelled by user'} failed_vms.append((vm, "Cancelled by user"))
save_jobs_db() info['status'] = 'failed (Cancelled)'
info['progress'] = {'pct': 100, 'phase': 'failed', 'detail': 'Backup cancelled by user'}
save_jobs_db()
break break
else: else:
failed_vms.append((vm, str(e))) with jobs_db_lock:
failed_vms.append((vm, str(e)))
with open(log_path, 'a', encoding='utf-8') as f: with open(log_path, 'a', encoding='utf-8') as f:
f.write(f"\nERROR backing up VM {vm}: {e}\n\n") f.write(f"\nERROR backing up VM {vm}: {e}\n\n")
finally: finally:
@ -528,30 +639,33 @@ def run_job_thread(jid):
} }
enforce_retention_policy(vm_info, log_path=log_path) enforce_retention_policy(vm_info, log_path=log_path)
if failed_vms: with jobs_db_lock:
if success_vms: if failed_vms:
info['status'] = f"finished with errors (Failed: {', '.join([f[0] for f in failed_vms])})" if success_vms:
info['status'] = f"finished with errors (Failed: {', '.join([f[0] for f in failed_vms])})"
else:
info['status'] = f"failed (All backups failed)"
else: else:
info['status'] = f"failed (All backups failed)" info['status'] = 'finished'
else:
info['status'] = 'finished'
info['progress'] = { info['progress'] = {
'pct': 100, 'pct': 100,
'phase': 'done', 'phase': 'done',
'detail': f"Batch completed. Success: {len(success_vms)}, Failed: {len(failed_vms)}" 'detail': f"Batch completed. Success: {len(success_vms)}, Failed: {len(failed_vms)}"
} }
save_jobs_db() save_jobs_db()
else: else:
# Single VM backup run (original behavior) # Single VM backup run (original behavior)
run_timestamp = datetime.fromtimestamp(info['started']).strftime('%Y%m%d%H%M%S') run_timestamp = datetime.fromtimestamp(info['started']).strftime('%Y%m%d%H%M%S')
run_dest = os.path.join(info['dest'], info['vm_name'], f"backup-{run_timestamp}") run_dest = os.path.join(info['dest'], info['vm_name'], f"backup-{run_timestamp}")
info['run_dest'] = run_dest with jobs_db_lock:
save_jobs_db() info['run_dest'] = run_dest
save_jobs_db()
def progress_cb(prog): def progress_cb(prog):
info['progress'] = prog with jobs_db_lock:
info['progress'] = prog
try: try:
run_backup( run_backup(
@ -573,16 +687,18 @@ def run_job_thread(jid):
is_cancelled_cb=is_cancelled, is_cancelled_cb=is_cancelled,
use_cbt=info.get('use_cbt', False), use_cbt=info.get('use_cbt', False),
) )
info['status'] = 'finished' with jobs_db_lock:
info['progress'] = {'pct': 100, 'phase': 'done', 'detail': 'Backup completed successfully'} info['status'] = 'finished'
save_jobs_db() info['progress'] = {'pct': 100, 'phase': 'done', 'detail': 'Backup completed successfully'}
save_jobs_db()
except Exception as e: except Exception as e:
if "cancelled by user" in str(e).lower(): with jobs_db_lock:
info['status'] = 'failed (Cancelled)' if "cancelled by user" in str(e).lower():
info['progress'] = {'pct': 100, 'phase': 'failed', 'detail': 'Backup cancelled by user'} info['status'] = 'failed (Cancelled)'
else: info['progress'] = {'pct': 100, 'phase': 'failed', 'detail': 'Backup cancelled by user'}
info['status'] = f'failed ({e})' else:
save_jobs_db() info['status'] = f'failed ({e})'
save_jobs_db()
finally: finally:
# Always enforce retention policy (which cleans up failed folders immediately) # Always enforce retention policy (which cleans up failed folders immediately)
enforce_retention_policy(info, log_path=log_path) enforce_retention_policy(info, log_path=log_path)
@ -633,22 +749,23 @@ def create_and_start_job(
'retention_value': retention_value, 'retention_value': retention_value,
'use_cbt': use_cbt, 'use_cbt': use_cbt,
} }
jobs[jid] = info with jobs_db_lock:
jobs[jid] = info
if schedule_type == 'now' or not HAS_SCHEDULER: if schedule_type == 'now' or not HAS_SCHEDULER:
t = threading.Thread(target=run_job_thread, args=(jid,), daemon=True)
t.start()
else:
sched_id = register_scheduler_job(info)
if sched_id:
info['schedule_id'] = sched_id
info['status'] = 'scheduled'
else:
# Fallback: run now
t = threading.Thread(target=run_job_thread, args=(jid,), daemon=True) t = threading.Thread(target=run_job_thread, args=(jid,), daemon=True)
t.start() t.start()
else:
sched_id = register_scheduler_job(info)
if sched_id:
info['schedule_id'] = sched_id
info['status'] = 'scheduled'
else:
# Fallback: run now
t = threading.Thread(target=run_job_thread, args=(jid,), daemon=True)
t.start()
save_jobs_db() save_jobs_db()
return jid return jid
@ -961,10 +1078,12 @@ def batch_jobs():
@app.route('/jobs')
@login_required
def list_jobs():
    """Render the jobs overview page, most recently started job first."""
    def _started_at(item):
        return item[1].get('started', 0)

    # Build the display rows while holding the lock so the job dicts are not
    # modified by a worker halfway through.
    with jobs_db_lock:
        newest_first = sorted(jobs.items(), key=_started_at, reverse=True)
        job_list = [job_to_display(job_id, job) for job_id, job in newest_first]

    scheduled_count = len([row for row in job_list if row['schedule_id']])
    return render_template('jobs.html', jobs=job_list, scheduled_count=scheduled_count)
@ -974,16 +1093,19 @@ def list_jobs():
@app.route('/job/<jobid>')
@login_required
def job_detail(jobid):
    """Render the detail page for one job, or respond 404 if it is unknown."""
    with jobs_db_lock:
        record = jobs.get(jobid)
        # Snapshot the display data under the lock; only rendering happens outside.
        job_disp = job_to_display(jobid, record) if record else None
    if not record:
        abort(404)
    return render_template('job_detail.html', job=job_disp)
@app.route('/job/<jobid>/log') @app.route('/job/<jobid>/log')
@login_required @login_required
def job_log(jobid): def job_log(jobid):
info = jobs.get(jobid) with jobs_db_lock:
info = jobs.get(jobid)
if not info: if not info:
abort(404) abort(404)
log_path = JOBS_DIR / jobid / 'backup.log' log_path = JOBS_DIR / jobid / 'backup.log'
@ -997,31 +1119,35 @@ def job_log(jobid):
@app.route('/api/job/<jobid>/status')
@login_required
def api_job_status(jobid):
    """Return the status and progress of one job as JSON (404 if unknown)."""
    payload = None
    with jobs_db_lock:
        job = jobs.get(jobid)
        if job:
            # Copy the fields out under the lock; the response is built afterwards.
            payload = {
                'status': job.get('status', 'unknown'),
                'id': jobid,
                'progress': job.get('progress', {'pct': 0, 'phase': '', 'detail': ''}),
            }
    if payload is None:
        return jsonify({'error': 'not found'}), 404
    return jsonify(payload)
@app.route('/job/<jobid>/cancel-schedule', methods=['POST'])
@login_required
def cancel_schedule(jobid):
    """Remove a job's recurring schedule and persist the change.

    Responds 404 for an unknown job. The scheduler entry is removed on a
    best-effort basis: a failure is reported on stderr and the job's
    ``schedule_id`` is cleared regardless.
    """
    with jobs_db_lock:
        info = jobs.get(jobid)
        if not info:
            abort(404)
        sched_id = info.get('schedule_id')
        if sched_id and scheduler:
            try:
                scheduler.remove_job(sched_id)
            except Exception as e:
                print(f"WARNING: Could not remove schedule {sched_id} for job {jobid}: {e}",
                      file=sys.stderr)
        info['schedule_id'] = None
        # An existing status is kept as is; only a job without any status gets
        # 'finished'.
        # NOTE(review): a job whose status is 'scheduled' keeps that status
        # after its schedule is removed; confirm this is intended.
        info.setdefault('status', 'finished')
        save_jobs_db()
    flash('Recurring schedule cancelled.', 'success')
    return redirect(url_for('job_detail', jobid=jobid))
@ -1029,36 +1155,42 @@ def cancel_schedule(jobid):
@app.route('/job/<jobid>/reactivate-schedule', methods=['POST'])
@login_required
def reactivate_schedule(jobid):
    """Re-register a job's recurring schedule with the scheduler.

    Responds 404 for an unknown job. Flashes a message and returns to the job
    page when the job has no recurring schedule or is already scheduled.
    """
    def _back_to_job():
        return redirect(url_for('job_detail', jobid=jobid))

    with jobs_db_lock:
        job = jobs.get(jobid)
        if not job:
            abort(404)

        schedule_type = job.get('schedule_type')
        if not schedule_type or schedule_type == 'now':
            flash('This job does not have a recurring schedule configured.', 'danger')
            return _back_to_job()
        if job.get('schedule_id'):
            flash('Schedule is already active.', 'warning')
            return _back_to_job()

        new_sched_id = register_scheduler_job(job)
        if not new_sched_id:
            flash('Failed to reactivate schedule.', 'danger')
        else:
            job['schedule_id'] = new_sched_id
            # Do not overwrite the status of a job that is currently executing.
            if job.get('status') not in ('running', 'queued'):
                job['status'] = 'scheduled'
            save_jobs_db()
            flash('Recurring schedule reactivated successfully.', 'success')
    return _back_to_job()
@app.route('/job/<jobid>/run', methods=['POST']) @app.route('/job/<jobid>/run', methods=['POST'])
@login_required @login_required
def run_job_now(jobid): def run_job_now(jobid):
info = jobs.get(jobid) with jobs_db_lock:
if not info: info = jobs.get(jobid)
abort(404) if not info:
if info.get('status') in ('running', 'queued'): abort(404)
flash('Backup is already running or queued.', 'warning') if info.get('status') in ('running', 'queued'):
return redirect(url_for('job_detail', jobid=jobid)) flash('Backup is already running or queued.', 'warning')
return redirect(url_for('job_detail', jobid=jobid))
# Mark status as queued atomically to prevent double run race condition
info['status'] = 'queued'
save_jobs_db()
# Start backup execution in a background thread # Start backup execution in a background thread
t = threading.Thread(target=run_job_thread, args=(jobid,), daemon=True) t = threading.Thread(target=run_job_thread, args=(jobid,), daemon=True)
@ -1070,38 +1202,39 @@ def run_job_now(jobid):
@app.route('/job/<jobid>/stop', methods=['POST'])
@login_required
def stop_job(jobid):
    """Flag a running or queued job as 'cancelling' and persist the change.

    Responds 404 for an unknown job. For a job in any other state only a
    warning is flashed.
    """
    active_states = ('running', 'queued')
    with jobs_db_lock:
        job = jobs.get(jobid)
        if not job:
            abort(404)
        if job.get('status') not in active_states:
            flash('Job is not running or queued.', 'warning')
        else:
            # Keep the last known percentage so the progress value does not reset.
            last_pct = job.get('progress', {}).get('pct', 0)
            job['status'] = 'cancelling'
            job['progress'] = {'pct': last_pct, 'phase': 'cancelling', 'detail': 'Stopping backup execution…'}
            save_jobs_db()
            flash('Request to stop backup sent.', 'info')
    return redirect(url_for('job_detail', jobid=jobid))
@app.route('/job/<jobid>/delete', methods=['POST']) @app.route('/job/<jobid>/delete', methods=['POST'])
@login_required @login_required
def delete_job(jobid): def delete_job(jobid):
info = jobs.get(jobid)
if not info:
abort(404)
# Cancel schedule first if it exists
sched_id = info.get('schedule_id')
if sched_id and scheduler:
try:
scheduler.remove_job(sched_id)
except Exception:
pass
# Remove from jobs dict
with jobs_db_lock: with jobs_db_lock:
info = jobs.get(jobid)
if not info:
abort(404)
# Cancel schedule first if it exists
sched_id = info.get('schedule_id')
if sched_id and scheduler:
try:
scheduler.remove_job(sched_id)
except Exception:
pass
# Remove from jobs dict
jobs.pop(jobid, None) jobs.pop(jobid, None)
save_jobs_db() save_jobs_db()
# Remove the job directory containing the log file # Remove the job directory containing the log file
import shutil import shutil

View File

@ -70,14 +70,19 @@ def find_vm_by_name(content, vm_name):
def wait_for_task(task, action_name='job'):
    """Block until a task reaches a terminal state and return its result.

    Polls ``task.info`` once per second until its state is
    ``vim.TaskInfo.State.success`` or ``vim.TaskInfo.State.error``.

    Args:
        task: Object exposing an ``info`` attribute with ``state``,
            ``result`` and ``error`` fields.
        action_name: Label used as the prefix of the failure message.

    Returns:
        ``info.result`` of the finished task.

    Raises:
        Exception: If the task ends in the error state. The message carries
            the fault class name and the fault message.
    """
    terminal_states = (vim.TaskInfo.State.success, vim.TaskInfo.State.error)
    while True:
        # Tolerate a task whose ``info`` is not available (missing or None).
        info = getattr(task, 'info', None)
        if info is not None and info.state in terminal_states:
            break
        time.sleep(1)

    # Keep using the snapshot that ended the loop instead of reading
    # task.info again, so state, result and error come from the same read.
    if info.state == vim.TaskInfo.State.success:
        return info.result

    err = info.error
    if err is None:
        # Avoid formatting the literal string "None" as the error message.
        fault_name = "UnknownFault"
        err_msg = "no fault details were reported"
    else:
        fault_name = err.__class__.__name__
        err_msg = getattr(err, 'msg', None) or str(err)
    raise Exception(f"{action_name} did not complete successfully: {fault_name}: {err_msg}")
def create_snapshot(vm, snap_name, desc="backup snapshot", memory=False, quiesce=False): def create_snapshot(vm, snap_name, desc="backup snapshot", memory=False, quiesce=False):