CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-mgmt-recoveryservices

Microsoft Azure Recovery Services Client Library for Python providing comprehensive APIs for managing Recovery Services vaults, certificates, private endpoints, and usage monitoring.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

usage-monitoring.mddocs/

Usage Monitoring

Monitoring and reporting capabilities for Recovery Services vault usage metrics, replication usage statistics, and capacity planning information. These operations provide insights into storage consumption, backup operations, and service utilization for cost management and capacity planning.

Capabilities

Get Vault Usage Information

Retrieves usage metrics for a specific Recovery Services vault including storage consumption and backup item counts.

def list_by_vaults(resource_group_name: str, vault_name: str, **kwargs) -> ItemPaged[VaultUsage]:
    """
    Fetches the usages of the vault.

    Called through the client's ``usages`` operation group, e.g.
    ``client.usages.list_by_vaults(resource_group_name=..., vault_name=...)``.

    Parameters:
    - resource_group_name: str - The name of the resource group
    - vault_name: str - The name of the recovery services vault
    - **kwargs - Additional keyword arguments (not described in this document)

    Returns:
    ItemPaged[VaultUsage]: An iterator of vault usage information; each item
    carries the VaultUsage fields (name, current_value, limit, unit, ...)
    """

Usage Example:

# Get vault usage information
usages = client.usages.list_by_vaults(
    resource_group_name="my-rg",
    vault_name="my-vault",
)

for usage in usages:
    # `name` is Optional on VaultUsage, so fall back to a placeholder
    # instead of raising AttributeError on a missing name.
    metric_name = usage.name.localized_value if usage.name else "<unnamed>"
    print(f"Usage Metric: {metric_name}")
    print(f"  Current Value: {usage.current_value}")
    print(f"  Limit: {usage.limit}")
    print(f"  Unit: {usage.unit}")

    # Calculate percentage used. Both operands are Optional: skip the ratio
    # when the current value is missing or the limit is absent/non-positive.
    if usage.current_value is not None and usage.limit and usage.limit > 0:
        percentage = (usage.current_value / usage.limit) * 100
        print(f"  Usage: {percentage:.1f}%")
    print()

Get Replication Usage Information

Retrieves replication usage statistics for site recovery operations within a vault.

def list(resource_group_name: str, vault_name: str, **kwargs) -> ItemPaged[ReplicationUsage]:
    """
    Fetches the replication usages of the vault.

    Called through the client's ``replication_usages`` operation group, e.g.
    ``client.replication_usages.list(resource_group_name=..., vault_name=...)``.

    Parameters:
    - resource_group_name: str - The name of the resource group
    - vault_name: str - The name of the recovery services vault
    - **kwargs - Additional keyword arguments (not described in this document)

    Returns:
    ItemPaged[ReplicationUsage]: An iterator of replication usage information;
    each item carries the ReplicationUsage fields (monitor_summary, jobs_summary)
    """

Usage Example:

# Get replication usage information
replication_usages = client.replication_usages.list(
    resource_group_name="my-rg",
    vault_name="my-vault",
)

for usage in replication_usages:
    # `monitor_summary` is Optional on ReplicationUsage; only dereference
    # it for the events summary when it is actually present.
    monitor = usage.monitor_summary
    events = monitor.events_summary if monitor else None
    print(f"Replication Usage: {events}")
    print(f"  Jobs Summary: {usage.jobs_summary}")
    print(f"  Monitoring Summary: {monitor}")
    print()

Usage Types

Vault Usage

class VaultUsage:
    """
    Usages of a vault.

    One item yielded by ``client.usages.list_by_vaults(...)``. Every field is
    declared Optional, so callers should check for None before using a value
    (for example before dividing current_value by limit).

    Parameters:
    - unit: Optional[UsagesUnit] - Unit of the usage (Count, Bytes, Seconds, Percent, CountPerSecond, BytesPerSecond)
    - quota_period: Optional[str] - Quota period of usage
    - next_reset_time: Optional[datetime] - Next reset time of usage
    - name: Optional[NameInfo] - The name of the usage
    - current_value: Optional[int] - Current value of usage
    - limit: Optional[int] - Limit of usage
    """

Vault Usage List

class VaultUsageList:
    """
    Usage for vault.

    Container model holding a list of VaultUsage entries.

    Parameters:
    - value: Optional[List[VaultUsage]] - The list of usages for the given vault
    """

Replication Usage

class ReplicationUsage:
    """
    Replication usage of a vault.

    One item yielded by ``client.replication_usages.list(...)``. Both fields
    are declared Optional, so check for None before reading nested attributes.

    Parameters:
    - monitor_summary: Optional[MonitoringSummary] - Summary of the replication monitoring data for this vault
    - jobs_summary: Optional[JobsSummary] - Summary of the replication jobs data for this vault
    """

Replication Usage List

class ReplicationUsageList:
    """
    Replication usages for vault.

    Container model holding a list of ReplicationUsage entries.

    Parameters:
    - value: Optional[List[ReplicationUsage]] - The list of replication usages for the given vault
    """

Name Info

class NameInfo:
    """
    The name of usage.

    Type of the ``name`` field on VaultUsage; the examples in this document
    display ``localized_value`` as the metric label.

    Parameters:
    - value: Optional[str] - Value of usage
    - localized_value: Optional[str] - Localized value of usage
    """

Monitoring Summary

class MonitoringSummary:
    """
    Summary of the replication monitoring data for this vault.

    Type of the ``monitor_summary`` field on ReplicationUsage. Every count is
    declared Optional; the examples in this document treat None as 0.

    Parameters:
    - unhealthy_vm_count: Optional[int] - Count of unhealthy VMs
    - unhealthy_provider_count: Optional[int] - Count of unhealthy replication providers
    - events_summary: Optional[Dict[str, int]] - Summary of events
    - deprecated_provider_count: Optional[int] - Count of deprecated providers
    - supported_provider_count: Optional[int] - Count of supported providers
    - unsupported_provider_count: Optional[int] - Count of unsupported providers
    """

Jobs Summary

class JobsSummary:
    """
    Summary of the replication jobs data for this vault.

    Type of the ``jobs_summary`` field on ReplicationUsage. Every count is
    declared Optional; the examples in this document treat None as 0.

    Parameters:
    - failed_jobs: Optional[int] - Count of failed jobs
    - suspended_jobs: Optional[int] - Count of suspended jobs
    - in_progress_jobs: Optional[int] - Count of in-progress jobs
    """

Usage Units

class UsagesUnit(str, Enum):
    """
    Unit of the usage.

    String-valued enumeration (members are also ``str`` instances) of the
    units that the ``unit`` field of VaultUsage can take.
    """
    COUNT = "Count"
    BYTES = "Bytes"
    SECONDS = "Seconds"
    PERCENT = "Percent"
    COUNT_PER_SECOND = "CountPerSecond"
    BYTES_PER_SECOND = "BytesPerSecond"

Usage Patterns

Comprehensive Vault Usage Report

from datetime import datetime

def generate_vault_usage_report(client, resource_group: str, vault_name: str):
    """
    Generate a comprehensive usage report for a Recovery Services vault.

    Prints a header, then the vault usage metrics returned by
    ``client.usages.list_by_vaults`` followed by the replication usage
    metrics returned by ``client.replication_usages.list``.

    Parameters:
    - client: object exposing ``usages.list_by_vaults(resource_group, vault_name)``
      and ``replication_usages.list(resource_group, vault_name)``
    - resource_group: str - The name of the resource group
    - vault_name: str - The name of the recovery services vault

    Returns:
    None. The report is written to standard output.

    Raises:
    Any exception raised while fetching or formatting the data is reported
    on standard output and then re-raised unchanged.
    """

    print(f"=== Usage Report for Vault: {vault_name} ===")
    print(f"Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Resource Group: {resource_group}")
    print()

    try:
        # Get vault usage information
        print("📊 VAULT USAGE METRICS")
        print("-" * 50)

        usages = client.usages.list_by_vaults(resource_group, vault_name)
        usage_found = False

        for usage in usages:
            usage_found = True

            # `name` and `current_value` are Optional on VaultUsage; formatting
            # None with ':,' or dereferencing a missing name would raise.
            metric_name = usage.name.localized_value if usage.name else "Unknown"
            print(f"Metric: {metric_name}")
            if usage.current_value is not None:
                print(f"  Current: {usage.current_value:,}")
            else:
                print("  Current: N/A")
            print(f"  Limit: {usage.limit:,}" if usage.limit else "  Limit: Unlimited")
            print(f"  Unit: {usage.unit}")

            # Calculate and display percentage if both operands are usable
            if usage.current_value is not None and usage.limit and usage.limit > 0:
                percentage = (usage.current_value / usage.limit) * 100
                print(f"  Usage: {percentage:.1f}%")

                # The stricter threshold must be tested first; otherwise the
                # "> 80" branch captures every value above 95 as well and the
                # critical message can never be printed.
                if percentage > 95:
                    print("  🚨 CRITICAL: Near limit!")
                elif percentage > 80:
                    print("  ⚠️  WARNING: High usage detected!")

            # Show next reset time if available
            if usage.next_reset_time:
                print(f"  Next Reset: {usage.next_reset_time}")

            print()

        if not usage_found:
            print("No usage metrics available")

        # Get replication usage information
        print("🔄 REPLICATION USAGE METRICS")
        print("-" * 50)

        replication_usages = client.replication_usages.list(resource_group, vault_name)
        replication_found = False

        for repl_usage in replication_usages:
            replication_found = True

            # Jobs summary (counts are Optional, so None is shown as 0)
            if repl_usage.jobs_summary:
                jobs = repl_usage.jobs_summary
                in_progress = jobs.in_progress_jobs or 0
                failed = jobs.failed_jobs or 0
                suspended = jobs.suspended_jobs or 0

                print("Job Statistics:")
                print(f"  ✅ In Progress: {in_progress}")
                print(f"  ❌ Failed: {failed}")
                print(f"  ⏸️  Suspended: {suspended}")
                print(f"  📊 Total: {in_progress + failed + suspended}")

                if failed > 0:
                    print("  ⚠️  WARNING: Failed jobs detected!")
                print()

            # Monitoring summary
            if repl_usage.monitor_summary:
                monitor = repl_usage.monitor_summary
                print("Health Statistics:")
                print(f"  🏥 Unhealthy VMs: {monitor.unhealthy_vm_count or 0}")
                print(f"  🔧 Unhealthy Providers: {monitor.unhealthy_provider_count or 0}")
                print(f"  ✅ Supported Providers: {monitor.supported_provider_count or 0}")
                print(f"  ❌ Unsupported Providers: {monitor.unsupported_provider_count or 0}")
                print(f"  📅 Deprecated Providers: {monitor.deprecated_provider_count or 0}")

                if monitor.unhealthy_vm_count and monitor.unhealthy_vm_count > 0:
                    print("  ⚠️  WARNING: Unhealthy VMs detected!")

                # Events summary
                if monitor.events_summary:
                    print("\nEvent Summary:")
                    for event_type, count in monitor.events_summary.items():
                        print(f"  {event_type}: {count}")
                print()

        if not replication_found:
            print("No replication usage metrics available")

    except Exception as e:
        # Report for the operator, then let the caller decide how to handle it.
        print(f"Error generating usage report: {e}")
        raise

Usage Monitoring and Alerting

def monitor_vault_usage_with_alerts(client, resource_group: str, vault_name: str, alert_threshold: float = 80.0, critical_threshold: float = 95.0):
    """
    Monitor vault usage and generate alerts when thresholds are exceeded.

    Parameters:
    - client: object exposing ``usages.list_by_vaults(resource_group, vault_name)``
      and ``replication_usages.list(resource_group, vault_name)``
    - resource_group: str - The name of the resource group
    - vault_name: str - The name of the recovery services vault
    - alert_threshold: float - Usage percentage at or above which an alert is raised
    - critical_threshold: float - Usage percentage at or above which the alert
      severity is "critical" instead of "warning" (defaults to the previously
      hard-coded 95)

    Returns:
    list[dict]: One dict per alert. Usage alerts contain the keys vault,
    metric, current_value, limit, percentage, unit and severity. Replication
    alerts contain vault, metric, current_value, severity and message.

    Raises:
    Any exception raised while fetching the data is reported on standard
    output and then re-raised unchanged.
    """

    alerts = []

    try:
        # Monitor vault usage
        usages = client.usages.list_by_vaults(resource_group, vault_name)

        for usage in usages:
            # `current_value` and `limit` are Optional; a percentage only makes
            # sense when both are present and the limit is positive.
            if usage.current_value is None or not usage.limit or usage.limit <= 0:
                continue

            percentage = (usage.current_value / usage.limit) * 100

            if percentage >= alert_threshold:
                alerts.append({
                    "vault": vault_name,
                    # `name` is Optional as well; report None rather than crash.
                    "metric": usage.name.localized_value if usage.name else None,
                    "current_value": usage.current_value,
                    "limit": usage.limit,
                    "percentage": percentage,
                    "unit": usage.unit,
                    "severity": "critical" if percentage >= critical_threshold else "warning",
                })

        # Monitor replication health
        replication_usages = client.replication_usages.list(resource_group, vault_name)

        for repl_usage in replication_usages:
            # Check for failed jobs
            jobs = repl_usage.jobs_summary
            if jobs and jobs.failed_jobs and jobs.failed_jobs > 0:
                alerts.append({
                    "vault": vault_name,
                    "metric": "Failed Replication Jobs",
                    "current_value": jobs.failed_jobs,
                    "severity": "warning",
                    "message": f"{jobs.failed_jobs} replication jobs have failed",
                })

            # Check for unhealthy VMs
            monitor = repl_usage.monitor_summary
            if monitor and monitor.unhealthy_vm_count and monitor.unhealthy_vm_count > 0:
                alerts.append({
                    "vault": vault_name,
                    "metric": "Unhealthy VMs",
                    "current_value": monitor.unhealthy_vm_count,
                    "severity": "warning",
                    "message": f"{monitor.unhealthy_vm_count} VMs are in unhealthy state",
                })

        return alerts

    except Exception as e:
        # Report for the operator, then let the caller decide how to handle it.
        print(f"Error monitoring vault usage: {e}")
        raise

def process_usage_alerts(alerts):
    """
    Process and display usage alerts.

    Prints a confirmation line when ``alerts`` is empty. Otherwise prints the
    total alert count followed by one section for alerts whose "severity" is
    "critical" and one for those whose "severity" is "warning"; a section is
    omitted when it has no alerts.

    Parameters:
    - alerts: sequence of alert dicts with the keys "vault", "metric" and
      "current_value"; alerts that contain "percentage" must also provide
      "limit" and "unit", while the others may provide "message"

    Returns:
    None. All output goes to standard output.
    """

    if not alerts:
        print("✅ No usage alerts - all metrics within normal range")
        return

    print(f"🚨 {len(alerts)} usage alert(s) detected:")
    print()

    # Group the alerts per severity up front, in display order.
    sections = [
        (heading, [alert for alert in alerts if alert.get("severity") == level])
        for heading, level in (
            ("🚨 CRITICAL ALERTS:", "critical"),
            ("⚠️  WARNING ALERTS:", "warning"),
        )
    ]

    for heading, matching in sections:
        if not matching:
            continue

        print(heading)
        for alert in matching:
            print(f"  Vault: {alert['vault']}")
            print(f"  Metric: {alert['metric']}")
            if 'percentage' not in alert:
                # Non-percentage alerts show the raw value plus optional details.
                print(f"  Value: {alert['current_value']}")
                if 'message' in alert:
                    print(f"  Details: {alert['message']}")
            else:
                print(f"  Usage: {alert['percentage']:.1f}% ({alert['current_value']:,}/{alert['limit']:,} {alert['unit']})")
            print()

Historical Usage Tracking

import json
from pathlib import Path
from datetime import datetime

def track_usage_history(client, resource_group: str, vault_name: str, history_file: str = "vault_usage_history.json", max_entries: int = 30):
    """
    Track vault usage over time by appending current metrics to a history file.

    Parameters:
    - client: object exposing ``usages.list_by_vaults(resource_group, vault_name)``
      and ``replication_usages.list(resource_group, vault_name)``
    - resource_group: str - The name of the resource group
    - vault_name: str - The name of the recovery services vault
    - history_file: str - Path of the JSON file holding a list of snapshots;
      it is created when it does not exist
    - max_entries: int - Maximum number of snapshots kept in the file; older
      ones are dropped (defaults to the previously hard-coded 30)

    Returns:
    dict: The snapshot that was appended, with the keys timestamp,
    vault_name, resource_group, vault_usage and replication_usage.

    Raises:
    ValueError: if ``max_entries`` is smaller than 1.
    Any exception raised while collecting or persisting the data is reported
    on standard output and then re-raised unchanged.
    """

    # A non-positive size would make the `history[-max_entries:]` slice below
    # keep everything (for 0) or an unintended tail, so reject it up front.
    if max_entries < 1:
        raise ValueError("max_entries must be at least 1")

    timestamp = datetime.now().isoformat()

    # Collect current usage data
    usage_data = {
        "timestamp": timestamp,
        "vault_name": vault_name,
        "resource_group": resource_group,
        "vault_usage": [],
        "replication_usage": []
    }

    try:
        # Get vault usage
        usages = client.usages.list_by_vaults(resource_group, vault_name)
        for usage in usages:
            # `current_value` and `limit` are Optional; only compute the ratio
            # when both are present and the limit is positive.
            if usage.current_value is not None and usage.limit and usage.limit > 0:
                percentage = usage.current_value / usage.limit * 100
            else:
                percentage = None

            usage_data["vault_usage"].append({
                # `name` is Optional as well; record None rather than crash.
                "metric": usage.name.localized_value if usage.name else None,
                "current_value": usage.current_value,
                "limit": usage.limit,
                "unit": usage.unit,
                "percentage": percentage,
            })

        # Get replication usage
        replication_usages = client.replication_usages.list(resource_group, vault_name)
        for repl_usage in replication_usages:
            repl_record = {}

            if repl_usage.jobs_summary:
                repl_record["jobs"] = {
                    "in_progress": repl_usage.jobs_summary.in_progress_jobs,
                    "failed": repl_usage.jobs_summary.failed_jobs,
                    "suspended": repl_usage.jobs_summary.suspended_jobs
                }

            if repl_usage.monitor_summary:
                repl_record["monitoring"] = {
                    "unhealthy_vms": repl_usage.monitor_summary.unhealthy_vm_count,
                    "unhealthy_providers": repl_usage.monitor_summary.unhealthy_provider_count,
                    "supported_providers": repl_usage.monitor_summary.supported_provider_count
                }

            # Entries without any summary carry no information; skip them.
            if repl_record:
                usage_data["replication_usage"].append(repl_record)

        # Load existing history
        history = []
        history_path = Path(history_file)
        if history_path.exists():
            with open(history_path, 'r', encoding='utf-8') as f:
                history = json.load(f)

        # Append new data
        history.append(usage_data)

        # Keep only the most recent `max_entries` snapshots. This bounds the
        # number of records, not a time span: it equals N days only when the
        # function is run exactly once per day.
        if len(history) > max_entries:
            history = history[-max_entries:]

        # Save updated history; default=str stringifies values json cannot
        # encode natively instead of failing the whole write.
        with open(history_path, 'w', encoding='utf-8') as f:
            json.dump(history, f, indent=2, default=str)

        print(f"Usage data saved to {history_file}")
        return usage_data

    except Exception as e:
        # Report for the operator, then let the caller decide how to handle it.
        print(f"Error tracking usage history: {e}")
        raise

Install with Tessl CLI

npx tessl i tessl/pypi-azure-mgmt-recoveryservices

docs

certificate-management.md

extended-vault-info.md

index.md

private-link-resources.md

service-operations.md

usage-monitoring.md

vault-management.md

tile.json