Microsoft Azure Recovery Services Backup Management Client Library for Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive backup and restore job lifecycle management including job monitoring, cancellation, result retrieval, and detailed status tracking across all supported workload types. Provides complete operational visibility and control over backup operations with extensive filtering and export capabilities.
Core operations for managing backup and restore jobs with comprehensive tracking and control.
class BackupJobsOperations:
def list(self, resource_group_name: str, vault_name: str, **kwargs) -> Iterable[JobResource]:
"""
List jobs in a Recovery Services vault.
Parameters:
- resource_group_name: Resource group containing the vault
- vault_name: Recovery Services vault name
- kwargs: Filter options (status, operation, start_time, end_time, backup_management_type, etc.)
Returns:
Iterable of JobResource objects
"""
class JobDetailsOperations:
def get(self, resource_group_name: str, vault_name: str, job_name: str, **kwargs) -> JobResource:
"""
Get details of a specific job.
Parameters:
- resource_group_name: Resource group containing the vault
- vault_name: Recovery Services vault name
- job_name: Name/ID of the job
- kwargs: Additional options
Returns:
JobResource with detailed job information
"""
class JobCancellationsOperations:
def trigger(self, resource_group_name: str, vault_name: str, job_name: str, **kwargs) -> None:
"""
Cancel a running job.
Parameters:
- resource_group_name: Resource group containing the vault
- vault_name: Recovery Services vault name
- job_name: Name/ID of the job to cancel
- kwargs: Additional options
"""
class JobsOperations:
def export(self, resource_group_name: str, vault_name: str, **kwargs) -> None:
"""
Export jobs to a storage account for analysis.
Parameters:
- resource_group_name: Resource group containing the vault
- vault_name: Recovery Services vault name
- kwargs: Export parameters (storage account, filters, date range, etc.)
"""

Usage example:
from datetime import datetime, timedelta
# List recent backup jobs
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=7)
jobs = client.backup_jobs.list(
"my-rg", "my-vault",
filter=f"startTime eq '{start_time.isoformat()}' and endTime eq '{end_time.isoformat()}' and operation eq 'Backup'"
)
for job in jobs:
print(f"Job: {job.name}")
print(f"Operation: {job.properties.operation}")
print(f"Status: {job.properties.status}")
print(f"Entity: {job.properties.entity_friendly_name}")
print(f"Duration: {job.properties.duration}")
print("---")
# Get specific job details
job_details = client.job_details.get("my-rg", "my-vault", "job-id-12345")
print(f"Job Status: {job_details.properties.status}")
print(f"Start Time: {job_details.properties.start_time}")
print(f"End Time: {job_details.properties.end_time}")
if job_details.properties.error_details:
for error in job_details.properties.error_details:
print(f"Error: {error.error_code} - {error.error_string}")

Extended job operations with detailed information and operation result tracking.
class JobDetailsOperations:
def get(self, resource_group_name: str, vault_name: str, job_name: str, **kwargs) -> JobResource:
"""
Get comprehensive job details including progress and sub-tasks.
Parameters:
- resource_group_name: Resource group containing the vault
- vault_name: Recovery Services vault name
- job_name: Name/ID of the job
- kwargs: Additional options
Returns:
JobResource with comprehensive job details
"""
class JobOperationResultsOperations:
def get(
self,
resource_group_name: str,
vault_name: str,
job_name: str,
operation_id: str,
**kwargs
) -> None:
"""
Get result of a job operation.
Parameters:
- resource_group_name: Resource group containing the vault
- vault_name: Recovery Services vault name
- job_name: Name/ID of the job
- operation_id: ID of the operation
- kwargs: Additional options
"""
class JobCancellationsOperations:
def trigger(self, resource_group_name: str, vault_name: str, job_name: str, **kwargs) -> None:
"""
Trigger job cancellation with tracking.
Parameters:
- resource_group_name: Resource group containing the vault
- vault_name: Recovery Services vault name
- job_name: Name/ID of the job to cancel
- kwargs: Additional options
"""
class ExportJobsOperationResultsOperations:
def get(
self,
resource_group_name: str,
vault_name: str,
operation_id: str,
**kwargs
) -> OperationResultInfoBaseResource:
"""
Get result of job export operation.
Parameters:
- resource_group_name: Resource group containing the vault
- vault_name: Recovery Services vault name
- operation_id: ID of the export operation
- kwargs: Additional options
Returns:
OperationResultInfoBaseResource with export results
"""

Usage example:
# Get detailed job information with progress
job_details = client.job_details.get("my-rg", "my-vault", "job-id-12345")
if hasattr(job_details.properties, 'extended_info'):
extended_info = job_details.properties.extended_info
if hasattr(extended_info, 'progress_percentage'):
print(f"Progress: {extended_info.progress_percentage}%")
if hasattr(extended_info, 'task_list'):
for task in extended_info.task_list:
print(f"Task: {task.task_id} - {task.status}")
# Cancel a running job
client.job_cancellations.trigger("my-rg", "my-vault", "job-id-12345")
print("Job cancellation triggered")

Specialized operations for backup-specific jobs with enhanced filtering and management.
class BackupJobsOperations:
def list(self, resource_group_name: str, vault_name: str, **kwargs) -> Iterable[JobResource]:
"""
List backup jobs with backup-specific filtering.
Parameters:
- resource_group_name: Resource group containing the vault
- vault_name: Recovery Services vault name
- kwargs: Backup-specific filter options
Returns:
Iterable of JobResource objects for backup operations
"""
def get(self, resource_group_name: str, vault_name: str, job_name: str, **kwargs) -> JobResource:
"""Get backup job with backup-specific details."""
def cancel(self, resource_group_name: str, vault_name: str, job_name: str, **kwargs) -> None:
"""Cancel backup job with backup-specific validation."""

The job list operations support comprehensive filtering using OData query syntax.
# Filter parameters for job queries
class JobFilters:
# Status filters
status: str # "InProgress", "Completed", "Failed", "CompletedWithWarnings", "Cancelled", "Cancelling"
# Operation filters
operation: str # "Backup", "Restore", "ConfigureBackup", "DisableBackup", "DeleteBackupData"
# Time range filters
start_time: datetime # Start time for job query
end_time: datetime # End time for job query
# Workload filters
backup_management_type: str # "AzureIaasVM", "AzureStorage", "AzureWorkload", "DPM", "MABS"
workload_type: str # "VM", "FileShare", "SQLDataBase", "SAPHanaDatabase"
# Entity filters
backup_item_name: str # Name of the backed up item
backup_item_type: str # Type of the backed up item
# Additional filters
policy_name: str # Name of the backup policy
vault_name: str # Name of the vault (for cross-vault queries)

Usage examples:
from datetime import datetime, timedelta
# Get failed jobs from the last 24 hours.
# The cutoff is computed at run time: a hard-coded date stops meaning
# "last 24 hours" the day after it is written.
since = datetime.utcnow() - timedelta(hours=24)
failed_jobs = client.backup_jobs.list(
    "my-rg", "my-vault",
    filter=f"status eq 'Failed' and startTime ge datetime'{since:%Y-%m-%dT%H:%M:%SZ}'",
)
# Get all VM backup jobs
vm_jobs = client.backup_jobs.list(
"my-rg", "my-vault",
filter="operation eq 'Backup' and backupManagementType eq 'AzureIaasVM'"
)
# Get jobs for specific protected item
item_jobs = client.backup_jobs.list(
"my-rg", "my-vault",
filter="backupItemName eq 'vm;iaasvmcontainerv2;vm-rg;my-vm'"
)
# Get restore jobs in progress
restore_jobs = client.backup_jobs.list(
"my-rg", "my-vault",
filter="operation eq 'Restore' and status eq 'InProgress'"
)

Different job types provide specific extended information for detailed monitoring.
class AzureIaaSVMJobExtendedInfo:
    """Extended information reported for Azure IaaS VM jobs.

    The class name follows the ``AzureIaaSVM...`` spelling used by the
    sibling ``AzureIaaSVMJobTaskDetails`` model.

    NOTE(review): the published SDK model may expose the task collection as
    ``tasks_list`` rather than ``task_list`` - confirm against the installed
    package before relying on the attribute name.
    """

    def __init__(
        self,
        # Quoted forward reference: AzureIaaSVMJobTaskDetails is declared
        # after this class, so an unquoted name would fail at definition time.
        task_list: Optional[List["AzureIaaSVMJobTaskDetails"]] = None,
        property_bag: Optional[Dict[str, str]] = None,
        internal_property_bag: Optional[Dict[str, str]] = None,
        progress_percentage: Optional[float] = None,
        estimated_remaining_duration: Optional[str] = None,
        dynamic_error_message: Optional[str] = None,
        **kwargs
    ):
        """
        Extended information for Azure IaaS VM jobs.

        Parameters:
        - task_list: List of job tasks and their status
        - property_bag: Additional job properties
        - internal_property_bag: Internal properties
        - progress_percentage: Job completion percentage
        - estimated_remaining_duration: Estimated time to completion
        - dynamic_error_message: Dynamic error information
        """

    # Attributes exposed on the model (all optional).
    task_list: Optional[List["AzureIaaSVMJobTaskDetails"]]
    property_bag: Optional[Dict[str, str]]
    internal_property_bag: Optional[Dict[str, str]]
    progress_percentage: Optional[float]
    estimated_remaining_duration: Optional[str]
    dynamic_error_message: Optional[str]
class AzureIaaSVMJobTaskDetails:
def __init__(
self,
task_id: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
instance_id: Optional[str] = None,
duration: Optional[str] = None,
status: Optional[str] = None,
progress_percentage: Optional[float] = None,
task_execution_details: Optional[str] = None,
**kwargs
):
"""
Task details for Azure IaaS VM jobs.
Parameters:
- task_id: Unique task identifier
- start_time: Task start time
- end_time: Task end time (if completed)
- instance_id: Instance identifier
- duration: Task execution duration
- status: Task status
- progress_percentage: Task completion percentage
- task_execution_details: Detailed task execution information
"""
task_id: Optional[str]
start_time: Optional[datetime]
end_time: Optional[datetime]
instance_id: Optional[str]
duration: Optional[str]
status: Optional[str]
progress_percentage: Optional[float]
task_execution_details: Optional[str]

Comprehensive error information for failed jobs across different workload types.
class AzureIaaSVMErrorInfo:
def __init__(
self,
error_code: Optional[int] = None,
error_string: Optional[str] = None,
error_title: Optional[str] = None,
recommendations: Optional[List[str]] = None,
**kwargs
):
"""
Error information for Azure IaaS VM jobs.
Parameters:
- error_code: Numeric error code
- error_string: Error description
- error_title: Error title/summary
- recommendations: List of recommended actions
"""
error_code: Optional[int]
error_string: Optional[str]
error_title: Optional[str]
recommendations: Optional[List[str]]
class AzureStorageErrorInfo:
def __init__(
self,
error_code: Optional[int] = None,
error_string: Optional[str] = None,
**kwargs
):
"""Error information for Azure Storage jobs."""
error_code: Optional[int]
error_string: Optional[str]
class AzureWorkloadErrorInfo:
def __init__(
self,
error_code: Optional[int] = None,
error_string: Optional[str] = None,
additional_details: Optional[str] = None,
**kwargs
):
"""Error information for Azure Workload jobs."""
error_code: Optional[int]
error_string: Optional[str]
additional_details: Optional[str]

import time
def monitor_job_progress(resource_group_name, vault_name, job_name, poll_interval_seconds=30):
    """Poll a job until it reaches a terminal status, printing progress.

    Reads the job through the module-level ``client`` on every iteration.

    Parameters:
    - resource_group_name: Resource group containing the vault
    - vault_name: Recovery Services vault name
    - job_name: Name/ID of the job to watch
    - poll_interval_seconds: Seconds to sleep between polls (default 30)

    Returns:
    The job object from the final poll, i.e. the one whose status is one of
    "Completed", "Failed", "CompletedWithWarnings" or "Cancelled".
    """
    terminal_statuses = ("Completed", "Failed", "CompletedWithWarnings", "Cancelled")

    while True:
        job = client.job_details.get(resource_group_name, vault_name, job_name)
        status = job.properties.status
        print(f"Job Status: {status}")

        # Extended info is optional and its shape varies, so probe defensively.
        ext_info = getattr(job.properties, "extended_info", None)
        if ext_info:
            # Compare against None: a plain truthiness test would hide a
            # legitimate 0% reading at the start of a job.
            progress = getattr(ext_info, "progress_percentage", None)
            if progress is not None:
                print(f"Progress: {progress}%")

            remaining = getattr(ext_info, "estimated_remaining_duration", None)
            if remaining:
                print(f"Estimated remaining: {remaining}")

            # Show task details
            for task in getattr(ext_info, "task_list", None) or []:
                print(f" Task {task.task_id}: {task.status}")
                if task.progress_percentage is not None:
                    print(f" Progress: {task.progress_percentage}%")

        if status in terminal_statuses:
            print(f"Job finished with status: {status}")

            # Show error details if failed
            if status == "Failed" and job.properties.error_details:
                for error in job.properties.error_details:
                    print(f"Error: {error.error_code} - {error.error_string}")
                    recommendations = getattr(error, "recommendations", None)
                    if recommendations:
                        print("Recommendations:")
                        for rec in recommendations:
                            print(f" - {rec}")
            return job

        # Not finished yet: wait before asking the service again.
        time.sleep(poll_interval_seconds)
# Monitor a specific job
monitor_job_progress("my-rg", "my-vault", "backup-job-id-12345")

from azure.mgmt.recoveryservicesbackup.activestamp.models import ExportJobsRequest
# Export job history for the last 30 days
export_request = ExportJobsRequest(
start_time=datetime.utcnow() - timedelta(days=30),
end_time=datetime.utcnow(),
job_operation_type="All", # "Backup", "Restore", "All"
job_status_type="All", # "InProgress", "Completed", "Failed", "All"
job_backup_management_type="All" # "AzureIaasVM", "AzureStorage", "All"
)
# Trigger export operation
client.jobs.export(
resource_group_name="my-rg",
vault_name="my-vault",
parameters=export_request
)
print("Job export initiated - check export operation results")

# Get all failed jobs from the last week
from collections import Counter

# Lower bound of the query window: seven days before now (UTC).
week_ago = datetime.utcnow() - timedelta(days=7)
failed_jobs = client.backup_jobs.list(
    "my-rg", "my-vault",
    filter=f"status eq 'Failed' and startTime ge datetime'{week_ago.isoformat()}Z'"
)

print("=== Failed Jobs Analysis ===")
# Tally of how many times each error code appears across all failed jobs.
error_summary = Counter()

for job in failed_jobs:
    print(f"\nJob: {job.name}")
    print(f"Operation: {job.properties.operation}")
    print(f"Entity: {job.properties.entity_friendly_name}")
    print(f"Start Time: {job.properties.start_time}")

    # error_details may be empty or None; `or []` lets the loop skip both.
    for error in job.properties.error_details or []:
        error_code = error.error_code
        error_summary[error_code] += 1
        print(f"Error: {error_code} - {error.error_string}")

        # Show recommendations if available
        recommendations = getattr(error, "recommendations", None)
        if recommendations:
            print("Recommendations:")
            for rec in recommendations:
                print(f" - {rec}")

print("\n=== Error Summary ===")
for error_code, count in error_summary.items():
    print(f"Error {error_code}: {count} occurrences")

# Get all running jobs
from datetime import datetime, timedelta, timezone

running_jobs = client.backup_jobs.list(
    "my-rg", "my-vault",
    filter="status eq 'InProgress'"
)

# Restore jobs that started before this instant have run for over 4 hours.
# The cutoff is timezone-aware: Python refuses to order an aware datetime
# against the naive value returned by datetime.utcnow() (TypeError).
restore_cutoff = datetime.now(timezone.utc) - timedelta(hours=4)

print("Running jobs:")
for job in running_jobs:
    print(f"- {job.name}: {job.properties.operation} on {job.properties.entity_friendly_name}")

    started = job.properties.start_time
    # NOTE(review): start_time is presumably a UTC timestamp; a naive value is
    # treated as UTC here so the comparison works either way - confirm.
    if started is not None and started.tzinfo is None:
        started = started.replace(tzinfo=timezone.utc)

    # Cancel if it's a restore job that's been running too long
    if (
        job.properties.operation == "Restore"
        and started is not None
        and started < restore_cutoff
    ):
        print(f" Cancelling long-running restore job: {job.name}")
        client.job_cancellations.trigger("my-rg", "my-vault", job.name)

# Install with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-recoveryservicesbackup