vSphere-Backup-Manager/backup_core.py

957 lines
41 KiB
Python

import atexit
import getpass
import hashlib
import json
import os
import re
import ssl
import sys
import time
import urllib.parse
from datetime import datetime
from contextlib import redirect_stdout, redirect_stderr
from pathlib import Path
import requests
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim, vmodl
try:
import paramiko
except Exception:
paramiko = None
def get_si(host, user, pwd, no_verify_ssl=False):
    """Open a pyVmomi ServiceInstance connection to *host* and return it.

    When no_verify_ssl is true an unverified SSL context is handed to
    SmartConnect (certificate checks disabled); otherwise sslContext=None is
    passed and SmartConnect's own default applies.

    The caller owns the session and must release it with Disconnect(si).
    """
    ssl_ctx = ssl._create_unverified_context() if no_verify_ssl else None
    service_instance = SmartConnect(host=host, user=user, pwd=pwd, sslContext=ssl_ctx)
    return service_instance
def list_vms(host, user, password, no_verify_ssl=False):
    """Connect to vCenter/ESXi and return a list of VM info dicts.

    Each dict has the keys: name, power_state, num_cpu, memory_mb, guest_os,
    ip_address, datastores (list of names), committed_gb, tools_status and
    disks (list of {'label', 'path', 'size_gb', 'unit'}).

    If reading a VM's properties fails, a placeholder entry with zeroed/empty
    values plus an 'error' key is appended instead, so one broken VM does not
    hide the rest of the inventory.

    The container view is destroyed and the session disconnected even if an
    error escapes.
    """
    # Built once per call rather than once per VM.
    power_map = {
        vim.VirtualMachinePowerState.poweredOn: 'poweredOn',
        vim.VirtualMachinePowerState.poweredOff: 'poweredOff',
        vim.VirtualMachinePowerState.suspended: 'suspended',
    }
    si = None
    try:
        si = get_si(host, user, password, no_verify_ssl=no_verify_ssl)
        content = si.RetrieveContent()
        obj_view = content.viewManager.CreateContainerView(
            content.rootFolder, [vim.VirtualMachine], True
        )
        try:
            vms = []
            for vm in obj_view.view:
                try:
                    summary = vm.summary
                    config = summary.config
                    runtime = summary.runtime
                    guest = summary.guest
                    storage = summary.storage
                    power_state = power_map.get(runtime.powerState, str(runtime.powerState))

                    # Datastore names (best-effort: a failure leaves the names gathered so far)
                    ds_names = []
                    try:
                        for ds in vm.datastore:
                            ds_names.append(ds.info.name)
                    except Exception:
                        pass

                    # Disk info (label, path, size) — best-effort like the datastore list
                    disks = []
                    try:
                        for dev in vm.config.hardware.device:
                            if isinstance(dev, vim.vm.device.VirtualDisk):
                                fn = getattr(dev.backing, 'fileName', None)
                                if not fn:
                                    continue
                                label = ''
                                if dev.deviceInfo:
                                    label = dev.deviceInfo.label or ''
                                size_kb = getattr(dev, 'capacityInKB', 0) or 0
                                disks.append({
                                    'label': label or f'Hard disk {dev.unitNumber}',
                                    'path': fn,
                                    'size_gb': round(size_kb / (1024 * 1024), 1),
                                    'unit': dev.unitNumber,
                                })
                    except Exception:
                        pass

                    vms.append({
                        'name': config.name,
                        'power_state': power_state,
                        'num_cpu': config.numCpu,
                        'memory_mb': config.memorySizeMB,
                        'guest_os': config.guestFullName or config.guestId or 'Unknown',
                        'ip_address': (guest.ipAddress or '') if guest else '',
                        'datastores': ds_names,
                        'committed_gb': round((storage.committed or 0) / (1024 ** 3), 2),
                        'tools_status': (guest.toolsStatus or 'unknown') if guest else 'unknown',
                        'disks': disks,
                    })
                except Exception as e:
                    vms.append({'name': getattr(vm, 'name', '?'), 'error': str(e),
                                'power_state': 'unknown', 'num_cpu': 0,
                                'memory_mb': 0, 'guest_os': '', 'ip_address': '',
                                'datastores': [], 'committed_gb': 0,
                                'tools_status': 'unknown', 'disks': []})
            return vms
        finally:
            # Release the server-side view even if iterating it raised.
            obj_view.Destroy()
    finally:
        if si:
            try:
                Disconnect(si)
            except Exception:
                pass
def wait_for_task(task, action_name='job', timeout=None, poll_interval=1):
    """Block until a vSphere task reaches success or error and return its result.

    task: a vim.Task whose .info is polled.
    action_name: label used in error messages.
    timeout: optional maximum number of seconds to wait; None (the default)
        waits indefinitely, as before.
    poll_interval: seconds to sleep between polls (default 1).

    Returns task.info.result on success.
    Raises Exception with the task fault's message if the task ends in
    error, or if *timeout* elapses first.
    """
    terminal_states = (vim.TaskInfo.State.success, vim.TaskInfo.State.error)
    deadline = None if timeout is None else time.monotonic() + timeout
    # Read task.info once per poll so state/result/error come from one snapshot.
    info = task.info
    while info.state not in terminal_states:
        if deadline is not None and time.monotonic() >= deadline:
            raise Exception(f"{action_name} did not complete within {timeout} seconds")
        time.sleep(poll_interval)
        info = task.info
    if info.state == vim.TaskInfo.State.success:
        return info.result
    err = info.error
    err_msg = getattr(err, 'msg', None) or str(err)
    raise Exception(f"{action_name} did not complete successfully: {err_msg}")
def create_snapshot(vm, snap_name, desc="backup snapshot", memory=False, quiesce=False):
    """Create a snapshot named *snap_name* on *vm* and wait for it to finish.

    desc, memory and quiesce are forwarded unchanged to CreateSnapshot_Task.

    Returns the task result (the new snapshot's reference), so callers can
    use it directly instead of searching the snapshot tree by name. Callers
    that ignore the return value are unaffected.
    Raises Exception (from wait_for_task) if the task fails.
    """
    print(f"Creating snapshot '{snap_name}'")
    task = vm.CreateSnapshot_Task(name=snap_name, description=desc, memory=memory, quiesce=quiesce)
    snapshot_ref = wait_for_task(task, 'CreateSnapshot')
    print("Snapshot created")
    return snapshot_ref
def find_datacenter_for_datastore(content, datastore_name):
    """Return the first Datacenter owning a datastore named *datastore_name*, or None.

    vCenter lets datacenters live inside folders below the root folder, so
    folders are searched breadth-first. Top-level datacenters are still
    checked first, in their original order.

    NOTE(review): callers use ``dc.name`` as the ``dcPath`` URL parameter. For a
    datacenter nested in a folder the /folder API presumably needs the full
    inventory path (e.g. "Folder/DC") — confirm.
    """
    from collections import deque

    pending = deque(content.rootFolder.childEntity)
    while pending:
        entity = pending.popleft()
        if isinstance(entity, vim.Datacenter):
            for ds in entity.datastore:
                if ds.info.name == datastore_name:
                    return entity
        elif isinstance(entity, vim.Folder):
            pending.extend(entity.childEntity)
    return None
def download_datastore_file(host, dc_name, datastore_name, ds_path, local_path,
                            session_cookie, verify_ssl=True, progress_cb=None):
    """Download a file from a vSphere datastore and return its SHA-256 checksum.

    The file is fetched from vCenter's /folder/ HTTP endpoint using the SOAP
    session cookie. It is streamed to *local_path* (parent directories are
    created as needed) and hashed on the fly.

    progress_cb(bytes_done, bytes_total) is optional; bytes_total is 0 when
    the server sends no Content-Length.

    Raises requests.HTTPError for error status codes. Raises OSError when the
    received body length differs from the advertised Content-Length, so a
    truncated transfer is never recorded as a good backup file.
    """
    # Keep slashes unencoded (safe='/') — vCenter's /folder/ API requires them in the URL path.
    encoded_path = urllib.parse.quote(ds_path, safe='/')
    url = (f"https://{host}/folder/{encoded_path}"
           f"?dcPath={urllib.parse.quote(dc_name)}&dsName={urllib.parse.quote(datastore_name)}")
    headers = {"Cookie": f"vmware_soap_session={session_cookie}"}
    print(f"Downloading {ds_path} from datastore {datastore_name} to {local_path}")
    print(f" URL: {url}")
    sha256 = hashlib.sha256()
    with requests.get(url, headers=headers, stream=True, verify=verify_ssl,
                      proxies={"http": None, "https": None}) as r:
        r.raise_for_status()
        total_bytes = int(r.headers.get('Content-Length', 0))
        print(f" HTTP {r.status_code}, Content-Length: {total_bytes} bytes")
        done_bytes = 0
        parent_dir = os.path.dirname(local_path)
        if parent_dir:  # a bare filename has no directory to create
            os.makedirs(parent_dir, exist_ok=True)
        with open(local_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=4 * 1024 * 1024):
                if chunk:
                    f.write(chunk)
                    sha256.update(chunk)
                    done_bytes += len(chunk)
                    if progress_cb:
                        progress_cb(done_bytes, total_bytes)
        # Content-Length counts encoded bytes, so only compare it against what
        # we wrote when the body was not content-encoded.
        encoding = r.headers.get('Content-Encoding', '').strip().lower()
        if total_bytes and encoding in ('', 'identity') and done_bytes != total_bytes:
            raise OSError(f"Incomplete download of {ds_path}: "
                          f"received {done_bytes} of {total_bytes} bytes")
    print(f"Download completed ({done_bytes // (1024*1024)} MB)")
    return sha256.hexdigest()
def extract_session_cookie(si):
    """Return the vmware_soap_session cookie value of a connected ServiceInstance, or None.

    The value is parsed from the SOAP stub's raw cookie string, with or
    without surrounding double quotes. A missing or None cookie returns None
    instead of raising TypeError inside re.search.
    """
    raw = getattr(si._stub, 'cookie', None) or ''
    m = re.search(r"vmware_soap_session\s*=\s*\"?([A-Za-z0-9\-_]+)\"?", raw)
    return m.group(1) if m else None
def vm_disk_vmdk_paths(vm):
    """Return the distinct backing file names of all virtual disks on *vm*.

    Devices that are not VirtualDisks, and disks whose backing has no (or an
    empty) fileName, are skipped. The order of the result is unspecified
    because duplicates are removed through a set.
    """
    candidate_names = (
        getattr(device.backing, 'fileName', None)
        for device in vm.config.hardware.device
        if isinstance(device, vim.vm.device.VirtualDisk)
    )
    return list({name for name in candidate_names if name})
def vm_config_vmx_path(vm):
    """Return the datastore path of *vm*'s .vmx config file, or None if it is not set."""
    file_layout = vm.config.files
    return getattr(file_layout, 'vmPathName', None)
def parse_datastore_path(ds_file_ref):
    """Split a datastore file reference into (datastore_name, relative_path).

    Example: '[datastore1] vm/vm.vmdk' -> ('datastore1', 'vm/vm.vmdk').
    Whitespace between the closing bracket and the path is skipped.
    Raises ValueError if the reference is not '[name] path'.
    """
    parsed = re.match(r"\[(?P<ds>[^\]]+)\]\s*(?P<path>.+)", ds_file_ref)
    if parsed is None:
        raise ValueError(f"Unexpected datastore file format: {ds_file_ref}")
    return parsed.group('ds', 'path')
def find_snapshot_by_name(snapshots, name):
    """Depth-first (pre-order) search of a snapshot tree for *name*.

    snapshots: list of snapshot-tree nodes exposing .name, .snapshot and
    .childSnapshotList. A node's children are searched before its later
    siblings.

    Returns the matching node's .snapshot reference, or None if no node
    matches.
    """
    for node in snapshots:
        if node.name == name:
            return node.snapshot
        children = node.childSnapshotList
        if not children:
            continue
        hit = find_snapshot_by_name(children, name)
        if hit:
            return hit
    return None
def remove_snapshot(snapshot_obj, max_retries=3, retry_delay=5):
    """Remove *snapshot_obj* (keeping any child snapshots), retrying on failure.

    max_retries: total attempts (default 3; at least one attempt is made).
    retry_delay: seconds to sleep between attempts (default 5).

    Re-raises the last failure, with its original traceback, once all
    attempts are exhausted.
    """
    attempts = max(1, int(max_retries))
    print("Removing snapshot")
    for attempt in range(attempts):
        try:
            task = snapshot_obj.RemoveSnapshot_Task(removeChildren=False)
            wait_for_task(task, 'RemoveSnapshot')
            print("Snapshot removed")
            return
        except Exception as e:
            print(f"Attempt {attempt+1} to remove snapshot failed: {e}")
            if attempt == attempts - 1:
                raise
            print(f"Waiting {retry_delay} seconds before retrying...")
            time.sleep(retry_delay)
# ── CBT (Changed Block Tracking) helpers ─────────────────────────────────────
def enable_cbt(vm, content):
    """Enable changeTrackingEnabled on *vm* if it is not already set.

    If vm.config.changeTrackingEnabled is already true, nothing changes and
    True is returned. Otherwise:
      1. the VM is reconfigured with changeTrackingEnabled=True;
      2. a transient snapshot is created and then removed, so the flag takes
         effect before the real backup snapshot (the original design notes
         that CBT needs a snapshot cycle to activate);
      3. False is returned.

    content: unused; kept for backward compatibility with existing callers.
    Raises Exception (from wait_for_task) if a vSphere task fails.
    """
    if vm.config.changeTrackingEnabled:
        print("CBT: changeTrackingEnabled is already ON")
        return True

    print("CBT: Enabling changeTrackingEnabled on VM…")
    spec = vim.vm.ConfigSpec()
    spec.changeTrackingEnabled = True
    wait_for_task(vm.ReconfigVM_Task(spec=spec), 'EnableCBT')
    print("CBT: changeTrackingEnabled set to True")

    # Force a snapshot cycle so CBT activates on all disks
    print("CBT: Creating transient activation snapshot…")
    act_snap_name = f"cbt-activate-{int(time.time())}"
    task = vm.CreateSnapshot_Task(
        name=act_snap_name,
        description="CBT activation (auto-deleted)",
        memory=False, quiesce=False
    )
    # The task result is the new snapshot's reference; use it directly and
    # only fall back to a name search if it is missing.
    snap_obj = wait_for_task(task, 'CBTActivateSnapshot')
    if snap_obj is None:
        snap_root = getattr(vm, 'snapshot', None)
        if snap_root and snap_root.rootSnapshotList:
            snap_obj = find_snapshot_by_name(snap_root.rootSnapshotList, act_snap_name)

    if snap_obj is None:
        # Previously this path still printed "removed", hiding an orphaned snapshot.
        print(f"CBT: WARNING — transient snapshot '{act_snap_name}' not found; "
              f"it may need to be removed manually")
    else:
        task = snap_obj.RemoveSnapshot_Task(removeChildren=False)
        wait_for_task(task, 'CBTActivateSnapshotRemove')
        print("CBT: Transient snapshot removed — CBT is now active")
    return False
def get_disk_device_by_key(vm, device_key):
    """Return the VirtualDisk on *vm* whose device key equals *device_key*, or None."""
    matches = (
        device for device in vm.config.hardware.device
        if isinstance(device, vim.vm.device.VirtualDisk) and device.key == device_key
    )
    return next(matches, None)
def get_disk_change_id(snapshot_ref, device_key):
    """Return the changeId stored on a disk's backing in *snapshot_ref*, or None.

    snapshot_ref: snapshot whose config.hardware.device list is searched.
    device_key: key of the VirtualDisk to look up.

    Returns None when the disk is not found, its backing has no (or an empty)
    changeId, or the lookup raises (the error is printed, not propagated).
    Note: a changeId of '*' means "all changes since the disk was created";
    it seeds the first incremental after a full backup.
    """
    try:
        for candidate in snapshot_ref.config.hardware.device:
            if not isinstance(candidate, vim.vm.device.VirtualDisk):
                continue
            if candidate.key != device_key:
                continue
            change_id = getattr(candidate.backing, 'changeId', None)
            if change_id:
                return change_id
    except Exception as e:
        print(f"CBT: Could not get changeId for device key {device_key}: {e}")
    return None
def query_changed_areas(vm_snapshot, device_key, change_id, start_offset=0,
                        disk_size_bytes=None):
    """Return a disk's changed extents as [{'start': int, 'length': int}, ...].

    vm_snapshot: the backup Snapshot whose disk state is compared.
    device_key: key of the VirtualDisk device.
    change_id: changeId from the *previous* backup snapshot, or '*' for all
        changed areas since disk creation.
    start_offset: byte offset to start from (default 0).
    disk_size_bytes: disk capacity. When omitted it is looked up from the
        snapshot's hardware config (capacityInBytes, else capacityInKB*1024).

    QueryChangedDiskAreas is a method of the VirtualMachine; the snapshot is
    passed as an argument. A single call may describe only part of the disk:
    each result reports the span it covered through startOffset/length. We
    keep querying from the end of the covered span until the capacity is
    reached. If the capacity cannot be determined, only one query is issued.

    Errors are logged and re-raised; vmodl.fault.InvalidArgument may indicate
    a stale changeId.
    """
    if disk_size_bytes is None:
        try:
            for dev in vm_snapshot.config.hardware.device:
                if getattr(dev, 'key', None) == device_key:
                    disk_size_bytes = (getattr(dev, 'capacityInBytes', None)
                                       or (getattr(dev, 'capacityInKB', 0) or 0) * 1024)
                    break
        except Exception as e:
            print(f"CBT: Could not determine capacity of device {device_key}: {e}")
            disk_size_bytes = None

    extents = []
    try:
        vm = vm_snapshot.vm
        offset = start_offset
        while True:
            result = vm.QueryChangedDiskAreas(
                snapshot=vm_snapshot,
                deviceKey=device_key,
                startOffset=offset,
                changeId=change_id
            )
            if not result:
                break
            for area in (result.changedArea or []):
                extents.append({'start': area.start, 'length': area.length})
            covered_end = (result.startOffset or 0) + (result.length or 0)
            # Stop when the whole disk is covered, when the capacity is unknown
            # (single query), or when the server made no progress (avoids looping forever).
            if not disk_size_bytes or covered_end >= disk_size_bytes or covered_end <= offset:
                break
            offset = covered_end
        print(f"CBT: QueryChangedDiskAreas returned {len(extents)} extent(s), "
              f"{sum(e['length'] for e in extents) // (1024*1024)} MB changed")
    except vmodl.fault.InvalidArgument as e:
        print(f"CBT: InvalidArgument querying changed areas (changeId may be stale): {e}")
        raise
    except Exception as e:
        print(f"CBT: Error querying changed areas: {e}")
        raise
    return extents
def download_disk_changed_ranges(host, dc_name, ds_name, ds_path, extents,
                                 local_path, session_cookie,
                                 total_disk_size, verify_ssl=True,
                                 progress_cb=None):
    """Download only the changed byte extents of a flat VMDK via HTTP Range requests.

    Writes a file sized to *total_disk_size*: changed extents are filled with
    downloaded data and unchanged regions are left as zero bytes (skipped
    with seek). The SHA-256 is computed afterwards over the whole resulting
    file.

    extents: list of {'start': int, 'length': int}; zero-length extents are skipped.
    progress_cb(bytes_done, total_changed_bytes) is optional.

    Each range must come back as 206 Partial Content with exactly the
    requested number of bytes. A 200 means the server ignored Range and is
    sending the whole file; writing that at the extent offset would corrupt
    the image, so any other status, or a short/long body, raises Exception.

    Returns (sha256_hex, bytes_downloaded).
    """
    encoded_path = urllib.parse.quote(ds_path, safe='/')
    url = (f"https://{host}/folder/{encoded_path}"
           f"?dcPath={urllib.parse.quote(dc_name)}&dsName={urllib.parse.quote(ds_name)}")
    headers_base = {"Cookie": f"vmware_soap_session={session_cookie}"}
    no_proxy = {"http": None, "https": None}
    total_changed = sum(e['length'] for e in extents)
    print(f"CBT: Downloading {len(extents)} changed extent(s), "
          f"{total_changed // (1024*1024)} MB / "
          f"{total_disk_size // (1024*1024)} MB total")
    bytes_downloaded = 0
    parent_dir = os.path.dirname(local_path)
    if parent_dir:  # a bare filename has no directory to create
        os.makedirs(parent_dir, exist_ok=True)
    # One keep-alive session for all extents instead of a new connection per range.
    with requests.Session() as session, open(local_path, 'wb') as f:
        # Size the file to the full disk so offsets match the disk geometry
        # (on file systems with sparse-file support the gap is not allocated).
        if total_disk_size > 0:
            f.seek(total_disk_size - 1)
            f.write(b'\x00')
        for extent in extents:
            start = extent['start']
            length = extent['length']
            if length <= 0:
                continue
            range_header = f"bytes={start}-{start + length - 1}"
            req_headers = {**headers_base, "Range": range_header}
            with session.get(url, headers=req_headers, stream=True,
                             verify=verify_ssl, proxies=no_proxy) as r:
                if r.status_code != 206:
                    raise Exception(f"HTTP {r.status_code} for Range {range_header}")
                f.seek(start)
                received = 0
                for chunk in r.iter_content(chunk_size=4 * 1024 * 1024):
                    if not chunk:
                        continue
                    f.write(chunk)
                    received += len(chunk)
                    bytes_downloaded += len(chunk)
                    if progress_cb and total_changed > 0:
                        progress_cb(bytes_downloaded, total_changed)
                if received != length:
                    raise Exception(f"Incomplete Range {range_header}: "
                                    f"received {received} of {length} bytes")
    # Hash the finished file (not on the fly) because writes were out of order.
    sha256 = hashlib.sha256()
    with open(local_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4 * 1024 * 1024), b''):
            sha256.update(chunk)
    print(f"CBT: Incremental download complete — {bytes_downloaded // (1024*1024)} MB written")
    return sha256.hexdigest(), bytes_downloaded
# File name, inside a VM's base backup directory, of the JSON file holding
# per-disk CBT changeIds between runs (read by load_cbt_state, written by
# save_cbt_state).
CBT_STATE_FILENAME = 'cbt_state.json'
def load_cbt_state(vm_base_dir):
    """Load the CBT state dict from <vm_base_dir>/cbt_state.json.

    Returns a dict shaped like
        { 'last_backup_ts': str, 'disks': { disk_path: { 'change_id': str, ... } } }
    or {} when the file is missing, unreadable, not valid JSON, or not a JSON
    object. Callers call .get() on the result, so any non-dict content is
    treated as "no state" and triggers a full backup.
    """
    state_path = os.path.join(vm_base_dir, CBT_STATE_FILENAME)
    if not os.path.exists(state_path):
        return {}
    try:
        with open(state_path, 'r', encoding='utf-8') as f:
            state = json.load(f)
    except Exception as e:
        print(f"CBT: Could not read state file {state_path}: {e}")
        return {}
    if not isinstance(state, dict):
        print(f"CBT: Ignoring state file {state_path}: expected a JSON object")
        return {}
    return state
def save_cbt_state(vm_base_dir, state):
    """Persist the CBT state dict to <vm_base_dir>/cbt_state.json.

    Creates vm_base_dir if needed. The JSON goes to '<state file>.tmp' first
    and is then moved into place with os.replace, so an interrupted write
    leaves the previous state file intact instead of a truncated one.
    Write errors are printed, not raised (best-effort); a leftover temp file
    is removed.
    """
    os.makedirs(vm_base_dir, exist_ok=True)
    state_path = os.path.join(vm_base_dir, CBT_STATE_FILENAME)
    tmp_path = state_path + '.tmp'
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(state, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, state_path)
        print(f"CBT: State saved to {state_path}")
    except Exception as e:
        print(f"CBT: Could not save state file: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            pass
def get_file_sha256(filepath, decompress_if_zst=False):
    """Return the hex SHA-256 digest of *filepath*.

    When decompress_if_zst is true and the path ends in '.zst'
    (case-insensitive), the digest is taken over the decompressed stream
    using the optional `zstandard` package. If that fails for any reason, a
    warning is printed and the raw file bytes are hashed instead.
    """
    block_size = 4 * 1024 * 1024
    if decompress_if_zst and str(filepath).lower().endswith('.zst'):
        digest = hashlib.sha256()
        try:
            import zstandard as zstd
            with open(filepath, 'rb') as src:
                for piece in zstd.ZstdDecompressor().read_to_iter(src, read_size=block_size):
                    digest.update(piece)
            return digest.hexdigest()
        except Exception as e:
            print(f"Warning: Failed to decompress on the fly for SHA calculation: {e}. Falling back to raw file hash.")
    digest = hashlib.sha256()
    with open(filepath, 'rb') as src:
        for piece in iter(lambda: src.read(block_size), b''):
            digest.update(piece)
    return digest.hexdigest()
def verify_backup_checksums(dest_dir):
    """Verify all files listed in <dest_dir>/manifest.json against their SHA-256.

    A file missing on disk but present as '<path>.zst' is checked by hashing
    the decompressed stream.

    Returns True when every file matches, or when there is no manifest
    (verification is skipped). Returns False when the manifest cannot be
    read, is not a JSON object, has an entry without a path, or any file is
    missing or mismatched. Every failure is printed; none is raised.
    """
    manifest_path = os.path.join(dest_dir, 'manifest.json')
    if not os.path.exists(manifest_path):
        print(f"No manifest.json found in {dest_dir}, skipping checksum verification.")
        return True
    try:
        with open(manifest_path, 'r', encoding='utf-8') as f:
            manifest = json.load(f)
    except Exception as e:
        print(f"Error reading manifest.json: {e}")
        return False
    if not isinstance(manifest, dict):
        print("Error reading manifest.json: top-level value is not a JSON object")
        return False

    print(f"Verifying checksums for backup of VM: {manifest.get('vm_name', 'unknown')}")
    all_ok = True
    for file_info in manifest.get('files', []):
        rel_path = file_info.get('path') if isinstance(file_info, dict) else None
        if not rel_path:
            # A malformed entry previously crashed os.path.join with TypeError.
            print(f"Verification FAILED: manifest entry has no path: {file_info!r}")
            all_ok = False
            continue
        expected_sha = file_info.get('sha256')
        # Determine the actual file on disk (it may have been compressed to .zst)
        filepath = os.path.join(dest_dir, rel_path)
        actual_path = filepath
        decompress = False
        if not os.path.exists(filepath):
            if os.path.exists(filepath + '.zst'):
                actual_path = filepath + '.zst'
                decompress = True
            else:
                print(f"Verification FAILED: File not found: {filepath}")
                all_ok = False
                continue
        actual_sha = get_file_sha256(actual_path, decompress_if_zst=decompress)
        if actual_sha == expected_sha:
            print(f"Verification OK: {rel_path} (decompress={decompress})")
        else:
            print(f"Verification FAILED: {rel_path} (Expected: {expected_sha}, Got: {actual_sha})")
            all_ok = False
    return all_ok
def maybe_compress(path):
    """Compress *path* to '<path>.zst' (zstd level 19) and return the path to use.

    Tries the `zstd` command-line tool first, then the optional `zstandard`
    Python package. This function never deletes *path* itself.

    Returns '<path>.zst' on success. Returns *path* when compression is
    unavailable or fails; in that case a partially written .zst from the
    Python fallback is removed so it cannot be mistaken for a good archive.
    """
    import subprocess

    out_path = path + '.zst'
    try:
        rc = subprocess.run(['zstd', '-19', path], check=False)
        if rc.returncode == 0:
            return out_path
    except OSError:
        # FileNotFoundError when zstd is not installed; also e.g. PermissionError,
        # which previously escaped and aborted the whole backup.
        pass

    try:
        import zstandard as zstd
    except ImportError:
        print('Compression not available; skipping')
        return path

    try:
        with open(path, 'rb') as ifh, open(out_path, 'wb') as ofh:
            zstd.ZstdCompressor(level=19).copy_stream(ifh, ofh)
        return out_path
    except Exception as e:
        print(f'Compression failed ({e}); skipping')
        try:
            os.remove(out_path)
        except OSError:
            pass
        return path
def upload_via_sftp(host, user, password, key_filename, local_path, remote_dir):
    """Upload *local_path* into *remote_dir* on an SFTP server, keeping its basename.

    Authenticates with *key_filename* when given, otherwise with *password*.
    If chdir into *remote_dir* fails, it is created with a single mkdir
    (parent directories are not created).

    The SFTP channel and the SSH client are closed even when connecting,
    opening the channel, or uploading fails.
    Raises RuntimeError when paramiko is not installed; paramiko and IO
    errors propagate.
    """
    if paramiko is None:
        raise RuntimeError("paramiko is required for SFTP upload")
    client = paramiko.SSHClient()
    # SECURITY NOTE(review): AutoAddPolicy accepts any unknown host key, which
    # permits man-in-the-middle attacks. Consider load_system_host_keys() with
    # RejectPolicy once known_hosts is managed for backup targets.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        if key_filename:
            client.connect(hostname=host, username=user, key_filename=key_filename)
        else:
            client.connect(hostname=host, username=user, password=password)
        sftp = client.open_sftp()
        try:
            try:
                sftp.chdir(remote_dir)
            except IOError:
                sftp.mkdir(remote_dir)
                sftp.chdir(remote_dir)
            fname = os.path.basename(local_path)
            print(f"Uploading {local_path} to {host}:{remote_dir}/{fname}")
            sftp.put(local_path, fname)
        finally:
            sftp.close()
    finally:
        client.close()
def run_backup(host, user, password, vm_name, dest, compress=False, no_verify_ssl=False,
               sftp_host=None, sftp_user=None, sftp_password=None, sftp_key=None,
               log_path=None, progress_cb=None, disk_filter=None, job_id=None,
               is_cancelled_cb=None, use_cbt=False):
    """Run the backup flow (full or CBT incremental) and return its result.

    disk_filter: if not None, a set/list of VMDK file-ref strings to include.
        The VMX config file is always included regardless.
    use_cbt: if True, attempt a Changed Block Tracking incremental backup;
        falls back to a full download when CBT state is unavailable.
    log_path: when set, everything the backup writes to stdout/stderr is
        appended (UTF-8, line-buffered) to this file instead; the file is
        closed afterwards even if the backup raises.
    """
    positional = (host, user, password, vm_name, dest, compress, no_verify_ssl,
                  sftp_host, sftp_user, sftp_password, sftp_key)
    options = {
        'progress_cb': progress_cb,
        'disk_filter': disk_filter,
        'job_id': job_id,
        'is_cancelled_cb': is_cancelled_cb,
        'use_cbt': use_cbt,
    }
    if not log_path:
        return _run_backup_impl(*positional, **options)
    with open(log_path, 'a', encoding='utf-8', buffering=1) as logfile:
        with redirect_stdout(logfile), redirect_stderr(logfile):
            return _run_backup_impl(*positional, **options)
def _run_backup_impl(host, user, password, vm_name, dest, compress, no_verify_ssl,
                     sftp_host, sftp_user, sftp_password, sftp_key,
                     progress_cb=None, disk_filter=None, job_id=None,
                     is_cancelled_cb=None, use_cbt=False):
    """Perform one backup run (see run_backup for the parameter contract).

    Steps:
      1. Connect and look up the VM.
      2. If use_cbt, enable CBT and load the prior CBT state from the
         directory above *dest*.
      3. Create the backup snapshot.
      4. Download each selected disk's descriptor and -flat data file plus
         the VMX into *dest*. When prior CBT state exists, a flat file is
         fetched as changed ranges only.
      5. Write manifest.json and the updated cbt_state.json.
      6. Optionally compress and SFTP-upload.
      7. Remove the backup snapshot and disconnect.

    progress_cb, if given, receives {'phase', 'pct', 'detail'} dicts; any
    error it raises is ignored.
    Raises RuntimeError("Backup cancelled by user") when is_cancelled_cb()
    returns true, and Exception for lookup, download or verification failures.
    """
    def _prog(phase, pct, detail=''):
        if progress_cb:
            try:
                progress_cb({'phase': phase, 'pct': pct, 'detail': detail})
            except Exception:
                pass  # progress reporting is best-effort

    def _check_cancelled():
        """Raise RuntimeError if the caller has requested cancellation."""
        if is_cancelled_cb and is_cancelled_cb():
            raise RuntimeError("Backup cancelled by user")

    def _find_vm(content):
        """Return the VM named vm_name, or None; the container view is always destroyed."""
        view = content.viewManager.CreateContainerView(content.rootFolder, [vim.VirtualMachine], True)
        try:
            for candidate in view.view:
                if candidate.name == vm_name:
                    return candidate
            return None
        finally:
            view.Destroy()

    def _normalize_vmdk(ref):
        """Strip a snapshot delta suffix (e.g. '-000001.vmdk') to get the base VMDK ref."""
        return re.sub(r'-\d+\.vmdk$', '.vmdk', ref, flags=re.IGNORECASE)

    def _make_dl_cb(fidx, total, base_pct, share, fname):
        """Build a download progress callback mapping per-file bytes onto the overall pct."""
        def _dl_cb(done, total_b):
            _check_cancelled()
            if total_b > 0:
                file_pct = done / total_b
                overall_pct = int(base_pct + file_pct * share)
                done_mb = done // (1024 * 1024)
                total_mb = total_b // (1024 * 1024)
                detail = (f'File {fidx+1}/{total}: {fname} — '
                          f'{done_mb} / {total_mb} MB '
                          f'({int(file_pct*100)}%)')
            else:
                overall_pct = base_pct
                detail = f'File {fidx+1}/{total}: {fname}'
            _prog('downloading', overall_pct, detail)
        return _dl_cb

    si = None
    started_iso = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    try:
        _prog('connecting', 0, 'Connecting to vCenter…')
        si = get_si(host, user, password, no_verify_ssl=no_verify_ssl)
        content = si.RetrieveContent()
        _prog('connecting', 2, f'Looking up VM: {vm_name}')
        vm = _find_vm(content)
        if not vm:
            raise Exception(f"VM named {vm_name} not found")
        snap_name = f"backup-{int(time.time())}"
        created_snapshot = False

        # ── CBT pre-snapshot setup ────────────────────────────────────────
        # *dest* is the run-specific directory (backup-YYYYMMDDHHMMSS); the
        # shared cbt_state.json lives one level up, in the VM base directory.
        vm_base_dir = str(Path(dest).parent)
        cbt_state = {}
        if use_cbt:
            _prog('snapshot', 1, 'Enabling Changed Block Tracking (CBT)…')
            try:
                enable_cbt(vm, content)
                cbt_state = load_cbt_state(vm_base_dir)
                if cbt_state:
                    print(f"CBT: Found prior state from {cbt_state.get('last_backup_ts', 'unknown')}")
                else:
                    print("CBT: No prior state — this will be a FULL backup (seeding CBT)")
            except Exception as e:
                print(f"CBT: Failed to enable CBT, falling back to full backup: {e}")
                use_cbt = False

        try:
            _prog('snapshot', 3, 'Creating snapshot…')
            create_snapshot(vm, snap_name, desc="Automated backup snapshot", memory=False, quiesce=False)
            created_snapshot = True
            _prog('snapshot', 5, 'Snapshot created')

            session_cookie = extract_session_cookie(si)
            if not session_cookie:
                raise Exception('Could not extract session cookie for downloads')

            # Request the base VMDKs (snapshot suffixes like -000001 stripped);
            # vCenter streams the base file as the full data disk.
            # NOTE(review): only the base disk is fetched, so data held in
            # snapshot deltas that existed before this run is presumably not
            # captured — confirm whether VMs with prior snapshots are supported.
            raw_vmdk_refs = vm_disk_vmdk_paths(vm)
            vmdk_refs = [_normalize_vmdk(r) for r in raw_vmdk_refs]
            vmx_ref = vm_config_vmx_path(vm)

            # Map normalized vmdk_ref -> VirtualDisk device (key/capacity for CBT)
            disk_devices = {}
            for dev in vm.config.hardware.device:
                if isinstance(dev, vim.vm.device.VirtualDisk):
                    fn = getattr(dev.backing, 'fileName', None)
                    if fn:
                        disk_devices[_normalize_vmdk(fn)] = dev

            # Locate the backup snapshot object for CBT queries
            snap_ref = None
            if use_cbt:
                try:
                    snap_root = getattr(vm, 'snapshot', None)
                    if snap_root and snap_root.rootSnapshotList:
                        snap_ref = find_snapshot_by_name(snap_root.rootSnapshotList, snap_name)
                    if not snap_ref:
                        print("CBT: Could not locate backup snapshot for QueryChangedDiskAreas — falling back to full")
                        use_cbt = False
                except Exception as e:
                    print(f"CBT: Snapshot lookup failed: {e} — falling back to full")
                    use_cbt = False

            # Apply disk filter — only download selected VMDKs
            if disk_filter is not None:
                filter_raw = set(disk_filter)
                filter_norm = {_normalize_vmdk(f) for f in filter_raw}
                skipped = []
                filtered_vmdk_refs = []
                for raw_ref, norm_ref in zip(raw_vmdk_refs, vmdk_refs):
                    # Also compare un-normalized entries: normalizing a base path
                    # such as '[ds] web-01/web-01.vmdk' strips '-01', so it could
                    # never equal the disk's normalized ref on its own.
                    if (norm_ref in filter_norm or norm_ref in filter_raw
                            or raw_ref in filter_raw):
                        filtered_vmdk_refs.append(norm_ref)
                    else:
                        skipped.append(raw_ref)
                vmdk_refs = filtered_vmdk_refs
                if skipped:
                    print(f"Skipping {len(skipped)} disk(s) per disk_filter: {skipped}")
                if not vmdk_refs:
                    print("Warning: no disks selected — backing up VMX config only.")

            # ── Build download list ───────────────────────────────────────
            # Descriptor (.vmdk) + flat data (-flat.vmdk) pairs, plus the VMX.
            # In CBT mode only the flat file is range-downloaded; the small
            # descriptor is always fetched in full.
            all_refs = []
            flat_vmdk_refs = set()  # refs that are flat data disks
            for ref in vmdk_refs:
                all_refs.append(ref)  # descriptor (small)
                if ref.lower().endswith('.vmdk') and not ref.lower().endswith('-flat.vmdk'):
                    flat_ref = ref[:-5] + '-flat.vmdk'
                    all_refs.append(flat_ref)
                    flat_vmdk_refs.add(flat_ref)
            if vmx_ref:
                all_refs.append(vmx_ref)
            total_files = len(all_refs)

            # Download phase: 5% -> 90%
            DOWNLOAD_START = 5
            DOWNLOAD_END = 90
            download_range = DOWNLOAD_END - DOWNLOAD_START
            downloaded_files = []
            files_manifest_info = []
            cbt_total_changed_bytes = 0  # bytes actually transferred for CBT-tracked flat disks
            cbt_total_disk_bytes = 0     # logical size of those same disks
            new_cbt_disk_state = {}      # state to persist after success

            for file_idx, ref in enumerate(all_refs):
                _check_cancelled()
                ds_name, ds_path = parse_datastore_path(ref)
                dc = find_datacenter_for_datastore(content, ds_name)
                if not dc:
                    raise Exception(f"Datacenter for datastore {ds_name} not found")
                dc_name = dc.name
                safe_path = ds_path.replace('/', os.sep)
                local_file = os.path.join(dest, ds_name, safe_path)
                file_base_pct = DOWNLOAD_START + int((file_idx / total_files) * download_range)
                file_share = download_range / total_files
                _prog('downloading', file_base_pct,
                      f'Starting file {file_idx+1}/{total_files}: {os.path.basename(ds_path)}')

                # ── CBT incremental path for flat VMDK data disks ─────────
                is_flat_disk = ref in flat_vmdk_refs
                did_cbt = False
                file_sha = None
                bytes_downloaded_this_file = None
                if use_cbt and is_flat_disk and snap_ref:
                    # The descriptor ref is the flat ref without '-flat'
                    descriptor_ref = ref[:-len('-flat.vmdk')] + '.vmdk'
                    dev = disk_devices.get(descriptor_ref)
                    if dev:
                        device_key = dev.key
                        prior_disk_state = cbt_state.get('disks', {}).get(ref, {})
                        prior_change_id = prior_disk_state.get('change_id')
                        disk_size_bytes = (getattr(dev, 'capacityInKB', 0) or 0) * 1024
                        cbt_total_disk_bytes += disk_size_bytes
                        if prior_change_id:
                            # ── Incremental: query and download only changes ──
                            print(f"CBT: Incremental mode for {ref} "
                                  f"(prior changeId: {prior_change_id[:20]}…)")
                            try:
                                extents = query_changed_areas(
                                    snap_ref, device_key, prior_change_id
                                )
                                if not extents:
                                    print(f"CBT: No changes detected for {ref} — creating empty delta")
                                    os.makedirs(os.path.dirname(local_file), exist_ok=True)
                                    open(local_file, 'wb').close()
                                    file_sha = hashlib.sha256(b'').hexdigest()
                                    bytes_downloaded_this_file = 0
                                    did_cbt = True
                                else:
                                    file_sha, bytes_downloaded_this_file = download_disk_changed_ranges(
                                        host, dc_name, ds_name, ds_path, extents,
                                        local_file, session_cookie,
                                        total_disk_size=disk_size_bytes,
                                        verify_ssl=not no_verify_ssl,
                                        progress_cb=_make_dl_cb(
                                            file_idx, total_files,
                                            file_base_pct, file_share,
                                            f"[CBT] {os.path.basename(ds_path)}"
                                        )
                                    )
                                    did_cbt = True
                            except Exception as cbt_err:
                                # A cancellation raised from the progress callback
                                # must abort the run, not trigger a full re-download.
                                if is_cancelled_cb and is_cancelled_cb():
                                    raise
                                print(f"CBT: Incremental download failed ({cbt_err}), "
                                      f"falling back to full download for {ref}")
                                did_cbt = False
                                bytes_downloaded_this_file = None
                        else:
                            # No prior state: this is the seeding full backup
                            print(f"CBT: No prior changeId for {ref} — "
                                  f"performing FULL download to seed CBT")
                        # Count what is actually transferred: only the changed
                        # extents after a successful incremental, otherwise the
                        # whole disk (seed run or fallback to full).
                        cbt_total_changed_bytes += (bytes_downloaded_this_file if did_cbt
                                                    else disk_size_bytes)
                        # The backup snapshot's changeId is the baseline for the next run
                        new_cid = get_disk_change_id(snap_ref, device_key)
                        new_cbt_disk_state[ref] = {
                            'change_id': new_cid or '*',
                            'backup_type': 'incremental' if (did_cbt and prior_change_id) else 'full',
                            'last_snapshot': snap_name,
                        }

                if not did_cbt:
                    # ── Full download path (also used for descriptors & VMX) ──
                    file_sha = download_datastore_file(
                        host, dc_name, ds_name, ds_path, local_file, session_cookie,
                        verify_ssl=not no_verify_ssl,
                        progress_cb=_make_dl_cb(file_idx, total_files, file_base_pct,
                                                file_share, os.path.basename(ds_path))
                    )

                downloaded_files.append(local_file)
                file_size = os.path.getsize(local_file)
                print(f"SHA-256: {file_sha} (size: {file_size} bytes)")
                rel_path = os.path.relpath(local_file, dest).replace(os.sep, '/')
                manifest_entry = {
                    "path": rel_path,
                    "size_bytes": file_size,
                    "sha256": file_sha,
                }
                if use_cbt and is_flat_disk and ref in new_cbt_disk_state:
                    disk_st = new_cbt_disk_state[ref]
                    manifest_entry['backup_type'] = disk_st.get('backup_type', 'full')
                    if bytes_downloaded_this_file is not None:
                        manifest_entry['changed_bytes'] = bytes_downloaded_this_file
                files_manifest_info.append(manifest_entry)

            _prog('compressing', 90, 'Downloads complete. Creating manifest…')
            _check_cancelled()

            # Write manifest.json
            finished_iso = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
            has_incremental = any(
                e.get('backup_type') == 'incremental'
                for e in files_manifest_info
            )
            overall_type = 'incremental' if has_incremental else 'full'
            manifest_data = {
                "job_id": job_id or "...",
                "vm_name": vm_name,
                "started": started_iso,
                "finished": finished_iso,
                "vcenter": host,
                "snapshot": snap_name,
                "backup_type": overall_type,
                "cbt_enabled": use_cbt,
                "files": files_manifest_info
            }
            if use_cbt and cbt_total_disk_bytes > 0:
                savings_pct = round(
                    (1 - cbt_total_changed_bytes / cbt_total_disk_bytes) * 100, 1
                )
                manifest_data['cbt_transfer_savings_pct'] = savings_pct
                manifest_data['cbt_changed_bytes'] = cbt_total_changed_bytes
                manifest_data['cbt_total_disk_bytes'] = cbt_total_disk_bytes
                print(f"CBT summary: {savings_pct}% transfer savings "
                      f"({cbt_total_changed_bytes // (1024*1024)} MB transferred of "
                      f"{cbt_total_disk_bytes // (1024*1024)} MB total)")
            manifest_path = os.path.join(dest, 'manifest.json')
            with open(manifest_path, 'w', encoding='utf-8') as f:
                json.dump(manifest_data, f, indent=2)
            print(f"Backup manifest created at {manifest_path}")

            # Persist CBT state for the next incremental run
            if use_cbt and new_cbt_disk_state:
                updated_state = {
                    'last_backup_ts': finished_iso,
                    'backup_type': overall_type,
                    'disks': new_cbt_disk_state,
                }
                save_cbt_state(vm_base_dir, updated_state)

            final_files = []
            for local_path in downloaded_files:
                _check_cancelled()
                if compress:
                    _prog('compressing', 92, f'Compressing {os.path.basename(local_path)}')
                    final_files.append(maybe_compress(local_path))
                else:
                    final_files.append(local_path)
            # manifest.json is added uncompressed
            final_files.append(manifest_path)

            if sftp_host:
                if not sftp_user:
                    raise Exception('SFTP user required')
                _check_cancelled()
                # Verify checksums before upload
                _prog('uploading', 94, 'Verifying local checksums before SFTP upload…')
                print("Running pre-upload checksum verification...")
                if not verify_backup_checksums(dest):
                    raise Exception("Pre-upload checksum verification failed. Aborting SFTP upload to prevent remote corruption.")
                print("Checksum verification succeeded.")
                _prog('uploading', 95, f'Uploading to {sftp_host}')
                for upload_path in final_files:
                    upload_via_sftp(sftp_host, sftp_user, sftp_password, sftp_key,
                                    upload_path, os.path.basename(dest))

            _prog('cleanup', 97, 'Removing snapshot…')
            print('Backup completed successfully')
        finally:
            if created_snapshot:
                try:
                    # Re-fetch the VM so the snapshot tree is not read from a stale object
                    target_vm = _find_vm(content) or vm
                    snap_root = getattr(target_vm, 'snapshot', None)
                    if snap_root and snap_root.rootSnapshotList:
                        snap_obj = find_snapshot_by_name(snap_root.rootSnapshotList, snap_name)
                        if snap_obj:
                            remove_snapshot(snap_obj)
                        else:
                            print('Snapshot already removed or not found in tree')
                    else:
                        print('No snapshots found on VM — may have already been removed')
                except Exception as e:
                    # Best-effort: a leftover snapshot is reported but does not fail the backup
                    print(f'Failed to remove snapshot: {e}', file=sys.stderr)
        # Reached only when the backup body completed without raising.
        _prog('done', 100, 'Backup finished successfully')
    finally:
        if si:
            try:
                Disconnect(si)
            except Exception:
                pass