Microsoft Azure Recovery Services Client Library for Python providing comprehensive APIs for managing Recovery Services vaults, certificates, private endpoints, and usage monitoring.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core service operations for Azure Recovery Services including name availability checking, service capability queries, identity management, and API operation discovery. These operations provide foundational functionality for service discovery, resource naming validation, and system administration.
Validates whether a proposed resource name is available for use within the specified location and resource group.
def check_name_availability(
resource_group_name: str,
location: str,
input: Union[CheckNameAvailabilityParameters, IO[bytes]],
**kwargs
) -> CheckNameAvailabilityResult:
"""
API to check for resource name availability. A name is available if no other resource exists
with the same SubscriptionId, Resource Name and Type, or if existing resources have been
garbage collected and deleted more than 24 hours ago.
Parameters:
- resource_group_name: str - The name of the resource group
- location: str - Location of the resource
- input: Union[CheckNameAvailabilityParameters, IO[bytes]] - Check name availability input
Returns:
CheckNameAvailabilityResult: Result indicating name availability status
"""Usage Example:
from azure.mgmt.recoveryservices.models import CheckNameAvailabilityParameters
# Check if vault name is available
name_check = CheckNameAvailabilityParameters(
type="Microsoft.RecoveryServices/vaults",
name="my-proposed-vault-name"
)
result = client.recovery_services.check_name_availability(
resource_group_name="my-rg",
location="eastus",
input=name_check
)
if result.name_available:
print(f"✅ Name '{name_check.name}' is available")
else:
print(f"❌ Name '{name_check.name}' is not available")
print(f"Reason: {result.reason}")
print(f"Message: {result.message}")Retrieves information about the capabilities and features provided by the Recovery Services resource provider in a specific location.
def capabilities(
location: str,
input: Union[ResourceCapabilities, IO[bytes]],
**kwargs
) -> CapabilitiesResponse:
"""
API to get details about capabilities provided by Microsoft.RecoveryServices RP.
Parameters:
- location: str - Location of the resource
- input: Union[ResourceCapabilities, IO[bytes]] - Resource capabilities input
Returns:
CapabilitiesResponse: Response containing service capabilities information
"""Usage Example:
from azure.mgmt.recoveryservices.models import ResourceCapabilities
# Query service capabilities
capabilities_request = ResourceCapabilities(
type="Microsoft.RecoveryServices/vaults"
)
capabilities = client.recovery_services.capabilities(
location="eastus",
input=capabilities_request
)
print(f"Capabilities for location: eastus")
if capabilities.properties:
print(f"DNS Zones: {capabilities.properties.dns_zones}")
print(f"DNS Zone Response: {capabilities.properties.dns_zone_response}")Removes a registered identity (container) from the Recovery Services vault, effectively unregistering it from backup or recovery operations.
def delete(resource_group_name: str, vault_name: str, identity_name: str, **kwargs) -> None:
"""
Unregisters the given container from your Recovery Services vault.
Parameters:
- resource_group_name: str - The name of the resource group
- vault_name: str - The name of the recovery services vault
- identity_name: str - Name of the protection container to unregister
Returns:
None
"""Usage Example:
# Unregister a protection container
try:
client.registered_identities.delete(
resource_group_name="my-rg",
vault_name="my-vault",
identity_name="my-container-identity"
)
print("✅ Successfully unregistered identity")
except Exception as e:
print(f"❌ Failed to unregister identity: {e}")Discovers all available operations provided by the Recovery Services resource provider.
def list(**kwargs) -> ItemPaged[ClientDiscoveryValueForSingleApi]:
"""
Returns the list of available operations.
Returns:
ItemPaged[ClientDiscoveryValueForSingleApi]: An iterator of available operations
"""Usage Example:
# List all available operations
operations = client.operations.list()
print("Available Recovery Services Operations:")
print("=" * 50)
for operation in operations:
print(f"Operation: {operation.name}")
if operation.display:
print(f" Provider: {operation.display.provider}")
print(f" Resource: {operation.display.resource}")
print(f" Operation: {operation.display.operation}")
print(f" Description: {operation.display.description}")
print()class CheckNameAvailabilityParameters:
"""
Parameters to check name availability.
Parameters:
- type: Optional[str] - Describes the Resource type: Microsoft.RecoveryServices/vaults
- name: Optional[str] - Resource name for which availability needs to be checked
"""
class CheckNameAvailabilityResult:
"""
Response for check name availability API.
Parameters:
- name_available: Optional[bool] - Gets a boolean value that indicates whether the name is available
- reason: Optional[str] - Gets the reason that a resource name could not be used
- message: Optional[str] - Gets an error message explaining the Reason value in more detail
"""class ResourceCapabilities:
"""
Input to get capabilities.
Parameters:
- type: Optional[str] - Describes the Resource type: Microsoft.RecoveryServices/vaults
"""
class CapabilitiesResponse:
"""
Response for get capabilities API.
Parameters:
- type: Optional[str] - Describes the Resource type: Microsoft.RecoveryServices/vaults
- properties: Optional[CapabilitiesResponseProperties] - Properties of the response
"""
class CapabilitiesResponseProperties:
"""
Class to define the capabilities response properties.
Parameters:
- dns_zones: Optional[List[DNSZone]] - The list of available DNS zones
- dns_zone_response: Optional[List[DNSZoneResponse]] - Capabilities information
"""class ClientDiscoveryValueForSingleApi:
"""
Available operation details.
Parameters:
- name: Optional[str] - Name of the operation
- display: Optional[ClientDiscoveryDisplay] - Display information of the operation
- origin: Optional[str] - The intended executor of the operation
- properties: Optional[ClientDiscoveryForProperties] - Properties of the operation
"""
class ClientDiscoveryDisplay:
"""
Localized display information of an operation.
Parameters:
- provider: Optional[str] - Name of the provider for display purposes
- resource: Optional[str] - ResourceType for which this operation is performed
- operation: Optional[str] - Localized friendly name of the operation
- description: Optional[str] - Localized friendly description of the operation
"""
class ClientDiscoveryForProperties:
"""
Class to define the properties of a client discovery operation.
Parameters:
- service_specification: Optional[ClientDiscoveryForServiceSpecification] - Operation properties
"""class DNSZone:
"""
A DNS zone.
Parameters:
- name: Optional[str] - Name of the DNS zone
- required_zone_names: Optional[List[str]] - Required DNS zone names
"""
class DNSZoneResponse:
"""
Response data for DNS zone.
Parameters:
- name: Optional[str] - Name of the DNS zone
- required_zone_names: Optional[List[str]] - The DNS zone names required for the operation
"""def validate_and_suggest_vault_name(client, resource_group: str, location: str, preferred_name: str, max_attempts: int = 5):
"""
Validate a preferred vault name and suggest alternatives if not available.
"""
def check_name(name: str) -> CheckNameAvailabilityResult:
return client.recovery_services.check_name_availability(
resource_group_name=resource_group,
location=location,
input=CheckNameAvailabilityParameters(
type="Microsoft.RecoveryServices/vaults",
name=name
)
)
# Check preferred name first
print(f"🔍 Checking availability of preferred name: '{preferred_name}'")
result = check_name(preferred_name)
if result.name_available:
print(f"✅ '{preferred_name}' is available!")
return preferred_name
print(f"❌ '{preferred_name}' is not available")
print(f" Reason: {result.reason}")
print(f" Message: {result.message}")
# Generate alternative names
alternatives = []
base_name = preferred_name.rstrip('0123456789-')
for i in range(1, max_attempts + 1):
alternative = f"{base_name}-{i:02d}"
alternatives.append(alternative)
# Check alternatives
print(f"\n🔍 Checking {len(alternatives)} alternative names...")
for alt_name in alternatives:
result = check_name(alt_name)
if result.name_available:
print(f"✅ Alternative name available: '{alt_name}'")
return alt_name
else:
print(f"❌ '{alt_name}' - {result.reason}")
print(f"\n⚠️ No available names found after {max_attempts} attempts")
return None
def check_regional_capabilities(client, locations: list):
"""
Check Recovery Services capabilities across multiple Azure regions.
"""
capabilities_by_region = {}
print("🌍 Checking Recovery Services capabilities across regions...")
print("=" * 60)
for location in locations:
try:
capabilities = client.recovery_services.capabilities(
location=location,
input=ResourceCapabilities(type="Microsoft.RecoveryServices/vaults")
)
capabilities_info = {
"location": location,
"available": True,
"dns_zones": [],
"features": []
}
if capabilities.properties:
if capabilities.properties.dns_zones:
capabilities_info["dns_zones"] = [
zone.name for zone in capabilities.properties.dns_zones if zone.name
]
if capabilities.properties.dns_zone_response:
capabilities_info["dns_zone_responses"] = [
zone.name for zone in capabilities.properties.dns_zone_response if zone.name
]
capabilities_by_region[location] = capabilities_info
print(f"✅ {location}:")
if capabilities_info["dns_zones"]:
print(f" DNS Zones: {', '.join(capabilities_info['dns_zones'])}")
else:
print(" DNS Zones: Not specified")
except Exception as e:
capabilities_by_region[location] = {
"location": location,
"available": False,
"error": str(e)
}
print(f"❌ {location}: Error - {e}")
return capabilities_by_regiondef discover_recovery_services_operations(client):
"""
Discover and document all available Recovery Services operations.
"""
print("🔍 Discovering Recovery Services API Operations...")
print("=" * 60)
operations = client.operations.list()
operations_by_category = {}
for operation in operations:
if not operation.display:
continue
category = operation.display.resource or "General"
if category not in operations_by_category:
operations_by_category[category] = []
op_info = {
"name": operation.name,
"operation": operation.display.operation,
"description": operation.display.description,
"provider": operation.display.provider,
"origin": operation.origin
}
operations_by_category[category].append(op_info)
# Display results organized by category
for category, ops in operations_by_category.items():
print(f"\n📋 {category} Operations:")
print("-" * 40)
for op in ops:
print(f" • {op['operation']}")
print(f" Name: {op['name']}")
if op['description']:
print(f" Description: {op['description']}")
print()
print(f"📊 Total operations discovered: {sum(len(ops) for ops in operations_by_category.values())}")
print(f"📊 Categories: {len(operations_by_category)}")
return operations_by_category
def validate_service_health(client, resource_group: str, location: str):
"""
Perform comprehensive service health validation.
"""
print("🏥 Recovery Services Health Check")
print("=" * 40)
health_status = {
"overall": "healthy",
"checks": [],
"warnings": [],
"errors": []
}
try:
# Test 1: Check if we can list operations
print("1️⃣ Testing operation discovery...")
operations = list(client.operations.list())
if operations:
health_status["checks"].append("✅ Operation discovery working")
print(f" Found {len(operations)} available operations")
else:
health_status["warnings"].append("⚠️ No operations discovered")
print(" ⚠️ No operations found")
# Test 2: Check name availability API
print("\n2️⃣ Testing name availability API...")
test_name = f"health-check-test-{int(datetime.now().timestamp())}"
name_result = client.recovery_services.check_name_availability(
resource_group_name=resource_group,
location=location,
input=CheckNameAvailabilityParameters(
type="Microsoft.RecoveryServices/vaults",
name=test_name
)
)
if name_result.name_available is not None:
health_status["checks"].append("✅ Name availability API working")
print(f" Test name '{test_name}' availability: {name_result.name_available}")
else:
health_status["warnings"].append("⚠️ Name availability API returned no result")
print(" ⚠️ Name availability check returned no result")
# Test 3: Check capabilities API
print("\n3️⃣ Testing capabilities API...")
capabilities = client.recovery_services.capabilities(
location=location,
input=ResourceCapabilities(type="Microsoft.RecoveryServices/vaults")
)
if capabilities:
health_status["checks"].append("✅ Capabilities API working")
print(" ✅ Capabilities API responding")
if capabilities.properties and capabilities.properties.dns_zones:
print(f" Found {len(capabilities.properties.dns_zones)} DNS zone configurations")
else:
health_status["warnings"].append("⚠️ Capabilities API returned no data")
print(" ⚠️ Capabilities API returned no data")
except Exception as e:
health_status["errors"].append(f"❌ Service health check failed: {e}")
health_status["overall"] = "unhealthy"
print(f"\n❌ Health check failed: {e}")
# Determine overall status
if health_status["errors"]:
health_status["overall"] = "unhealthy"
elif health_status["warnings"]:
health_status["overall"] = "degraded"
print(f"\n📊 Overall Service Health: {health_status['overall'].upper()}")
return health_statusdef list_registered_identities_from_vault(vault_properties):
"""
Extract registered identity information from vault properties.
Note: This analyzes vault properties since there's no direct list API for registered identities.
"""
if not vault_properties:
print("No vault properties provided")
return []
identities = []
# Check for managed identity configuration
if hasattr(vault_properties, 'identity') and vault_properties.identity:
identity_info = {
"type": "managed_identity",
"identity_type": vault_properties.identity.type,
"principal_id": vault_properties.identity.principal_id,
"tenant_id": vault_properties.identity.tenant_id
}
if vault_properties.identity.user_assigned_identities:
identity_info["user_assigned_identities"] = list(vault_properties.identity.user_assigned_identities.keys())
identities.append(identity_info)
print(f"Found {len(identities)} identity configurations:")
for identity in identities:
print(f" Type: {identity['type']}")
if identity.get('identity_type'):
print(f" Identity Type: {identity['identity_type']}")
if identity.get('principal_id'):
print(f" Principal ID: {identity['principal_id']}")
if identity.get('user_assigned_identities'):
print(f" User Assigned Identities: {len(identity['user_assigned_identities'])}")
return identities
def safely_unregister_identity(client, resource_group: str, vault_name: str, identity_name: str, confirm: bool = False):
"""
Safely unregister an identity with confirmation and error handling.
"""
if not confirm:
print("⚠️ WARNING: This will unregister the identity from the vault.")
print(" This action may affect backup and recovery operations.")
print(" Set confirm=True to proceed.")
return False
try:
print(f"🔄 Unregistering identity: {identity_name}")
client.registered_identities.delete(
resource_group_name=resource_group,
vault_name=vault_name,
identity_name=identity_name
)
print(f"✅ Successfully unregistered identity: {identity_name}")
print(" Note: It may take a few minutes for the change to take effect.")
return True
except Exception as e:
print(f"❌ Failed to unregister identity: {e}")
# Provide helpful error guidance
if "NotFound" in str(e):
print(" The identity may not exist or may already be unregistered.")
elif "Forbidden" in str(e):
print(" Check that you have sufficient permissions to manage vault identities.")
elif "Conflict" in str(e):
print(" The identity may be in use. Ensure no active backup/recovery operations depend on it.")
return FalseInstall with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-recoveryservices