A standard Python library that abstracts away differences among multiple cloud provider APIs
The backup service provides a unified interface for backup and snapshot management across multiple cloud backup providers including AWS EBS Snapshots, Google Persistent Disk Snapshots, Azure Disk Snapshots, and other backup-as-a-service providers.
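Every provider is used through the same three-step pattern: resolve a driver class, instantiate it with credentials, and call the unified API. A minimal sketch (placeholder credentials, AWS EBS as the provider):

from libcloud.backup.types import Provider
from libcloud.backup.providers import get_driver

# Resolve the provider-specific driver class, then instantiate it
cls = get_driver(Provider.EBS)
driver = cls('access_key', 'secret_key', region='us-east-1')

# The same unified calls work regardless of provider
for target in driver.list_targets():
    print(target.name, target.size)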
from libcloud.backup.types import Provider
class Provider:
"""Enumeration of supported backup providers"""
EBS = 'ebs' # AWS EBS Snapshots
GCE = 'gce' # Google Persistent Disk Snapshots
AZURE_ARM = 'azure_arm' # Azure Resource Manager Snapshots
DIMENSIONDATA = 'dimensiondata' # Dimension Data Backup
CLOUDSTACK = 'cloudstack' # CloudStack Snapshots
    # ... more providers

from libcloud.backup.providers import get_driver
def get_driver(provider: Provider) -> type[BackupDriver]

Get the driver class for a specific backup provider.
Parameters:
provider: Provider identifier from the Provider enum

Returns:
The BackupDriver subclass for the given provider
Example:
from libcloud.backup.types import Provider
from libcloud.backup.providers import get_driver
# Get AWS EBS backup driver class
cls = get_driver(Provider.EBS)
# Initialize driver with credentials
driver = cls('access_key', 'secret_key', region='us-east-1')

class BackupDriver(BaseDriver):
"""Base class for all backup drivers"""
def list_targets(self) -> List[BackupTarget]
def get_target(self, target_id: str) -> BackupTarget
def create_target_from_node(self, node: Node, name: str = None, ex_use_tags: bool = True) -> BackupTarget
def create_target_from_container(self, container: Container, name: str = None) -> BackupTarget
def update_target(self, target: BackupTarget, name: str = None, extra: Dict = None) -> BackupTarget
def delete_target(self, target: BackupTarget) -> bool
def list_recovery_points(self, target: BackupTarget, start_date: datetime = None, end_date: datetime = None) -> List[BackupTarget]
def recover_target(self, target: BackupTarget, recovery_point: BackupTarget, recovery_target_name: str = None) -> Node
def recover_target_out_of_place(self, target: BackupTarget, recovery_point: BackupTarget, recovery_target_name: str = None, **kwargs) -> Node
def create_target_backup_job(self, target: BackupTarget, extra: Dict = None) -> BackupTargetJob
def list_target_jobs(self, target: BackupTarget) -> List[BackupTargetJob]
    def ex_list_available_backup_locations(self) -> List[Dict]

Base class that all backup drivers inherit from. Provides methods for managing backup targets, recovery points, and backup jobs.
Key Methods:
list_targets(): List all backup targets
create_target_from_node(): Create backup target from compute node
list_recovery_points(): List available recovery points for a target
recover_target(): Restore a target from a recovery point
create_target_backup_job(): Create a backup job
delete_target(): Delete a backup target

class BackupTarget:
"""Represents a backup target"""
id: str
name: str
address: str
type: BackupTargetType
size: int
driver: BackupDriver
extra: Dict[str, Any]
def list_recovery_points(self, start_date: datetime = None, end_date: datetime = None) -> List[BackupTarget]
def recover(self, recovery_point: BackupTarget, recovery_target_name: str = None) -> Node
def backup(self, name: str = None) -> BackupTargetJob
    def delete(self) -> bool

Represents a backup target (source for backups like a disk, volume, or node).
Properties:
id: Unique backup target identifier
name: Human-readable name
address: Target address/identifier (volume ID, node ID, etc.)
type: Type of backup target (volume, node, etc.)
size: Size in bytes of the backup target
extra: Provider-specific metadata

Methods:
list_recovery_points(): List recovery points for this target
recover(): Restore this target from a recovery point
backup(): Create a backup of this target
delete(): Delete this backup target

class BackupTargetJob:
"""Represents a backup job"""
id: str
target_id: str
status: BackupTargetJobStatusType
progress: float
created_at: datetime
driver: BackupDriver
    extra: Dict[str, Any]

Represents a backup job/operation.
Properties:
id: Unique job identifier
target_id: ID of the backup target
status: Current job status (running, completed, failed, etc.)
progress: Job progress as a fraction (0.0 = not started, 1.0 = complete)
created_at: Job creation timestamp
extra: Provider-specific job metadata

class BackupTargetType:
"""Backup target types enumeration"""
VOLUME = 'volume'
NODE = 'node'
CONTAINER = 'container'
FILE_SYSTEM = 'file_system'
DATABASE = 'database'
    VIRTUAL_MACHINE = 'virtual_machine'

Enumeration of supported backup target types.
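As a quick illustration (assuming an initialized backup_driver), these values can be compared against a target's type attribute:

from libcloud.backup.types import BackupTargetType

# Keep only volume-backed targets
volume_targets = [t for t in backup_driver.list_targets()
                  if t.type == BackupTargetType.VOLUME]
print(f"Volume targets: {len(volume_targets)}")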
class BackupTargetJobStatusType:
"""Backup job status types enumeration"""
PENDING = 'pending'
RUNNING = 'running'
COMPLETED = 'completed'
FAILED = 'failed'
CANCELLED = 'cancelled'
    UNKNOWN = 'unknown'

Enumeration of possible backup job statuses.
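A common pattern is to poll a job until it reaches a terminal status. A minimal sketch, assuming an initialized backup_driver and an existing target; since the base API documented here has no get-job-by-id call, the job state is refreshed via list_target_jobs():

import time

job = backup_driver.create_target_backup_job(target)
terminal = {'completed', 'failed', 'cancelled'}
while job.status not in terminal:
    print(f"Job {job.id}: {job.status} ({job.progress:.1%} complete)")
    time.sleep(30)
    # Refresh job state from the provider
    job = next(j for j in backup_driver.list_target_jobs(target) if j.id == job.id)
print(f"Job {job.id} finished with status: {job.status}")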
from libcloud.backup.types import Provider, BackupTargetType
from libcloud.backup.providers import get_driver
from libcloud.compute.types import Provider as ComputeProvider
from libcloud.compute.providers import get_driver as get_compute_driver
# Initialize backup driver (AWS EBS example)
backup_cls = get_driver(Provider.EBS)
backup_driver = backup_cls('access_key', 'secret_key', region='us-east-1')
# Initialize compute driver to get nodes
compute_cls = get_compute_driver(ComputeProvider.EC2)
compute_driver = compute_cls('access_key', 'secret_key', region='us-east-1')
# List existing backup targets
targets = backup_driver.list_targets()
print(f"Existing backup targets: {len(targets)}")
for target in targets:
print(f"Target: {target.name} (Type: {target.type}, Size: {target.size} bytes)")
print(f" Address: {target.address}")
print(f" Created: {target.extra.get('created_at', 'unknown')}")
# Create backup target from a compute node
nodes = compute_driver.list_nodes()
if nodes:
node = nodes[0] # Use first node
backup_target = backup_driver.create_target_from_node(
node=node,
name=f'backup-{node.name}',
ex_use_tags=True
)
    print(f"Created backup target: {backup_target.name} ({backup_target.id})")

# Get a backup target
target = backup_driver.get_target('backup-target-123')
print(f"Backup target: {target.name}")
# Create a backup job
backup_job = backup_driver.create_target_backup_job(
target=target,
extra={'description': 'Daily backup', 'retention_days': 30}
)
print(f"Created backup job: {backup_job.id} (Status: {backup_job.status})")
# Alternative: Create backup using target method
backup_job2 = target.backup(name='manual-backup-2023-10-15')
print(f"Created backup via target: {backup_job2.id}")
# List all backup jobs for a target
jobs = backup_driver.list_target_jobs(target)
print(f"Backup jobs for {target.name}: {len(jobs)}")
for job in jobs:
print(f" Job {job.id}: {job.status} ({job.progress:.1%} complete)")
print(f" Created: {job.created_at}")
if job.extra:
        print(f" Extra: {job.extra}")

from datetime import datetime, timedelta
# List recovery points for a target
recovery_points = backup_driver.list_recovery_points(target)
print(f"Available recovery points: {len(recovery_points)}")
for rp in recovery_points:
print(f"Recovery Point: {rp.name} (Created: {rp.extra.get('created_at')})")
print(f" Size: {rp.size} bytes")
# List recovery points within a date range
end_date = datetime.now()
start_date = end_date - timedelta(days=7) # Last 7 days
recent_recovery_points = backup_driver.list_recovery_points(
target,
start_date=start_date,
end_date=end_date
)
print(f"Recovery points from last 7 days: {len(recent_recovery_points)}")
# Alternative: List using target method
target_recovery_points = target.list_recovery_points(start_date=start_date)
print(f"Target recovery points: {len(target_recovery_points)}")

# Recover target in place (restore to original location)
if recovery_points:
latest_recovery_point = recovery_points[0] # Assuming sorted by date
print(f"Restoring {target.name} from recovery point {latest_recovery_point.name}")
restored_node = backup_driver.recover_target(
target=target,
recovery_point=latest_recovery_point,
recovery_target_name=f'restored-{target.name}'
)
print(f"Restored node: {restored_node.name} ({restored_node.id})")
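    # (Sketch) Optionally confirm the restored node is visible through the
    # compute driver initialized earlier; matching on node id is illustrative.
    matches = [n for n in compute_driver.list_nodes() if n.id == restored_node.id]
    if matches:
        print(f"Restored node state: {matches[0].state}")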
# Recover target out of place (restore to new location/instance)
if recovery_points:
recovery_point = recovery_points[0]
restored_node = backup_driver.recover_target_out_of_place(
target=target,
recovery_point=recovery_point,
recovery_target_name='disaster-recovery-instance',
ex_instance_type='t3.medium', # Different instance type
ex_subnet_id='subnet-new-123', # Different subnet
ex_security_groups=['sg-disaster-recovery']
)
print(f"Out-of-place recovery completed: {restored_node.name}")
# Alternative: Recover using target method
if recovery_points:
restored_via_target = target.recover(
recovery_point=recovery_points[0],
recovery_target_name='target-method-recovery'
)
    print(f"Recovered via target method: {restored_via_target.name}")

import time
from datetime import datetime, timedelta
from typing import List, Dict
from libcloud.backup.base import BackupTarget
def create_backup_schedule(backup_driver, targets: List[BackupTarget], schedule_config: Dict):
"""Create automated backup schedule"""
def should_backup(target: BackupTarget, config: Dict) -> bool:
"""Check if target should be backed up based on schedule"""
# Get last backup time
jobs = backup_driver.list_target_jobs(target)
completed_jobs = [j for j in jobs if j.status == 'completed']
if not completed_jobs:
return True # No backups yet
# Sort by creation time and get latest
latest_job = max(completed_jobs, key=lambda j: j.created_at)
last_backup = latest_job.created_at
# Check if enough time has passed
interval_hours = config.get('interval_hours', 24)
time_since_backup = datetime.now() - last_backup
return time_since_backup >= timedelta(hours=interval_hours)
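        # Example: with interval_hours=24 and a last backup 30 hours ago,
        # time_since_backup exceeds the 24-hour interval, so this returns True.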
# Main scheduling loop
print(f"Starting backup scheduler for {len(targets)} targets")
while True:
try:
for target in targets:
target_config = schedule_config.get(
target.name,
schedule_config.get('default', {})
)
if should_backup(target, target_config):
print(f"Creating scheduled backup for {target.name}")
backup_job = backup_driver.create_target_backup_job(
target=target,
extra={
'scheduled': True,
'retention_days': target_config.get('retention_days', 7)
}
)
print(f" Created job: {backup_job.id}")
# Clean up old backups if configured
if target_config.get('cleanup_old_backups', False):
cleanup_old_backups(backup_driver, target, target_config)
except Exception as e:
print(f"Error in backup scheduler: {e}")
# Wait before next check
sleep_minutes = schedule_config.get('check_interval_minutes', 60)
time.sleep(sleep_minutes * 60)
def cleanup_old_backups(backup_driver, target: BackupTarget, config: Dict):
"""Clean up old backup recovery points"""
retention_days = config.get('retention_days', 7)
cutoff_date = datetime.now() - timedelta(days=retention_days)
recovery_points = backup_driver.list_recovery_points(target)
for rp in recovery_points:
created_date = rp.extra.get('created_at')
if created_date and isinstance(created_date, datetime) and created_date < cutoff_date:
try:
success = backup_driver.delete_target(rp)
if success:
print(f" Cleaned up old backup: {rp.name}")
except Exception as e:
print(f" Failed to clean up {rp.name}: {e}")
# Usage example
schedule_config = {
'default': {
'interval_hours': 24, # Daily backups
'retention_days': 7, # Keep for 7 days
'cleanup_old_backups': True
},
'critical-db-backup': {
'interval_hours': 4, # Every 4 hours for critical systems
'retention_days': 30, # Keep for 30 days
'cleanup_old_backups': True
},
'check_interval_minutes': 60 # Check every hour
}
# Get targets to backup
all_targets = backup_driver.list_targets()
important_targets = [t for t in all_targets if 'prod' in t.name.lower()]
# Start scheduler (run in separate thread/process)
# create_backup_schedule(backup_driver, important_targets, schedule_config)

from libcloud.backup.types import Provider as BackupProvider
from libcloud.backup.providers import get_driver as get_backup_driver
# Configure multiple backup providers for redundancy
backup_providers = {
'aws_ebs': {
'driver': get_backup_driver(BackupProvider.EBS),
'credentials': ('aws_access_key', 'aws_secret_key'),
'region': 'us-east-1'
},
'gce_snapshots': {
'driver': get_backup_driver(BackupProvider.GCE),
'credentials': ('service_account_email', 'key_file_path'),
'project': 'my-project'
}
}
# Initialize backup drivers
backup_drivers = {}
for name, config in backup_providers.items():
cls = config['driver']
if name == 'aws_ebs':
backup_drivers[name] = cls(*config['credentials'], region=config['region'])
elif name == 'gce_snapshots':
backup_drivers[name] = cls(*config['credentials'], project=config['project'])
def create_cross_provider_backup(compute_node, backup_name: str):
"""Create backups across multiple providers for redundancy"""
backup_results = {}
for provider_name, backup_driver in backup_drivers.items():
try:
print(f"Creating backup on {provider_name}...")
# Create backup target
target = backup_driver.create_target_from_node(
node=compute_node,
name=f'{backup_name}-{provider_name}'
)
# Create backup job
job = backup_driver.create_target_backup_job(
target=target,
extra={'cross_provider_backup': True}
)
backup_results[provider_name] = {
'target': target,
'job': job,
'status': 'initiated'
}
print(f" Backup initiated: {job.id}")
except Exception as e:
print(f" Failed to create backup on {provider_name}: {e}")
backup_results[provider_name] = {
'status': 'failed',
'error': str(e)
}
return backup_results
# Usage
node = compute_driver.list_nodes()[0] # Get a compute node
cross_provider_backups = create_cross_provider_backup(node, 'disaster-recovery-backup')

import json
import time
from datetime import datetime, timedelta
from typing import Dict, List
from libcloud.backup.base import BackupTarget
def generate_backup_report(backup_driver, targets: List[BackupTarget] = None) -> Dict:
"""Generate comprehensive backup report"""
if targets is None:
targets = backup_driver.list_targets()
report = {
'generated_at': datetime.now().isoformat(),
'total_targets': len(targets),
'target_summary': [],
'overall_stats': {
'healthy_targets': 0,
'targets_with_recent_backups': 0,
'failed_jobs_last_24h': 0,
'total_backup_size': 0
}
}
cutoff_24h = datetime.now() - timedelta(hours=24)
cutoff_7d = datetime.now() - timedelta(days=7)
for target in targets:
try:
# Get jobs for this target
jobs = backup_driver.list_target_jobs(target)
# Get recovery points
recovery_points = backup_driver.list_recovery_points(target)
# Analyze job status
recent_jobs = [j for j in jobs if j.created_at >= cutoff_24h]
failed_jobs_24h = [j for j in recent_jobs if j.status == 'failed']
successful_jobs = [j for j in jobs if j.status == 'completed']
# Find latest successful backup
latest_successful = None
if successful_jobs:
latest_successful = max(successful_jobs, key=lambda j: j.created_at)
# Determine target health
is_healthy = (
len(failed_jobs_24h) == 0 and
latest_successful is not None and
latest_successful.created_at >= cutoff_7d
)
target_info = {
'name': target.name,
'id': target.id,
'type': target.type,
'size_bytes': target.size,
'is_healthy': is_healthy,
'total_jobs': len(jobs),
'failed_jobs_24h': len(failed_jobs_24h),
'recovery_points_count': len(recovery_points),
'latest_backup': latest_successful.created_at.isoformat() if latest_successful else None,
'days_since_backup': (datetime.now() - latest_successful.created_at).days if latest_successful else None
}
report['target_summary'].append(target_info)
# Update overall stats
if is_healthy:
report['overall_stats']['healthy_targets'] += 1
if latest_successful and latest_successful.created_at >= cutoff_7d:
report['overall_stats']['targets_with_recent_backups'] += 1
report['overall_stats']['failed_jobs_last_24h'] += len(failed_jobs_24h)
report['overall_stats']['total_backup_size'] += target.size
except Exception as e:
print(f"Error analyzing target {target.name}: {e}")
target_info = {
'name': target.name,
'id': target.id,
'error': str(e),
'is_healthy': False
}
report['target_summary'].append(target_info)
return report
def monitor_backup_jobs(backup_driver, targets: List[BackupTarget], alert_callback=None):
"""Monitor backup job progress and alert on failures"""
active_jobs = {} # Track jobs we're monitoring
while True:
try:
for target in targets:
jobs = backup_driver.list_target_jobs(target)
for job in jobs:
if job.status in ['pending', 'running']:
# Track or update active job
if job.id not in active_jobs:
active_jobs[job.id] = {
'job': job,
'target': target,
'started_monitoring': datetime.now()
}
print(f"Started monitoring job {job.id} for {target.name}")
else:
# Update progress
old_progress = active_jobs[job.id]['job'].progress
if job.progress > old_progress:
print(f"Job {job.id} progress: {job.progress:.1%}")
active_jobs[job.id]['job'] = job
elif job.status in ['completed', 'failed', 'cancelled']:
# Job finished
if job.id in active_jobs:
duration = datetime.now() - active_jobs[job.id]['started_monitoring']
print(f"Job {job.id} finished: {job.status} (Duration: {duration})")
if job.status == 'failed' and alert_callback:
alert_callback(f"Backup job failed: {job.id} for target {target.name}")
del active_jobs[job.id]
# Check for stuck jobs
stuck_threshold = timedelta(hours=4)
current_time = datetime.now()
for job_id, job_info in list(active_jobs.items()):
monitoring_duration = current_time - job_info['started_monitoring']
if monitoring_duration > stuck_threshold:
print(f"WARNING: Job {job_id} appears stuck (running for {monitoring_duration})")
if alert_callback:
alert_callback(f"Backup job appears stuck: {job_id}")
except Exception as e:
print(f"Error monitoring backup jobs: {e}")
time.sleep(60) # Check every minute
# Usage examples
def backup_alert_handler(message: str):
"""Handle backup alerts (email, Slack, etc.)"""
print(f"ALERT: {message}")
# Implement your alerting mechanism here
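# (Sketch) monitor_backup_jobs() blocks in a loop, so run it in a daemon
# thread; the argument values here are illustrative.
import threading
monitor_thread = threading.Thread(
    target=monitor_backup_jobs,
    args=(backup_driver, backup_driver.list_targets(), backup_alert_handler),
    daemon=True,
)
# monitor_thread.start()  # Uncomment to monitor in the background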
# Generate backup report
backup_report = generate_backup_report(backup_driver)
# Save report to file
with open(f'backup_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json', 'w') as f:
json.dump(backup_report, f, indent=2)
# Print summary
print(f"Backup Report Summary:")
print(f" Total targets: {backup_report['total_targets']}")
print(f" Healthy targets: {backup_report['overall_stats']['healthy_targets']}")
print(f" Recent backups: {backup_report['overall_stats']['targets_with_recent_backups']}")
print(f" Failed jobs (24h): {backup_report['overall_stats']['failed_jobs_last_24h']}")
# Start monitoring (run in separate thread/process)
# all_targets = backup_driver.list_targets()
# monitor_backup_jobs(backup_driver, all_targets, backup_alert_handler)

def create_disaster_recovery_plan(backup_driver, compute_driver, targets: List[BackupTarget]):
"""Create a disaster recovery plan with automated recovery procedures"""
dr_plan = {
'created_at': datetime.now().isoformat(),
'targets': [],
'recovery_procedures': []
}
for target in targets:
# Get recovery points
recovery_points = backup_driver.list_recovery_points(target)
if not recovery_points:
print(f"WARNING: No recovery points found for {target.name}")
continue
# Find best recovery point (most recent successful)
best_recovery_point = max(recovery_points, key=lambda rp: rp.extra.get('created_at', datetime.min))
target_dr_info = {
'target_id': target.id,
'target_name': target.name,
'target_type': target.type,
'best_recovery_point': best_recovery_point.id,
'recovery_point_date': best_recovery_point.extra.get('created_at'),
'estimated_recovery_time_minutes': estimate_recovery_time(target),
'recovery_priority': get_recovery_priority(target),
'dependencies': get_target_dependencies(target)
}
dr_plan['targets'].append(target_dr_info)
# Sort by priority (higher number = higher priority)
dr_plan['targets'].sort(key=lambda t: t['recovery_priority'], reverse=True)
# Generate recovery procedures
for i, target_info in enumerate(dr_plan['targets']):
procedure = {
'step': i + 1,
'target': target_info['target_name'],
'action': 'recover_target',
'parameters': {
'target_id': target_info['target_id'],
'recovery_point_id': target_info['best_recovery_point'],
'recovery_name': f"dr-{target_info['target_name']}-{datetime.now().strftime('%Y%m%d')}"
},
'estimated_duration_minutes': target_info['estimated_recovery_time_minutes'],
'dependencies': target_info['dependencies']
}
dr_plan['recovery_procedures'].append(procedure)
return dr_plan
def estimate_recovery_time(target: BackupTarget) -> int:
"""Estimate recovery time based on target size and type"""
size_gb = target.size / (1024 ** 3)
# Base time estimates (minutes)
base_times = {
'volume': 2, # 2 minutes per GB for volumes
'node': 5, # 5 minutes per GB for full nodes
'container': 1, # 1 minute per GB for containers
'database': 3 # 3 minutes per GB for databases
}
base_time = base_times.get(target.type, 3)
return max(int(size_gb * base_time), 15) # Minimum 15 minutes
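# Example: a 100 GB target of type 'volume' estimates to max(int(100 * 2), 15) = 200 minutes.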
def get_recovery_priority(target: BackupTarget) -> int:
"""Determine recovery priority (1-10, 10 being highest)"""
name_lower = target.name.lower()
if 'critical' in name_lower or 'prod' in name_lower:
return 10
elif 'important' in name_lower or 'web' in name_lower:
return 7
elif 'db' in name_lower or 'database' in name_lower:
return 9
elif 'test' in name_lower or 'dev' in name_lower:
return 3
else:
return 5
def get_target_dependencies(target: BackupTarget) -> List[str]:
"""Get list of dependencies for recovery ordering"""
# This would typically analyze the target's configuration
# For now, return basic dependencies based on naming
dependencies = []
name_lower = target.name.lower()
if 'web' in name_lower:
dependencies.extend(['database', 'cache'])
elif 'app' in name_lower:
dependencies.extend(['database'])
return dependencies
def execute_disaster_recovery(backup_driver, compute_driver, dr_plan: Dict):
"""Execute disaster recovery plan"""
print("Starting disaster recovery execution...")
print(f"Plan created: {dr_plan['created_at']}")
print(f"Total targets to recover: {len(dr_plan['targets'])}")
recovered_targets = {}
for procedure in dr_plan['recovery_procedures']:
step = procedure['step']
target_name = procedure['target']
print(f"\nStep {step}: Recovering {target_name}")
print(f"Estimated duration: {procedure['estimated_duration_minutes']} minutes")
# Check dependencies
dependencies = procedure['dependencies']
if dependencies:
print(f"Dependencies: {', '.join(dependencies)}")
for dep in dependencies:
if dep not in recovered_targets:
print(f"WARNING: Dependency {dep} not yet recovered")
try:
# Get target and recovery point
target = backup_driver.get_target(procedure['parameters']['target_id'])
recovery_points = backup_driver.list_recovery_points(target)
recovery_point = next(
rp for rp in recovery_points
if rp.id == procedure['parameters']['recovery_point_id']
)
# Execute recovery
print(f"Recovering from point: {recovery_point.name}")
recovered_node = backup_driver.recover_target_out_of_place(
target=target,
recovery_point=recovery_point,
recovery_target_name=procedure['parameters']['recovery_name']
)
recovered_targets[target_name] = recovered_node
print(f"✓ Recovery completed: {recovered_node.name} ({recovered_node.id})")
except Exception as e:
print(f"✗ Recovery failed for {target_name}: {e}")
# Log failure and continue with next target
print(f"\nDisaster recovery completed. Recovered {len(recovered_targets)} targets.")
return recovered_targets
# Usage
all_targets = backup_driver.list_targets()
critical_targets = [t for t in all_targets if 'prod' in t.name.lower() or 'critical' in t.name.lower()]
# Create DR plan
dr_plan = create_disaster_recovery_plan(backup_driver, compute_driver, critical_targets)
# Save DR plan
with open(f'disaster_recovery_plan_{datetime.now().strftime("%Y%m%d")}.json', 'w') as f:
json.dump(dr_plan, f, indent=2)
print("Disaster Recovery Plan created:")
for procedure in dr_plan['recovery_procedures']:
print(f" Step {procedure['step']}: {procedure['target']} ({procedure['estimated_duration_minutes']} min)")
# Execute DR plan (only in actual disaster scenario)
# recovered_nodes = execute_disaster_recovery(backup_driver, compute_driver, dr_plan)

from libcloud.backup.types import BackupError
from libcloud.common.types import LibcloudError, InvalidCredsError
try:
# Create backup target
target = backup_driver.create_target_from_node(node, name='test-backup')
# Create backup job
job = backup_driver.create_target_backup_job(target)
except InvalidCredsError:
print("Invalid credentials for backup provider")
except BackupError as e:
print(f"Backup specific error: {e}")
except LibcloudError as e:
print(f"General Libcloud error: {e}")
# Check job status before operations
if job.status == 'completed':
recovery_points = backup_driver.list_recovery_points(target)
elif job.status == 'failed':
    print(f"Backup job failed: {job.extra.get('error_message', 'Unknown error')}")

Different providers offer additional features through the ex_* parameter pattern:
# AWS EBS specific features
ebs_driver = get_driver(Provider.EBS)('access_key', 'secret_key', region='us-east-1')
# Create backup with EBS-specific options
ebs_backup = ebs_driver.create_target_backup_job(
target=target,
extra={
'description': 'Daily automated backup',
'encrypted': True, # Encrypt snapshot
'copy_tags': True, # Copy tags from source volume
'kms_key_id': 'arn:aws:kms:...', # Custom KMS key
}
)
# List available backup locations
backup_locations = ebs_driver.ex_list_available_backup_locations()
for location in backup_locations:
print(f"Backup location: {location['name']} ({location['region']})")
# Google Cloud specific features
gce_driver = get_driver(Provider.GCE)('email', 'key_file', project='my-project')
# Create backup with GCE-specific options
gce_backup = gce_driver.create_target_backup_job(
target=target,
extra={
'storage_location': 'us-central1', # Regional storage
'labels': {'environment': 'production', 'team': 'devops'}
}
)

Check provider-specific documentation for additional capabilities available through the ex_* parameters and methods.
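Since extension methods differ by driver, one way to see what a given driver instance exposes is plain Python introspection (not a libcloud-specific API):

# List provider-specific extension methods on an initialized driver
ex_methods = [name for name in dir(backup_driver) if name.startswith('ex_')]
print(f"{backup_driver.__class__.__name__} extensions: {ex_methods}")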
Install with Tessl CLI
npx tessl i tessl/pypi-apache-libcloud