CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-libcloud

A standard Python library that abstracts away differences among multiple cloud provider APIs

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

backup-services.mddocs/

Backup Services

The backup service provides a unified interface for backup and snapshot management across multiple cloud backup providers including AWS EBS Snapshots, Google Persistent Disk Snapshots, Azure Disk Snapshots, and other backup-as-a-service providers.

Providers

from libcloud.backup.types import Provider

class Provider:
    """Enumeration of supported backup providers"""
    EBS = 'ebs'                         # AWS EBS Snapshots
    GCE = 'gce'                         # Google Persistent Disk Snapshots  
    AZURE_ARM = 'azure_arm'             # Azure Resource Manager Snapshots
    DIMENSIONDATA = 'dimensiondata'      # Dimension Data Backup
    CLOUDSTACK = 'cloudstack'           # CloudStack Snapshots
    # ... more providers

Driver Factory

from libcloud.backup.providers import get_driver

def get_driver(provider: Provider) -> type[BackupDriver]

Get the driver class for a specific backup provider.

Parameters:

  • provider: Provider identifier from the Provider enum

Returns:

  • Driver class for the specified provider

Example:

from libcloud.backup.types import Provider
from libcloud.backup.providers import get_driver

# Get AWS EBS backup driver class
cls = get_driver(Provider.EBS)

# Initialize driver with credentials
driver = cls('access_key', 'secret_key', region='us-east-1')

Core Classes

BackupDriver

class BackupDriver(BaseDriver):
    """Base class for all backup drivers"""
    
    def list_targets(self) -> List[BackupTarget]
    def get_target(self, target_id: str) -> BackupTarget
    def create_target_from_node(self, node: Node, name: str = None, ex_use_tags: bool = True) -> BackupTarget
    def create_target_from_container(self, container: Container, name: str = None) -> BackupTarget
    def update_target(self, target: BackupTarget, name: str = None, extra: Dict = None) -> BackupTarget
    def delete_target(self, target: BackupTarget) -> bool
    def list_recovery_points(self, target: BackupTarget, start_date: datetime = None, end_date: datetime = None) -> List[BackupTarget]
    def recover_target(self, target: BackupTarget, recovery_point: BackupTarget, recovery_target_name: str = None) -> Node
    def recover_target_out_of_place(self, target: BackupTarget, recovery_point: BackupTarget, recovery_target_name: str = None, **kwargs) -> Node
    def create_target_backup_job(self, target: BackupTarget, extra: Dict = None) -> BackupTargetJob
    def list_target_jobs(self, target: BackupTarget) -> List[BackupTargetJob]
    def ex_list_available_backup_locations(self) -> List[Dict]

Base class that all backup drivers inherit from. Provides methods for managing backup targets, recovery points, and backup jobs.

Key Methods:

  • list_targets(): List all backup targets
  • create_target_from_node(): Create backup target from compute node
  • list_recovery_points(): List available recovery points for a target
  • recover_target(): Restore a target from a recovery point
  • create_target_backup_job(): Create a backup job
  • delete_target(): Delete a backup target

BackupTarget

class BackupTarget:
    """Represents a backup target"""
    
    id: str
    name: str
    address: str
    type: BackupTargetType
    size: int
    driver: BackupDriver
    extra: Dict[str, Any]
    
    def list_recovery_points(self, start_date: datetime = None, end_date: datetime = None) -> List[BackupTarget]
    def recover(self, recovery_point: BackupTarget, recovery_target_name: str = None) -> Node
    def backup(self, name: str = None) -> BackupTargetJob
    def delete(self) -> bool

Represents a backup target (source for backups like a disk, volume, or node).

Properties:

  • id: Unique backup target identifier
  • name: Human-readable name
  • address: Target address/identifier (volume ID, node ID, etc.)
  • type: Type of backup target (volume, node, etc.)
  • size: Size in bytes of the backup target
  • extra: Provider-specific metadata

Methods:

  • list_recovery_points(): List recovery points for this target
  • recover(): Restore this target from a recovery point
  • backup(): Create a backup of this target
  • delete(): Delete this backup target

BackupTargetJob

class BackupTargetJob:
    """Represents a backup job"""
    
    id: str
    target_id: str
    status: BackupTargetJobStatusType
    progress: float
    created_at: datetime
    driver: BackupDriver
    extra: Dict[str, Any]

Represents a backup job/operation.

Properties:

  • id: Unique job identifier
  • target_id: ID of the backup target
  • status: Current job status (running, completed, failed, etc.)
  • progress: Job progress as percentage (0.0 to 1.0)
  • created_at: Job creation timestamp
  • extra: Provider-specific job metadata

BackupTargetType

class BackupTargetType:
    """Backup target types enumeration"""
    VOLUME = 'volume'
    NODE = 'node'
    CONTAINER = 'container'
    FILE_SYSTEM = 'file_system'
    DATABASE = 'database'
    VIRTUAL_MACHINE = 'virtual_machine'

Enumeration of supported backup target types.

BackupTargetJobStatusType

class BackupTargetJobStatusType:
    """Backup job status types enumeration"""
    PENDING = 'pending'
    RUNNING = 'running'
    COMPLETED = 'completed'
    FAILED = 'failed'
    CANCELLED = 'cancelled'
    UNKNOWN = 'unknown'

Enumeration of possible backup job statuses.

Usage Examples

Basic Backup Target Management

from libcloud.backup.types import Provider, BackupTargetType
from libcloud.backup.providers import get_driver
from libcloud.compute.types import Provider as ComputeProvider
from libcloud.compute.providers import get_driver as get_compute_driver

# Initialize backup driver (AWS EBS example)
backup_cls = get_driver(Provider.EBS)
backup_driver = backup_cls('access_key', 'secret_key', region='us-east-1')

# Initialize compute driver to get nodes
compute_cls = get_compute_driver(ComputeProvider.EC2)
compute_driver = compute_cls('access_key', 'secret_key', region='us-east-1')

# List existing backup targets
targets = backup_driver.list_targets()
print(f"Existing backup targets: {len(targets)}")

for target in targets:
    print(f"Target: {target.name} (Type: {target.type}, Size: {target.size} bytes)")
    print(f"  Address: {target.address}")
    print(f"  Created: {target.extra.get('created_at', 'unknown')}")

# Create backup target from a compute node
nodes = compute_driver.list_nodes()
if nodes:
    node = nodes[0]  # Use first node
    backup_target = backup_driver.create_target_from_node(
        node=node,
        name=f'backup-{node.name}',
        ex_use_tags=True
    )
    print(f"Created backup target: {backup_target.name} ({backup_target.id})")

Backup Creation and Management

# Get a backup target
target = backup_driver.get_target('backup-target-123')
print(f"Backup target: {target.name}")

# Create a backup job
backup_job = backup_driver.create_target_backup_job(
    target=target,
    extra={'description': 'Daily backup', 'retention_days': 30}
)
print(f"Created backup job: {backup_job.id} (Status: {backup_job.status})")

# Alternative: Create backup using target method
backup_job2 = target.backup(name='manual-backup-2023-10-15')
print(f"Created backup via target: {backup_job2.id}")

# List all backup jobs for a target
jobs = backup_driver.list_target_jobs(target)
print(f"Backup jobs for {target.name}: {len(jobs)}")

for job in jobs:
    print(f"  Job {job.id}: {job.status} ({job.progress:.1%} complete)")
    print(f"    Created: {job.created_at}")
    if job.extra:
        print(f"    Extra: {job.extra}")

Recovery Point Management

from datetime import datetime, timedelta

# List recovery points for a target
recovery_points = backup_driver.list_recovery_points(target)
print(f"Available recovery points: {len(recovery_points)}")

for rp in recovery_points:
    print(f"Recovery Point: {rp.name} (Created: {rp.extra.get('created_at')})")
    print(f"  Size: {rp.size} bytes")

# List recovery points within a date range
end_date = datetime.now()
start_date = end_date - timedelta(days=7)  # Last 7 days

recent_recovery_points = backup_driver.list_recovery_points(
    target, 
    start_date=start_date, 
    end_date=end_date
)
print(f"Recovery points from last 7 days: {len(recent_recovery_points)}")

# Alternative: List using target method
target_recovery_points = target.list_recovery_points(start_date=start_date)
print(f"Target recovery points: {len(target_recovery_points)}")

Backup Recovery and Restoration

# Recover target in place (restore to original location)
if recovery_points:
    latest_recovery_point = recovery_points[0]  # Assuming sorted by date
    
    print(f"Restoring {target.name} from recovery point {latest_recovery_point.name}")
    restored_node = backup_driver.recover_target(
        target=target,
        recovery_point=latest_recovery_point,
        recovery_target_name=f'restored-{target.name}'
    )
    print(f"Restored node: {restored_node.name} ({restored_node.id})")

# Recover target out of place (restore to new location/instance)
if recovery_points:
    recovery_point = recovery_points[0]
    
    restored_node = backup_driver.recover_target_out_of_place(
        target=target,
        recovery_point=recovery_point,
        recovery_target_name='disaster-recovery-instance',
        ex_instance_type='t3.medium',  # Different instance type
        ex_subnet_id='subnet-new-123',  # Different subnet
        ex_security_groups=['sg-disaster-recovery']
    )
    print(f"Out-of-place recovery completed: {restored_node.name}")

# Alternative: Recover using target method
if recovery_points:
    restored_via_target = target.recover(
        recovery_point=recovery_points[0],
        recovery_target_name='target-method-recovery'
    )
    print(f"Recovered via target method: {restored_via_target.name}")

Automated Backup Scheduling

import time
from datetime import datetime, timedelta
from typing import List, Dict

def create_backup_schedule(backup_driver, targets: List[BackupTarget], schedule_config: Dict):
    """Create automated backup schedule"""
    
    def should_backup(target: BackupTarget, config: Dict) -> bool:
        """Check if target should be backed up based on schedule"""
        
        # Get last backup time
        jobs = backup_driver.list_target_jobs(target)
        completed_jobs = [j for j in jobs if j.status == 'completed']
        
        if not completed_jobs:
            return True  # No backups yet
        
        # Sort by creation time and get latest
        latest_job = max(completed_jobs, key=lambda j: j.created_at)
        last_backup = latest_job.created_at
        
        # Check if enough time has passed
        interval_hours = config.get('interval_hours', 24)
        time_since_backup = datetime.now() - last_backup
        
        return time_since_backup >= timedelta(hours=interval_hours)
    
    # Main scheduling loop
    print(f"Starting backup scheduler for {len(targets)} targets")
    
    while True:
        try:
            for target in targets:
                target_config = schedule_config.get(
                    target.name, 
                    schedule_config.get('default', {})
                )
                
                if should_backup(target, target_config):
                    print(f"Creating scheduled backup for {target.name}")
                    
                    backup_job = backup_driver.create_target_backup_job(
                        target=target,
                        extra={
                            'scheduled': True,
                            'retention_days': target_config.get('retention_days', 7)
                        }
                    )
                    print(f"  Created job: {backup_job.id}")
                
                # Clean up old backups if configured
                if target_config.get('cleanup_old_backups', False):
                    cleanup_old_backups(backup_driver, target, target_config)
        
        except Exception as e:
            print(f"Error in backup scheduler: {e}")
        
        # Wait before next check
        sleep_minutes = schedule_config.get('check_interval_minutes', 60)
        time.sleep(sleep_minutes * 60)

def cleanup_old_backups(backup_driver, target: BackupTarget, config: Dict):
    """Clean up old backup recovery points"""
    
    retention_days = config.get('retention_days', 7)
    cutoff_date = datetime.now() - timedelta(days=retention_days)
    
    recovery_points = backup_driver.list_recovery_points(target)
    
    for rp in recovery_points:
        created_date = rp.extra.get('created_at')
        if created_date and isinstance(created_date, datetime) and created_date < cutoff_date:
            try:
                success = backup_driver.delete_target(rp)
                if success:
                    print(f"  Cleaned up old backup: {rp.name}")
            except Exception as e:
                print(f"  Failed to clean up {rp.name}: {e}")

# Usage example
schedule_config = {
    'default': {
        'interval_hours': 24,       # Daily backups
        'retention_days': 7,        # Keep for 7 days
        'cleanup_old_backups': True
    },
    'critical-db-backup': {
        'interval_hours': 4,        # Every 4 hours for critical systems
        'retention_days': 30,       # Keep for 30 days
        'cleanup_old_backups': True
    },
    'check_interval_minutes': 60    # Check every hour
}

# Get targets to backup
all_targets = backup_driver.list_targets()
important_targets = [t for t in all_targets if 'prod' in t.name.lower()]

# Start scheduler (run in separate thread/process)
# create_backup_schedule(backup_driver, important_targets, schedule_config)

Cross-Provider Backup Strategy

from libcloud.backup.types import Provider as BackupProvider
from libcloud.backup.providers import get_driver as get_backup_driver

# Configure multiple backup providers for redundancy
backup_providers = {
    'aws_ebs': {
        'driver': get_backup_driver(BackupProvider.EBS),
        'credentials': ('aws_access_key', 'aws_secret_key'),
        'region': 'us-east-1'
    },
    'gce_snapshots': {
        'driver': get_backup_driver(BackupProvider.GCE),
        'credentials': ('service_account_email', 'key_file_path'),
        'project': 'my-project'
    }
}

# Initialize backup drivers
backup_drivers = {}
for name, config in backup_providers.items():
    cls = config['driver']
    if name == 'aws_ebs':
        backup_drivers[name] = cls(*config['credentials'], region=config['region'])
    elif name == 'gce_snapshots':
        backup_drivers[name] = cls(*config['credentials'], project=config['project'])

def create_cross_provider_backup(compute_node, backup_name: str):
    """Create backups across multiple providers for redundancy"""
    
    backup_results = {}
    
    for provider_name, backup_driver in backup_drivers.items():
        try:
            print(f"Creating backup on {provider_name}...")
            
            # Create backup target
            target = backup_driver.create_target_from_node(
                node=compute_node,
                name=f'{backup_name}-{provider_name}'
            )
            
            # Create backup job
            job = backup_driver.create_target_backup_job(
                target=target,
                extra={'cross_provider_backup': True}
            )
            
            backup_results[provider_name] = {
                'target': target,
                'job': job,
                'status': 'initiated'
            }
            
            print(f"  Backup initiated: {job.id}")
            
        except Exception as e:
            print(f"  Failed to create backup on {provider_name}: {e}")
            backup_results[provider_name] = {
                'status': 'failed',
                'error': str(e)
            }
    
    return backup_results

# Usage
node = compute_driver.list_nodes()[0]  # Get a compute node
cross_provider_backups = create_cross_provider_backup(node, 'disaster-recovery-backup')

Backup Monitoring and Reporting

import json
from datetime import datetime, timedelta
from typing import Dict, List

def generate_backup_report(backup_driver, targets: List[BackupTarget] = None) -> Dict:
    """Generate comprehensive backup report"""
    
    if targets is None:
        targets = backup_driver.list_targets()
    
    report = {
        'generated_at': datetime.now().isoformat(),
        'total_targets': len(targets),
        'target_summary': [],
        'overall_stats': {
            'healthy_targets': 0,
            'targets_with_recent_backups': 0,
            'failed_jobs_last_24h': 0,
            'total_backup_size': 0
        }
    }
    
    cutoff_24h = datetime.now() - timedelta(hours=24)
    cutoff_7d = datetime.now() - timedelta(days=7)
    
    for target in targets:
        try:
            # Get jobs for this target
            jobs = backup_driver.list_target_jobs(target)
            
            # Get recovery points
            recovery_points = backup_driver.list_recovery_points(target)
            
            # Analyze job status
            recent_jobs = [j for j in jobs if j.created_at >= cutoff_24h]
            failed_jobs_24h = [j for j in recent_jobs if j.status == 'failed']
            successful_jobs = [j for j in jobs if j.status == 'completed']
            
            # Find latest successful backup
            latest_successful = None
            if successful_jobs:
                latest_successful = max(successful_jobs, key=lambda j: j.created_at)
            
            # Determine target health
            is_healthy = (
                len(failed_jobs_24h) == 0 and
                latest_successful is not None and
                latest_successful.created_at >= cutoff_7d
            )
            
            target_info = {
                'name': target.name,
                'id': target.id,
                'type': target.type,
                'size_bytes': target.size,
                'is_healthy': is_healthy,
                'total_jobs': len(jobs),
                'failed_jobs_24h': len(failed_jobs_24h),
                'recovery_points_count': len(recovery_points),
                'latest_backup': latest_successful.created_at.isoformat() if latest_successful else None,
                'days_since_backup': (datetime.now() - latest_successful.created_at).days if latest_successful else None
            }
            
            report['target_summary'].append(target_info)
            
            # Update overall stats
            if is_healthy:
                report['overall_stats']['healthy_targets'] += 1
            
            if latest_successful and latest_successful.created_at >= cutoff_7d:
                report['overall_stats']['targets_with_recent_backups'] += 1
            
            report['overall_stats']['failed_jobs_last_24h'] += len(failed_jobs_24h)
            report['overall_stats']['total_backup_size'] += target.size
            
        except Exception as e:
            print(f"Error analyzing target {target.name}: {e}")
            target_info = {
                'name': target.name,
                'id': target.id,
                'error': str(e),
                'is_healthy': False
            }
            report['target_summary'].append(target_info)
    
    return report

def monitor_backup_jobs(backup_driver, targets: List[BackupTarget], alert_callback=None):
    """Monitor backup job progress and alert on failures"""
    
    active_jobs = {}  # Track jobs we're monitoring
    
    while True:
        try:
            for target in targets:
                jobs = backup_driver.list_target_jobs(target)
                
                for job in jobs:
                    if job.status in ['pending', 'running']:
                        # Track or update active job
                        if job.id not in active_jobs:
                            active_jobs[job.id] = {
                                'job': job,
                                'target': target,
                                'started_monitoring': datetime.now()
                            }
                            print(f"Started monitoring job {job.id} for {target.name}")
                        else:
                            # Update progress
                            old_progress = active_jobs[job.id]['job'].progress
                            if job.progress > old_progress:
                                print(f"Job {job.id} progress: {job.progress:.1%}")
                            active_jobs[job.id]['job'] = job
                    
                    elif job.status in ['completed', 'failed', 'cancelled']:
                        # Job finished
                        if job.id in active_jobs:
                            duration = datetime.now() - active_jobs[job.id]['started_monitoring']
                            print(f"Job {job.id} finished: {job.status} (Duration: {duration})")
                            
                            if job.status == 'failed' and alert_callback:
                                alert_callback(f"Backup job failed: {job.id} for target {target.name}")
                            
                            del active_jobs[job.id]
            
            # Check for stuck jobs
            stuck_threshold = timedelta(hours=4)
            current_time = datetime.now()
            
            for job_id, job_info in list(active_jobs.items()):
                monitoring_duration = current_time - job_info['started_monitoring']
                if monitoring_duration > stuck_threshold:
                    print(f"WARNING: Job {job_id} appears stuck (running for {monitoring_duration})")
                    if alert_callback:
                        alert_callback(f"Backup job appears stuck: {job_id}")
        
        except Exception as e:
            print(f"Error monitoring backup jobs: {e}")
        
        time.sleep(60)  # Check every minute

# Usage examples
def backup_alert_handler(message: str):
    """Handle backup alerts (email, Slack, etc.)"""
    print(f"ALERT: {message}")
    # Implement your alerting mechanism here

# Generate backup report
backup_report = generate_backup_report(backup_driver)

# Save report to file
with open(f'backup_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json', 'w') as f:
    json.dump(backup_report, f, indent=2)

# Print summary
print(f"Backup Report Summary:")
print(f"  Total targets: {backup_report['total_targets']}")
print(f"  Healthy targets: {backup_report['overall_stats']['healthy_targets']}")
print(f"  Recent backups: {backup_report['overall_stats']['targets_with_recent_backups']}")
print(f"  Failed jobs (24h): {backup_report['overall_stats']['failed_jobs_last_24h']}")

# Start monitoring (run in separate thread/process)
# all_targets = backup_driver.list_targets()
# monitor_backup_jobs(backup_driver, all_targets, backup_alert_handler)

Disaster Recovery Planning

def create_disaster_recovery_plan(backup_driver, compute_driver, targets: List[BackupTarget]):
    """Create a disaster recovery plan with automated recovery procedures"""
    
    dr_plan = {
        'created_at': datetime.now().isoformat(),
        'targets': [],
        'recovery_procedures': []
    }
    
    for target in targets:
        # Get recovery points
        recovery_points = backup_driver.list_recovery_points(target)
        
        if not recovery_points:
            print(f"WARNING: No recovery points found for {target.name}")
            continue
        
        # Find best recovery point (most recent successful)
        best_recovery_point = max(recovery_points, key=lambda rp: rp.extra.get('created_at', datetime.min))
        
        target_dr_info = {
            'target_id': target.id,
            'target_name': target.name,
            'target_type': target.type,
            'best_recovery_point': best_recovery_point.id,
            'recovery_point_date': best_recovery_point.extra.get('created_at'),
            'estimated_recovery_time_minutes': estimate_recovery_time(target),
            'recovery_priority': get_recovery_priority(target),
            'dependencies': get_target_dependencies(target)
        }
        
        dr_plan['targets'].append(target_dr_info)
    
    # Sort by priority (higher number = higher priority)
    dr_plan['targets'].sort(key=lambda t: t['recovery_priority'], reverse=True)
    
    # Generate recovery procedures
    for i, target_info in enumerate(dr_plan['targets']):
        procedure = {
            'step': i + 1,
            'target': target_info['target_name'],
            'action': 'recover_target',
            'parameters': {
                'target_id': target_info['target_id'],
                'recovery_point_id': target_info['best_recovery_point'],
                'recovery_name': f"dr-{target_info['target_name']}-{datetime.now().strftime('%Y%m%d')}"
            },
            'estimated_duration_minutes': target_info['estimated_recovery_time_minutes'],
            'dependencies': target_info['dependencies']
        }
        dr_plan['recovery_procedures'].append(procedure)
    
    return dr_plan

def estimate_recovery_time(target: BackupTarget) -> int:
    """Estimate recovery time based on target size and type"""
    size_gb = target.size / (1024 ** 3)
    
    # Base time estimates (minutes)
    base_times = {
        'volume': 2,      # 2 minutes per GB for volumes
        'node': 5,        # 5 minutes per GB for full nodes
        'container': 1,   # 1 minute per GB for containers
        'database': 3     # 3 minutes per GB for databases
    }
    
    base_time = base_times.get(target.type, 3)
    return max(int(size_gb * base_time), 15)  # Minimum 15 minutes

def get_recovery_priority(target: BackupTarget) -> int:
    """Determine recovery priority (1-10, 10 being highest)"""
    name_lower = target.name.lower()
    
    if 'critical' in name_lower or 'prod' in name_lower:
        return 10
    elif 'important' in name_lower or 'web' in name_lower:
        return 7
    elif 'db' in name_lower or 'database' in name_lower:
        return 9
    elif 'test' in name_lower or 'dev' in name_lower:
        return 3
    else:
        return 5

def get_target_dependencies(target: BackupTarget) -> List[str]:
    """Get list of dependencies for recovery ordering"""
    # This would typically analyze the target's configuration
    # For now, return basic dependencies based on naming
    dependencies = []
    name_lower = target.name.lower()
    
    if 'web' in name_lower:
        dependencies.extend(['database', 'cache'])
    elif 'app' in name_lower:
        dependencies.extend(['database'])
    
    return dependencies

def execute_disaster_recovery(backup_driver, compute_driver, dr_plan: Dict):
    """Execute disaster recovery plan"""
    
    print("Starting disaster recovery execution...")
    print(f"Plan created: {dr_plan['created_at']}")
    print(f"Total targets to recover: {len(dr_plan['targets'])}")
    
    recovered_targets = {}
    
    for procedure in dr_plan['recovery_procedures']:
        step = procedure['step']
        target_name = procedure['target']
        
        print(f"\nStep {step}: Recovering {target_name}")
        print(f"Estimated duration: {procedure['estimated_duration_minutes']} minutes")
        
        # Check dependencies
        dependencies = procedure['dependencies']
        if dependencies:
            print(f"Dependencies: {', '.join(dependencies)}")
            for dep in dependencies:
                if dep not in recovered_targets:
                    print(f"WARNING: Dependency {dep} not yet recovered")
        
        try:
            # Get target and recovery point
            target = backup_driver.get_target(procedure['parameters']['target_id'])
            recovery_points = backup_driver.list_recovery_points(target)
            recovery_point = next(
                rp for rp in recovery_points 
                if rp.id == procedure['parameters']['recovery_point_id']
            )
            
            # Execute recovery
            print(f"Recovering from point: {recovery_point.name}")
            recovered_node = backup_driver.recover_target_out_of_place(
                target=target,
                recovery_point=recovery_point,
                recovery_target_name=procedure['parameters']['recovery_name']
            )
            
            recovered_targets[target_name] = recovered_node
            print(f"✓ Recovery completed: {recovered_node.name} ({recovered_node.id})")
            
        except Exception as e:
            print(f"✗ Recovery failed for {target_name}: {e}")
            # Log failure and continue with next target
    
    print(f"\nDisaster recovery completed. Recovered {len(recovered_targets)} targets.")
    return recovered_targets

# Usage
all_targets = backup_driver.list_targets()
critical_targets = [t for t in all_targets if 'prod' in t.name.lower() or 'critical' in t.name.lower()]

# Create DR plan
dr_plan = create_disaster_recovery_plan(backup_driver, compute_driver, critical_targets)

# Save DR plan
with open(f'disaster_recovery_plan_{datetime.now().strftime("%Y%m%d")}.json', 'w') as f:
    json.dump(dr_plan, f, indent=2)

print("Disaster Recovery Plan created:")
for procedure in dr_plan['recovery_procedures']:
    print(f"  Step {procedure['step']}: {procedure['target']} ({procedure['estimated_duration_minutes']} min)")

# Execute DR plan (only in actual disaster scenario)
# recovered_nodes = execute_disaster_recovery(backup_driver, compute_driver, dr_plan)

Exception Handling

from libcloud.backup.types import BackupError
from libcloud.common.types import LibcloudError, InvalidCredsError

try:
    # Create backup target
    target = backup_driver.create_target_from_node(node, name='test-backup')
    
    # Create backup job
    job = backup_driver.create_target_backup_job(target)
    
except InvalidCredsError:
    print("Invalid credentials for backup provider")
except BackupError as e:
    print(f"Backup specific error: {e}")
except LibcloudError as e:
    print(f"General Libcloud error: {e}")

# Check job status before operations
if job.status == 'completed':
    recovery_points = backup_driver.list_recovery_points(target)
elif job.status == 'failed':
    print(f"Backup job failed: {job.extra.get('error_message', 'Unknown error')}")

Provider-Specific Features

Different providers offer additional features through the ex_* parameter pattern:

# AWS EBS specific features
ebs_driver = get_driver(Provider.EBS)('access_key', 'secret_key', region='us-east-1')

# Create backup with EBS-specific options
ebs_backup = ebs_driver.create_target_backup_job(
    target=target,
    extra={
        'description': 'Daily automated backup',
        'encrypted': True,              # Encrypt snapshot
        'copy_tags': True,             # Copy tags from source volume
        'kms_key_id': 'arn:aws:kms:...', # Custom KMS key
    }
)

# List available backup locations
backup_locations = ebs_driver.ex_list_available_backup_locations()
for location in backup_locations:
    print(f"Backup location: {location['name']} ({location['region']})")

# Google Cloud specific features
gce_driver = get_driver(Provider.GCE)('email', 'key_file', project='my-project')

# Create backup with GCE-specific options
gce_backup = gce_driver.create_target_backup_job(
    target=target,
    extra={
        'storage_location': 'us-central1',  # Regional storage
        'labels': {'environment': 'production', 'team': 'devops'}
    }
)

Check provider-specific documentation for additional capabilities available through the ex_* parameters and methods.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-libcloud

docs

backup-services.md

compute-services.md

container-services.md

core-driver-system.md

dns-management.md

index.md

load-balancer-services.md

storage-services.md

tile.json