From b99755771ef8eb34054ead62ff99e728e026b78d Mon Sep 17 00:00:00 2001 From: Rizqi Date: Tue, 23 Jun 2026 00:31:59 +0700 Subject: [PATCH] refactor: migrate jobs storage from JSON file to SQLite and add thread-safe access to job and cache states --- backup_core.py | 26 +-- gui_app.py | 429 ++++++++++++++++++++++++++++++---------------- vsphere_backup.py | 15 +- 3 files changed, 307 insertions(+), 163 deletions(-) diff --git a/backup_core.py b/backup_core.py index 045faa2..ef8b9dc 100644 --- a/backup_core.py +++ b/backup_core.py @@ -115,14 +115,19 @@ def list_vms(host, user, password, no_verify_ssl=False): def wait_for_task(task, action_name='job'): - while task.info.state not in (vim.TaskInfo.State.success, vim.TaskInfo.State.error): + while True: + info = getattr(task, 'info', None) + if info and info.state in (vim.TaskInfo.State.success, vim.TaskInfo.State.error): + break time.sleep(1) - if task.info.state == vim.TaskInfo.State.success: - return task.info.result + info = task.info + if info.state == vim.TaskInfo.State.success: + return info.result else: - err = task.info.error + err = info.error + fault_name = err.__class__.__name__ if err else "UnknownFault" err_msg = getattr(err, 'msg', None) or str(err) - raise Exception(f"{action_name} did not complete successfully: {err_msg}") + raise Exception(f"{action_name} did not complete successfully: {fault_name}: {err_msg}") def create_snapshot(vm, snap_name, desc="backup snapshot", memory=False, quiesce=False): @@ -152,7 +157,7 @@ def download_datastore_file(host, dc_name, datastore_name, ds_path, local_path, print(f"Downloading {ds_path} from datastore {datastore_name} to {local_path}") print(f" URL: {url}") sha256 = hashlib.sha256() - with requests.get(url, headers=headers, stream=True, verify=verify_ssl, proxies={"http": None, "https": None}) as r: + with requests.get(url, headers=headers, stream=True, verify=verify_ssl, proxies={"http": None, "https": None}, timeout=30) as r: r.raise_for_status() total_bytes = int(r.headers.get('Content-Length', 0)) print(f" HTTP {r.status_code}, Content-Length: {total_bytes} bytes") @@ -370,7 +375,8 @@ def download_disk_changed_ranges(host, dc_name, ds_name, ds_path, extents, with requests.get(url, headers=req_headers, stream=True, verify=verify_ssl, - proxies={"http": None, "https": None}) as r: + proxies={"http": None, "https": None}, + timeout=30) as r: if r.status_code not in (200, 206): raise Exception(f"HTTP {r.status_code} for Range {range_header}") @@ -636,7 +642,7 @@ def _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ss # Get VMDK paths and normalize them (strip snapshot suffixes like -000001) # so we always request the base VMDKs which vCenter streams as the full data disk raw_vmdk_refs = vm_disk_vmdk_paths(vm) - vmdk_refs = [re.sub(r'-\d+\.vmdk$', '.vmdk', r, flags=re.IGNORECASE) for r in raw_vmdk_refs] + vmdk_refs = [re.sub(r'-\d{6}\.vmdk$', '.vmdk', r, flags=re.IGNORECASE) for r in raw_vmdk_refs] vmx_ref = vm_config_vmx_path(vm) # Build a map of normalized vmdk_ref -> VirtualDisk device for CBT @@ -645,7 +651,7 @@ def _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ss if isinstance(dev, vim.vm.device.VirtualDisk): fn = getattr(dev.backing, 'fileName', None) if fn: - norm = re.sub(r'-\d+\.vmdk$', '.vmdk', fn, flags=re.IGNORECASE) + norm = re.sub(r'-\d{6}\.vmdk$', '.vmdk', fn, flags=re.IGNORECASE) disk_devices[norm] = dev # Locate the backup snapshot object for CBT queries @@ -664,7 +670,7 @@ def _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ss # Apply disk filter — only download selected VMDKs if disk_filter is not None: - disk_filter_set = {re.sub(r'-\d+\.vmdk$', '.vmdk', f, flags=re.IGNORECASE) for f in disk_filter} + disk_filter_set = {re.sub(r'-\d{6}\.vmdk$', '.vmdk', f, flags=re.IGNORECASE) for f in disk_filter} skipped = [] filtered_vmdk_refs = [] for raw_ref, norm_ref in zip(raw_vmdk_refs, vmdk_refs): diff --git a/gui_app.py b/gui_app.py index a6e29f9..b371e61 100644 --- a/gui_app.py +++ b/gui_app.py @@ -10,6 +10,7 @@ import time import platform import subprocess import json +import sqlite3 from datetime import datetime from functools import wraps from pathlib import Path @@ -43,32 +44,129 @@ JOBS_DIR = BASE_DIR / 'jobs' JOBS_DIR.mkdir(exist_ok=True) JOBS_DB_PATH = BASE_DIR / 'jobs.json' -jobs_db_lock = threading.Lock() +DB_PATH = BASE_DIR / 'jobs.db' +jobs_db_lock = threading.RLock() # In-memory job store: {job_id: job_dict} jobs: dict = {} -def load_jobs_db(): - global jobs +def init_db(): + conn = sqlite3.connect(DB_PATH) + try: + cursor = conn.cursor() + cursor.execute(''' + CREATE TABLE IF NOT EXISTS jobs ( + id TEXT PRIMARY KEY, + started REAL, + status TEXT, + data TEXT + ) + ''') + conn.commit() + finally: + conn.close() + +def migrate_old_json_db(): if JOBS_DB_PATH.exists(): try: with open(JOBS_DB_PATH, 'r', encoding='utf-8') as f: - with jobs_db_lock: - jobs.clear() - jobs.update(json.load(f)) + old_jobs = json.load(f) + if old_jobs and isinstance(old_jobs, dict): + init_db() + conn = sqlite3.connect(DB_PATH) + try: + cursor = conn.cursor() + for jid, info in old_jobs.items(): + cursor.execute("SELECT 1 FROM jobs WHERE id = ?", (jid,)) + if not cursor.fetchone(): + cursor.execute( + "INSERT INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)", + (jid, info.get('started', 0), info.get('status', ''), json.dumps(info, ensure_ascii=False)) + ) + conn.commit() + print(f"MIGRATION: Successfully migrated jobs from jobs.json to SQLite database.") + finally: + conn.close() + try: + bak_path = BASE_DIR / 'jobs.json.bak' + if bak_path.exists(): + bak_path.unlink() + JOBS_DB_PATH.rename(bak_path) + except Exception: + pass except Exception as e: - print(f"ERROR: Failed to load jobs database: {e}", file=sys.stderr) - else: + print(f"WARNING: Migration of jobs.json failed: {e}", file=sys.stderr) + +def load_jobs_db(): + global jobs + init_db() + migrate_old_json_db() + try: + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("SELECT id, data FROM jobs") + rows = cursor.fetchall() + with jobs_db_lock: jobs.clear() + for jid, data_str in rows: + try: + jobs[jid] = json.loads(data_str) + except Exception as e: + print(f"ERROR: Failed to parse job data for {jid}: {e}", file=sys.stderr) + + # Clean up any jobs left in running/queued state across restart + updated_jobs = [] + with jobs_db_lock: + for jid, info in jobs.items(): + if info.get('status') in ('running', 'queued'): + info['status'] = 'failed (Interrupted by restart)' + info['progress'] = { + 'pct': 100, + 'phase': 'failed', + 'detail': 'Job was interrupted by server restart.' + } + updated_jobs.append((jid, info)) + + if updated_jobs: + try: + cursor = conn.cursor() + for jid, info in updated_jobs: + cursor.execute( + "INSERT OR REPLACE INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)", + (jid, info.get('started', 0), info.get('status', ''), json.dumps(info, ensure_ascii=False)) + ) + conn.commit() + except Exception as e: + print(f"ERROR: Failed to update interrupted jobs in SQLite: {e}", file=sys.stderr) + conn.close() + except Exception as e: + print(f"ERROR: Failed to load SQLite database: {e}", file=sys.stderr) def save_jobs_db(): with jobs_db_lock: try: - with open(JOBS_DB_PATH, 'w', encoding='utf-8') as f: - json.dump(jobs, f, indent=2, ensure_ascii=False) + conn = sqlite3.connect(DB_PATH) + try: + cursor = conn.cursor() + # Remove deleted jobs from SQLite + if jobs: + placeholders = ','.join('?' for _ in jobs) + cursor.execute(f"DELETE FROM jobs WHERE id NOT IN ({placeholders})", list(jobs.keys())) + else: + cursor.execute("DELETE FROM jobs") + + # Insert or replace active jobs + for jid, info in jobs.items(): + cursor.execute( + "INSERT OR REPLACE INTO jobs (id, started, status, data) VALUES (?, ?, ?, ?)", + (jid, info.get('started', 0), info.get('status', ''), json.dumps(info, ensure_ascii=False)) + ) + conn.commit() + finally: + conn.close() except Exception as e: - print(f"ERROR: Failed to save jobs database: {e}", file=sys.stderr) + print(f"ERROR: Failed to save jobs database to SQLite: {e}", file=sys.stderr) # APScheduler instance scheduler = None @@ -216,15 +314,17 @@ _bg_refresh_running: set = set() def _start_bg_refresh(host, user, password, no_verify_ssl): """Kick off a background thread to refresh the cache if not already running.""" key = _cache_key(host, user) - if key in _bg_refresh_running: - return - _bg_refresh_running.add(key) + with _vm_cache_lock: + if key in _bg_refresh_running: + return + _bg_refresh_running.add(key) def _worker(): try: _fetch_and_cache(host, user, password, no_verify_ssl) finally: - _bg_refresh_running.discard(key) + with _vm_cache_lock: + _bg_refresh_running.discard(key) t = threading.Thread(target=_worker, daemon=True) t.start() @@ -431,14 +531,18 @@ def enforce_retention_policy(info, log_path=None): def run_job_thread(jid): """Worker executed in a thread (and by APScheduler).""" - info = jobs.get(jid) - if not info: - return - info['status'] = 'running' - info['started'] = time.time() - info['progress'] = {'pct': 0, 'phase': 'starting', 'detail': 'Initializing…'} + with jobs_db_lock: + info = jobs.get(jid) + if not info: + return + info['status'] = 'running' + info['started'] = time.time() + info['progress'] = {'pct': 0, 'phase': 'starting', 'detail': 'Initializing…'} + save_jobs_db() - is_cancelled = lambda: jobs.get(jid, {}).get('status') == 'cancelling' + def is_cancelled(): + with jobs_db_lock: + return jobs.get(jid, {}).get('status') == 'cancelling' vm_names = info.get('vm_names') log_path = str(JOBS_DIR / jid / 'backup.log') @@ -446,15 +550,17 @@ def run_job_thread(jid): if vm_names: # Grouped/Batch VM backup run total_vms = len(vm_names) - info['run_dest'] = os.path.join(info['dest'], f"batch-{datetime.fromtimestamp(info['started']).strftime('%Y%m%d%H%M%S')}") - save_jobs_db() + with jobs_db_lock: + info['run_dest'] = os.path.join(info['dest'], f"batch-{datetime.fromtimestamp(info['started']).strftime('%Y%m%d%H%M%S')}") + save_jobs_db() success_vms = [] failed_vms = [] for idx, vm in enumerate(vm_names): if is_cancelled(): - failed_vms.append((vm, "Cancelled by user")) + with jobs_db_lock: + failed_vms.append((vm, "Cancelled by user")) with open(log_path, 'a', encoding='utf-8') as f: f.write(f"\nSkipping VM {idx+1}/{total_vms} ({vm}): Backup cancelled by user\n") continue @@ -466,11 +572,12 @@ def run_job_thread(jid): def _cb(prog): prog_pct = prog.get('pct', 0) overall_pct = start_p + int((prog_pct / 100) * (end_p - start_p)) - info['progress'] = { - 'pct': overall_pct, - 'phase': f'vm {vm_idx+1}/{total} ({vm_n})', - 'detail': f"[{vm_n}] {prog.get('phase', '')}: {prog.get('detail', '')}" - } + with jobs_db_lock: + info['progress'] = { + 'pct': overall_pct, + 'phase': f'vm {vm_idx+1}/{total} ({vm_n})', + 'detail': f"[{vm_n}] {prog.get('phase', '')}: {prog.get('detail', '')}" + } return _cb try: @@ -506,16 +613,20 @@ def run_job_thread(jid): is_cancelled_cb=is_cancelled, use_cbt=info.get('use_cbt', False), ) - success_vms.append(vm) + with jobs_db_lock: + success_vms.append(vm) except Exception as e: - if "cancelled by user" in str(e).lower(): - failed_vms.append((vm, "Cancelled by user")) - info['status'] = 'failed (Cancelled)' - info['progress'] = {'pct': 100, 'phase': 'failed', 'detail': 'Backup cancelled by user'} - save_jobs_db() + is_cancel_err = "cancelled by user" in str(e).lower() + if is_cancel_err: + with jobs_db_lock: + failed_vms.append((vm, "Cancelled by user")) + info['status'] = 'failed (Cancelled)' + info['progress'] = {'pct': 100, 'phase': 'failed', 'detail': 'Backup cancelled by user'} + save_jobs_db() break else: - failed_vms.append((vm, str(e))) + with jobs_db_lock: + failed_vms.append((vm, str(e))) with open(log_path, 'a', encoding='utf-8') as f: f.write(f"\nERROR backing up VM {vm}: {e}\n\n") finally: @@ -528,30 +639,33 @@ def run_job_thread(jid): } enforce_retention_policy(vm_info, log_path=log_path) - if failed_vms: - if success_vms: - info['status'] = f"finished with errors (Failed: {', '.join([f[0] for f in failed_vms])})" + with jobs_db_lock: + if failed_vms: + if success_vms: + info['status'] = f"finished with errors (Failed: {', '.join([f[0] for f in failed_vms])})" + else: + info['status'] = f"failed (All backups failed)" else: - info['status'] = f"failed (All backups failed)" - else: - info['status'] = 'finished' - - info['progress'] = { - 'pct': 100, - 'phase': 'done', - 'detail': f"Batch completed. Success: {len(success_vms)}, Failed: {len(failed_vms)}" - } - save_jobs_db() + info['status'] = 'finished' + + info['progress'] = { + 'pct': 100, + 'phase': 'done', + 'detail': f"Batch completed. Success: {len(success_vms)}, Failed: {len(failed_vms)}" + } + save_jobs_db() else: # Single VM backup run (original behavior) run_timestamp = datetime.fromtimestamp(info['started']).strftime('%Y%m%d%H%M%S') run_dest = os.path.join(info['dest'], info['vm_name'], f"backup-{run_timestamp}") - info['run_dest'] = run_dest - save_jobs_db() + with jobs_db_lock: + info['run_dest'] = run_dest + save_jobs_db() def progress_cb(prog): - info['progress'] = prog + with jobs_db_lock: + info['progress'] = prog try: run_backup( @@ -573,16 +687,18 @@ def run_job_thread(jid): is_cancelled_cb=is_cancelled, use_cbt=info.get('use_cbt', False), ) - info['status'] = 'finished' - info['progress'] = {'pct': 100, 'phase': 'done', 'detail': 'Backup completed successfully'} - save_jobs_db() + with jobs_db_lock: + info['status'] = 'finished' + info['progress'] = {'pct': 100, 'phase': 'done', 'detail': 'Backup completed successfully'} + save_jobs_db() except Exception as e: - if "cancelled by user" in str(e).lower(): - info['status'] = 'failed (Cancelled)' - info['progress'] = {'pct': 100, 'phase': 'failed', 'detail': 'Backup cancelled by user'} - else: - info['status'] = f'failed ({e})' - save_jobs_db() + with jobs_db_lock: + if "cancelled by user" in str(e).lower(): + info['status'] = 'failed (Cancelled)' + info['progress'] = {'pct': 100, 'phase': 'failed', 'detail': 'Backup cancelled by user'} + else: + info['status'] = f'failed ({e})' + save_jobs_db() finally: # Always enforce retention policy (which cleans up failed folders immediately) enforce_retention_policy(info, log_path=log_path) @@ -633,22 +749,23 @@ def create_and_start_job( 'retention_value': retention_value, 'use_cbt': use_cbt, } - jobs[jid] = info + with jobs_db_lock: + jobs[jid] = info - if schedule_type == 'now' or not HAS_SCHEDULER: - t = threading.Thread(target=run_job_thread, args=(jid,), daemon=True) - t.start() - else: - sched_id = register_scheduler_job(info) - if sched_id: - info['schedule_id'] = sched_id - info['status'] = 'scheduled' - else: - # Fallback: run now + if schedule_type == 'now' or not HAS_SCHEDULER: t = threading.Thread(target=run_job_thread, args=(jid,), daemon=True) t.start() + else: + sched_id = register_scheduler_job(info) + if sched_id: + info['schedule_id'] = sched_id + info['status'] = 'scheduled' + else: + # Fallback: run now + t = threading.Thread(target=run_job_thread, args=(jid,), daemon=True) + t.start() - save_jobs_db() + save_jobs_db() return jid @@ -961,10 +1078,12 @@ def batch_jobs(): @app.route('/jobs') @login_required def list_jobs(): - job_list = [ - job_to_display(jid, info) - for jid, info in sorted(jobs.items(), key=lambda x: x[1].get('started', 0), reverse=True) - ] + with jobs_db_lock: + sorted_items = sorted(jobs.items(), key=lambda x: x[1].get('started', 0), reverse=True) + job_list = [ + job_to_display(jid, info) + for jid, info in sorted_items + ] scheduled_count = sum(1 for j in job_list if j['schedule_id']) return render_template('jobs.html', jobs=job_list, scheduled_count=scheduled_count) @@ -974,16 +1093,19 @@ def list_jobs(): @app.route('/job/') @login_required def job_detail(jobid): - info = jobs.get(jobid) - if not info: - abort(404) - return render_template('job_detail.html', job=job_to_display(jobid, info)) + with jobs_db_lock: + info = jobs.get(jobid) + if not info: + abort(404) + job_disp = job_to_display(jobid, info) + return render_template('job_detail.html', job=job_disp) @app.route('/job//log') @login_required def job_log(jobid): - info = jobs.get(jobid) + with jobs_db_lock: + info = jobs.get(jobid) if not info: abort(404) log_path = JOBS_DIR / jobid / 'backup.log' @@ -997,31 +1119,35 @@ def job_log(jobid): @app.route('/api/job//status') @login_required def api_job_status(jobid): - info = jobs.get(jobid) - if not info: - return jsonify({'error': 'not found'}), 404 + with jobs_db_lock: + info = jobs.get(jobid) + if not info: + return jsonify({'error': 'not found'}), 404 + status = info.get('status', 'unknown') + progress = info.get('progress', {'pct': 0, 'phase': '', 'detail': ''}) return jsonify({ - 'status': info.get('status', 'unknown'), + 'status': status, 'id': jobid, - 'progress': info.get('progress', {'pct': 0, 'phase': '', 'detail': ''}), + 'progress': progress, }) @app.route('/job//cancel-schedule', methods=['POST']) @login_required def cancel_schedule(jobid): - info = jobs.get(jobid) - if not info: - abort(404) - sched_id = info.get('schedule_id') - if sched_id and scheduler: - try: - scheduler.remove_job(sched_id) - except Exception: - pass - info['schedule_id'] = None - info['status'] = info.get('status', 'finished') if info.get('status') not in ('queued', 'running') else info['status'] - save_jobs_db() + with jobs_db_lock: + info = jobs.get(jobid) + if not info: + abort(404) + sched_id = info.get('schedule_id') + if sched_id and scheduler: + try: + scheduler.remove_job(sched_id) + except Exception: + pass + info['schedule_id'] = None + info['status'] = info.get('status', 'finished') if info.get('status') not in ('queued', 'running') else info['status'] + save_jobs_db() flash('Recurring schedule cancelled.', 'success') return redirect(url_for('job_detail', jobid=jobid)) @@ -1029,36 +1155,42 @@ def cancel_schedule(jobid): @app.route('/job//reactivate-schedule', methods=['POST']) @login_required def reactivate_schedule(jobid): - info = jobs.get(jobid) - if not info: - abort(404) - if not info.get('schedule_type') or info.get('schedule_type') == 'now': - flash('This job does not have a recurring schedule configured.', 'danger') - return redirect(url_for('job_detail', jobid=jobid)) - if info.get('schedule_id'): - flash('Schedule is already active.', 'warning') - return redirect(url_for('job_detail', jobid=jobid)) - sched_id = register_scheduler_job(info) - if sched_id: - info['schedule_id'] = sched_id - if info.get('status') not in ('running', 'queued'): - info['status'] = 'scheduled' - save_jobs_db() - flash('Recurring schedule reactivated successfully.', 'success') - else: - flash('Failed to reactivate schedule.', 'danger') + with jobs_db_lock: + info = jobs.get(jobid) + if not info: + abort(404) + if not info.get('schedule_type') or info.get('schedule_type') == 'now': + flash('This job does not have a recurring schedule configured.', 'danger') + return redirect(url_for('job_detail', jobid=jobid)) + if info.get('schedule_id'): + flash('Schedule is already active.', 'warning') + return redirect(url_for('job_detail', jobid=jobid)) + sched_id = register_scheduler_job(info) + if sched_id: + info['schedule_id'] = sched_id + if info.get('status') not in ('running', 'queued'): + info['status'] = 'scheduled' + save_jobs_db() + flash('Recurring schedule reactivated successfully.', 'success') + else: + flash('Failed to reactivate schedule.', 'danger') return redirect(url_for('job_detail', jobid=jobid)) @app.route('/job//run', methods=['POST']) @login_required def run_job_now(jobid): - info = jobs.get(jobid) - if not info: - abort(404) - if info.get('status') in ('running', 'queued'): - flash('Backup is already running or queued.', 'warning') - return redirect(url_for('job_detail', jobid=jobid)) + with jobs_db_lock: + info = jobs.get(jobid) + if not info: + abort(404) + if info.get('status') in ('running', 'queued'): + flash('Backup is already running or queued.', 'warning') + return redirect(url_for('job_detail', jobid=jobid)) + + # Mark status as queued atomically to prevent double run race condition + info['status'] = 'queued' + save_jobs_db() # Start backup execution in a background thread t = threading.Thread(target=run_job_thread, args=(jobid,), daemon=True) @@ -1070,38 +1202,39 @@ def run_job_now(jobid): @app.route('/job//stop', methods=['POST']) @login_required def stop_job(jobid): - info = jobs.get(jobid) - if not info: - abort(404) - if info.get('status') in ('running', 'queued'): - info['status'] = 'cancelling' - info['progress'] = {'pct': info.get('progress', {}).get('pct', 0), 'phase': 'cancelling', 'detail': 'Stopping backup execution…'} - save_jobs_db() - flash('Request to stop backup sent.', 'info') - else: - flash('Job is not running or queued.', 'warning') + with jobs_db_lock: + info = jobs.get(jobid) + if not info: + abort(404) + if info.get('status') in ('running', 'queued'): + info['status'] = 'cancelling' + info['progress'] = {'pct': info.get('progress', {}).get('pct', 0), 'phase': 'cancelling', 'detail': 'Stopping backup execution…'} + save_jobs_db() + flash('Request to stop backup sent.', 'info') + else: + flash('Job is not running or queued.', 'warning') return redirect(url_for('job_detail', jobid=jobid)) @app.route('/job//delete', methods=['POST']) @login_required def delete_job(jobid): - info = jobs.get(jobid) - if not info: - abort(404) - - # Cancel schedule first if it exists - sched_id = info.get('schedule_id') - if sched_id and scheduler: - try: - scheduler.remove_job(sched_id) - except Exception: - pass - - # Remove from jobs dict with jobs_db_lock: + info = jobs.get(jobid) + if not info: + abort(404) + + # Cancel schedule first if it exists + sched_id = info.get('schedule_id') + if sched_id and scheduler: + try: + scheduler.remove_job(sched_id) + except Exception: + pass + + # Remove from jobs dict jobs.pop(jobid, None) - save_jobs_db() + save_jobs_db() # Remove the job directory containing the log file import shutil diff --git a/vsphere_backup.py b/vsphere_backup.py index bfe3207..e825518 100644 --- a/vsphere_backup.py +++ b/vsphere_backup.py @@ -70,14 +70,19 @@ def find_vm_by_name(content, vm_name): def wait_for_task(task, action_name='job'): - while task.info.state not in (vim.TaskInfo.State.success, vim.TaskInfo.State.error): + while True: + info = getattr(task, 'info', None) + if info and info.state in (vim.TaskInfo.State.success, vim.TaskInfo.State.error): + break time.sleep(1) - if task.info.state == vim.TaskInfo.State.success: - return task.info.result + info = task.info + if info.state == vim.TaskInfo.State.success: + return info.result else: - err = task.info.error + err = info.error + fault_name = err.__class__.__name__ if err else "UnknownFault" err_msg = getattr(err, 'msg', None) or str(err) - raise Exception(f"{action_name} did not complete successfully: {err_msg}") + raise Exception(f"{action_name} did not complete successfully: {fault_name}: {err_msg}") def create_snapshot(vm, snap_name, desc="backup snapshot", memory=False, quiesce=False):