Microsoft Azure Recovery Services Client Library for Python providing comprehensive APIs for managing Recovery Services vaults, certificates, private endpoints, and usage monitoring.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Monitoring and reporting capabilities for Recovery Services vault usage metrics, replication usage statistics, and capacity planning information. These operations provide insights into storage consumption, backup operations, and service utilization for cost management and capacity planning.
Retrieves usage metrics for a specific Recovery Services vault including storage consumption and backup item counts.
def list_by_vaults(resource_group_name: str, vault_name: str, **kwargs) -> ItemPaged[VaultUsage]:
"""
Fetches the usages of the vault.
Parameters:
- resource_group_name: str - The name of the resource group
- vault_name: str - The name of the recovery services vault
Returns:
ItemPaged[VaultUsage]: An iterator of vault usage information
"""Usage Example:
# Get vault usage information
usages = client.usages.list_by_vaults(
resource_group_name="my-rg",
vault_name="my-vault"
)
for usage in usages:
print(f"Usage Metric: {usage.name.localized_value}")
print(f" Current Value: {usage.current_value}")
print(f" Limit: {usage.limit}")
print(f" Unit: {usage.unit}")
# Calculate percentage used
if usage.limit and usage.limit > 0:
percentage = (usage.current_value / usage.limit) * 100
print(f" Usage: {percentage:.1f}%")
print()Retrieves replication usage statistics for site recovery operations within a vault.
def list(resource_group_name: str, vault_name: str, **kwargs) -> ItemPaged[ReplicationUsage]:
"""
Fetches the replication usages of the vault.
Parameters:
- resource_group_name: str - The name of the resource group
- vault_name: str - The name of the recovery services vault
Returns:
ItemPaged[ReplicationUsage]: An iterator of replication usage information
"""Usage Example:
# Get replication usage information
replication_usages = client.replication_usages.list(
resource_group_name="my-rg",
vault_name="my-vault"
)
for usage in replication_usages:
print(f"Replication Usage: {usage.monitor_summary.events_summary}")
print(f" Jobs Summary: {usage.jobs_summary}")
print(f" Monitoring Summary: {usage.monitor_summary}")
print()class VaultUsage:
"""
Usages of a vault.
Parameters:
- unit: Optional[UsagesUnit] - Unit of the usage (Count, Bytes, Seconds, Percent, CountPerSecond, BytesPerSecond)
- quota_period: Optional[str] - Quota period of usage
- next_reset_time: Optional[datetime] - Next reset time of usage
- name: Optional[NameInfo] - The name of the usage
- current_value: Optional[int] - Current value of usage
- limit: Optional[int] - Limit of usage
"""class VaultUsageList:
"""
Usage for vault.
Parameters:
- value: Optional[List[VaultUsage]] - The list of usages for the given vault
"""class ReplicationUsage:
"""
Replication usage of a vault.
Parameters:
- monitor_summary: Optional[MonitoringSummary] - Summary of the replication monitoring data for this vault
- jobs_summary: Optional[JobsSummary] - Summary of the replication jobs data for this vault
"""class ReplicationUsageList:
"""
Replication usages for vault.
Parameters:
- value: Optional[List[ReplicationUsage]] - The list of replication usages for the given vault
"""class NameInfo:
"""
The name of usage.
Parameters:
- value: Optional[str] - Value of usage
- localized_value: Optional[str] - Localized value of usage
"""class MonitoringSummary:
"""
Summary of the replication monitoring data for this vault.
Parameters:
- unhealthy_vm_count: Optional[int] - Count of unhealthy VMs
- unhealthy_provider_count: Optional[int] - Count of unhealthy replication providers
- events_summary: Optional[Dict[str, int]] - Summary of events
- deprecated_provider_count: Optional[int] - Count of deprecated providers
- supported_provider_count: Optional[int] - Count of supported providers
- unsupported_provider_count: Optional[int] - Count of unsupported providers
"""class JobsSummary:
"""
Summary of the replication jobs data for this vault.
Parameters:
- failed_jobs: Optional[int] - Count of failed jobs
- suspended_jobs: Optional[int] - Count of suspended jobs
- in_progress_jobs: Optional[int] - Count of in-progress jobs
"""class UsagesUnit(str, Enum):
"""
Unit of the usage.
"""
COUNT = "Count"
BYTES = "Bytes"
SECONDS = "Seconds"
PERCENT = "Percent"
COUNT_PER_SECOND = "CountPerSecond"
BYTES_PER_SECOND = "BytesPerSecond"from datetime import datetime
def generate_vault_usage_report(client, resource_group: str, vault_name: str):
"""
Generate a comprehensive usage report for a Recovery Services vault.
"""
print(f"=== Usage Report for Vault: {vault_name} ===")
print(f"Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Resource Group: {resource_group}")
print()
try:
# Get vault usage information
print("📊 VAULT USAGE METRICS")
print("-" * 50)
usages = client.usages.list_by_vaults(resource_group, vault_name)
usage_found = False
for usage in usages:
usage_found = True
print(f"Metric: {usage.name.localized_value}")
print(f" Current: {usage.current_value:,}")
print(f" Limit: {usage.limit:,}" if usage.limit else " Limit: Unlimited")
print(f" Unit: {usage.unit}")
# Calculate and display percentage if limit exists
if usage.limit and usage.limit > 0:
percentage = (usage.current_value / usage.limit) * 100
print(f" Usage: {percentage:.1f}%")
# Add warning for high usage
if percentage > 80:
print(" ⚠️ WARNING: High usage detected!")
elif percentage > 95:
print(" 🚨 CRITICAL: Near limit!")
# Show next reset time if available
if usage.next_reset_time:
print(f" Next Reset: {usage.next_reset_time}")
print()
if not usage_found:
print("No usage metrics available")
# Get replication usage information
print("🔄 REPLICATION USAGE METRICS")
print("-" * 50)
replication_usages = client.replication_usages.list(resource_group, vault_name)
replication_found = False
for repl_usage in replication_usages:
replication_found = True
# Jobs summary
if repl_usage.jobs_summary:
jobs = repl_usage.jobs_summary
print("Job Statistics:")
print(f" ✅ In Progress: {jobs.in_progress_jobs or 0}")
print(f" ❌ Failed: {jobs.failed_jobs or 0}")
print(f" ⏸️ Suspended: {jobs.suspended_jobs or 0}")
# Calculate total jobs
total_jobs = (jobs.in_progress_jobs or 0) + (jobs.failed_jobs or 0) + (jobs.suspended_jobs or 0)
print(f" 📊 Total: {total_jobs}")
if jobs.failed_jobs and jobs.failed_jobs > 0:
print(" ⚠️ WARNING: Failed jobs detected!")
print()
# Monitoring summary
if repl_usage.monitor_summary:
monitor = repl_usage.monitor_summary
print("Health Statistics:")
print(f" 🏥 Unhealthy VMs: {monitor.unhealthy_vm_count or 0}")
print(f" 🔧 Unhealthy Providers: {monitor.unhealthy_provider_count or 0}")
print(f" ✅ Supported Providers: {monitor.supported_provider_count or 0}")
print(f" ❌ Unsupported Providers: {monitor.unsupported_provider_count or 0}")
print(f" 📅 Deprecated Providers: {monitor.deprecated_provider_count or 0}")
if monitor.unhealthy_vm_count and monitor.unhealthy_vm_count > 0:
print(" ⚠️ WARNING: Unhealthy VMs detected!")
# Events summary
if monitor.events_summary:
print("\nEvent Summary:")
for event_type, count in monitor.events_summary.items():
print(f" {event_type}: {count}")
print()
if not replication_found:
print("No replication usage metrics available")
except Exception as e:
print(f"Error generating usage report: {e}")
raisedef monitor_vault_usage_with_alerts(client, resource_group: str, vault_name: str, alert_threshold: float = 80.0):
"""
Monitor vault usage and generate alerts when thresholds are exceeded.
"""
alerts = []
try:
# Monitor vault usage
usages = client.usages.list_by_vaults(resource_group, vault_name)
for usage in usages:
if usage.limit and usage.limit > 0:
percentage = (usage.current_value / usage.limit) * 100
if percentage >= alert_threshold:
alert = {
"vault": vault_name,
"metric": usage.name.localized_value,
"current_value": usage.current_value,
"limit": usage.limit,
"percentage": percentage,
"unit": usage.unit,
"severity": "critical" if percentage >= 95 else "warning"
}
alerts.append(alert)
# Monitor replication health
replication_usages = client.replication_usages.list(resource_group, vault_name)
for repl_usage in replication_usages:
# Check for failed jobs
if repl_usage.jobs_summary and repl_usage.jobs_summary.failed_jobs:
if repl_usage.jobs_summary.failed_jobs > 0:
alert = {
"vault": vault_name,
"metric": "Failed Replication Jobs",
"current_value": repl_usage.jobs_summary.failed_jobs,
"severity": "warning",
"message": f"{repl_usage.jobs_summary.failed_jobs} replication jobs have failed"
}
alerts.append(alert)
# Check for unhealthy VMs
if repl_usage.monitor_summary and repl_usage.monitor_summary.unhealthy_vm_count:
if repl_usage.monitor_summary.unhealthy_vm_count > 0:
alert = {
"vault": vault_name,
"metric": "Unhealthy VMs",
"current_value": repl_usage.monitor_summary.unhealthy_vm_count,
"severity": "warning",
"message": f"{repl_usage.monitor_summary.unhealthy_vm_count} VMs are in unhealthy state"
}
alerts.append(alert)
return alerts
except Exception as e:
print(f"Error monitoring vault usage: {e}")
raise
def process_usage_alerts(alerts):
"""Process and display usage alerts."""
if not alerts:
print("✅ No usage alerts - all metrics within normal range")
return
print(f"🚨 {len(alerts)} usage alert(s) detected:")
print()
critical_alerts = [a for a in alerts if a.get("severity") == "critical"]
warning_alerts = [a for a in alerts if a.get("severity") == "warning"]
if critical_alerts:
print("🚨 CRITICAL ALERTS:")
for alert in critical_alerts:
print(f" Vault: {alert['vault']}")
print(f" Metric: {alert['metric']}")
if 'percentage' in alert:
print(f" Usage: {alert['percentage']:.1f}% ({alert['current_value']:,}/{alert['limit']:,} {alert['unit']})")
else:
print(f" Value: {alert['current_value']}")
if 'message' in alert:
print(f" Details: {alert['message']}")
print()
if warning_alerts:
print("⚠️ WARNING ALERTS:")
for alert in warning_alerts:
print(f" Vault: {alert['vault']}")
print(f" Metric: {alert['metric']}")
if 'percentage' in alert:
print(f" Usage: {alert['percentage']:.1f}% ({alert['current_value']:,}/{alert['limit']:,} {alert['unit']})")
else:
print(f" Value: {alert['current_value']}")
if 'message' in alert:
print(f" Details: {alert['message']}")
print()import json
from pathlib import Path
from datetime import datetime
def track_usage_history(client, resource_group: str, vault_name: str, history_file: str = "vault_usage_history.json"):
"""
Track vault usage over time by appending current metrics to a history file.
"""
timestamp = datetime.now().isoformat()
# Collect current usage data
usage_data = {
"timestamp": timestamp,
"vault_name": vault_name,
"resource_group": resource_group,
"vault_usage": [],
"replication_usage": []
}
try:
# Get vault usage
usages = client.usages.list_by_vaults(resource_group, vault_name)
for usage in usages:
usage_record = {
"metric": usage.name.localized_value,
"current_value": usage.current_value,
"limit": usage.limit,
"unit": usage.unit,
"percentage": (usage.current_value / usage.limit * 100) if usage.limit and usage.limit > 0 else None
}
usage_data["vault_usage"].append(usage_record)
# Get replication usage
replication_usages = client.replication_usages.list(resource_group, vault_name)
for repl_usage in replication_usages:
repl_record = {}
if repl_usage.jobs_summary:
repl_record["jobs"] = {
"in_progress": repl_usage.jobs_summary.in_progress_jobs,
"failed": repl_usage.jobs_summary.failed_jobs,
"suspended": repl_usage.jobs_summary.suspended_jobs
}
if repl_usage.monitor_summary:
repl_record["monitoring"] = {
"unhealthy_vms": repl_usage.monitor_summary.unhealthy_vm_count,
"unhealthy_providers": repl_usage.monitor_summary.unhealthy_provider_count,
"supported_providers": repl_usage.monitor_summary.supported_provider_count
}
if repl_record:
usage_data["replication_usage"].append(repl_record)
# Load existing history
history = []
history_path = Path(history_file)
if history_path.exists():
with open(history_path, 'r') as f:
history = json.load(f)
# Append new data
history.append(usage_data)
# Keep only last 30 days of data (assuming daily collection)
if len(history) > 30:
history = history[-30:]
# Save updated history
with open(history_path, 'w') as f:
json.dump(history, f, indent=2, default=str)
print(f"Usage data saved to {history_file}")
return usage_data
except Exception as e:
print(f"Error tracking usage history: {e}")
raiseInstall with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-recoveryservices