Microsoft Azure Key Vault secrets client library for Python providing secure storage and management of sensitive information
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive error handling patterns and exception management for robust secret operations with proper authentication and network error handling. The Azure Key Vault Secrets library uses standard Azure Core exceptions for consistent error handling across Azure SDKs.
Standard Azure Core exceptions used throughout the Key Vault Secrets library.
# From azure.core.exceptions
class ResourceNotFoundError(HttpResponseError):
    """Raised when a requested secret does not exist.

    Reference listing of the azure.core.exceptions class; no body is shown
    beyond this description.

    Common scenarios:
    - Getting a secret that doesn't exist
    - Updating a non-existent secret
    - Operating on a deleted secret that has been purged
    """
class ResourceExistsError(HttpResponseError):
    """Raised when attempting to create a resource that already exists.

    Reference listing of the azure.core.exceptions class; no body is shown
    beyond this description.

    Common scenarios:
    - Restoring a secret when one with the same name already exists
    - Conflicting operations during secret creation
    """
class ClientAuthenticationError(HttpResponseError):
    """Raised when authentication with Azure Key Vault fails.

    Reference listing of the azure.core.exceptions class; no body is shown
    beyond this description.

    Common scenarios:
    - Invalid or expired credentials
    - Insufficient permissions for the operation
    - Network issues preventing authentication

    NOTE(review): the permission-error section of this document describes
    permission failures as HttpResponseError with a 403 status -- confirm
    which exception insufficient permissions actually surface as.
    """
class HttpResponseError(Exception):
    """Base exception for HTTP response errors.

    The other HTTP-related exceptions listed in this reference
    (ResourceNotFoundError, ResourceExistsError, ClientAuthenticationError)
    derive from this class, so an ``except HttpResponseError`` clause also
    catches them.

    NOTE(review): this is a simplified listing -- confirm the real base
    class against azure.core.exceptions.

    Properties:
    - status_code (int): HTTP status code
    - reason (str): HTTP reason phrase
    - response (HttpResponse): Full HTTP response object
    - message (str): Error message
    """
class ServiceRequestError(Exception):
    """Raised when there's an error in the service request.

    Unlike the HTTP response errors listed alongside it, this class is not
    declared as a subclass of HttpResponseError in this listing.

    Common scenarios:
    - Network connectivity issues
    - DNS resolution problems
    - Connection timeouts
    """
Specific patterns and handling for authentication-related failures.
# Authentication error scenarios and handling
def handle_authentication_error(error: ClientAuthenticationError) -> None:
    """Handle authentication errors with appropriate remediation.

    Signature-only stub: no implementation is shown, only the scenarios a
    handler is expected to cover.

    Args:
        error: The ClientAuthenticationError to handle.

    Common causes:
    - Expired tokens or certificates
    - Insufficient Key Vault permissions
    - Invalid credentials configuration
    - Network firewall blocking authentication endpoints
    """
Handling authorization failures and access policy issues.
# Permission error patterns (manifested as HttpResponseError with 403 status)
def handle_permission_error(error: HttpResponseError) -> None:
    """Handle permission and authorization errors.

    Signature-only stub: no implementation is shown, only the scenarios a
    handler is expected to cover.

    Args:
        error: The HttpResponseError to handle.

    Common causes:
    - Missing Key Vault access policies
    - Insufficient RBAC roles
    - Operations not allowed by access policy
    - Vault access restricted by network rules
    """
Handling errors related to secret state and lifecycle.
def handle_secret_state_error(error: HttpResponseError) -> None:
    """Handle errors related to secret state and lifecycle.

    Signature-only stub: no implementation is shown, only the scenarios a
    handler is expected to cover.

    Args:
        error: The HttpResponseError to handle.

    Common scenarios:
    - Operating on disabled secrets
    - Accessing expired secrets
    - Conflicts with secret deletion state
    - Version-specific operation errors
    """

from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
from azure.core.exceptions import (
ResourceNotFoundError,
ClientAuthenticationError,
HttpResponseError
)
def safe_get_secret(client: SecretClient, name: str) -> Optional[str]:
    """Fetch a secret's value, reporting any failure on stdout.

    Args:
        client: Client used to call ``get_secret``.
        name: Name of the secret to read.

    Returns:
        The secret's value, or None when the lookup fails for any reason
        (missing secret, authentication failure, HTTP error, or anything
        else). No exception is propagated to the caller.
    """
    try:
        return client.get_secret(name).value
    except ResourceNotFoundError:
        print(f"Secret '{name}' does not exist")
    except ClientAuthenticationError as e:
        print(f"Authentication failed: {e}")
        # Handle credential refresh or re-authentication
    except HttpResponseError as e:
        # The more specific subclasses were handled above; what remains is
        # distinguished by HTTP status code.
        if e.status_code == 403:
            print(f"Access denied for secret '{name}': {e}")
        elif e.status_code == 429:
            print(f"Rate limited. Retry after: {e.response.headers.get('Retry-After', 'unknown')}")
        else:
            print(f"HTTP error {e.status_code}: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    # Every failure path falls through to the same "no value" result.
    return None


import time
from typing import Optional, Callable, TypeVar
from azure.core.exceptions import HttpResponseError, ServiceRequestError
T = TypeVar('T')


def retry_operation(
    operation: Callable[[], T],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_multiplier: float = 2.0
) -> Optional[T]:
    """Run ``operation``, retrying transient failures with exponential backoff.

    Args:
        operation: Zero-argument callable to execute.
        max_retries: Maximum number of attempts. Values below 1 are treated
            as 1 so the operation is always tried at least once instead of
            silently returning None without being called.
        initial_delay: Seconds to sleep after the first failed attempt.
        backoff_multiplier: Factor applied to the delay after each failure.

    Returns:
        Whatever ``operation`` returns on its first successful attempt.

    Raises:
        HttpResponseError: Immediately for 4xx responses other than 429, or
            after the final attempt for retryable responses.
        ServiceRequestError: After the final attempt.
        Exception: Any other exception raised by ``operation`` propagates
            unchanged on the attempt where it occurs.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        try:
            return operation()
        except (ServiceRequestError, HttpResponseError) as e:
            # Check if error is retryable
            if isinstance(e, HttpResponseError):
                status = e.status_code
                # status_code is None when the error carries no response;
                # comparing None with an int would raise TypeError, so such
                # errors are treated as retryable.
                # Don't retry client errors (4xx) except rate limiting (429)
                if status is not None and 400 <= status < 500 and status != 429:
                    raise
            if attempt == attempts - 1:
                raise  # Last attempt, re-raise the exception
            delay = initial_delay * (backoff_multiplier ** attempt)
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
            time.sleep(delay)
    # Not reached: every iteration returns, raises, or continues the loop,
    # and the final iteration always returns or raises.
    return None
# Usage example
def get_secret_with_retry(client: SecretClient, name: str) -> Optional[str]:
    """Read a secret's value through ``retry_operation``.

    Args:
        client: Client used to call ``get_secret``.
        name: Name of the secret to read.

    Returns:
        The secret's value, or None if the lookup still fails once
        ``retry_operation`` gives up (the failure is printed, not raised).
    """
    try:
        def fetch_value():
            # Deferred so each retry performs a fresh request.
            return client.get_secret(name).value

        return retry_operation(fetch_value)
    except Exception as e:
        print(f"Failed to get secret after retries: {e}")
        return None


from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
from azure.core.exceptions import *
from typing import Optional, Dict, Any
import logging
class SecureSecretManager:
    """Wrapper around ``SecretClient`` that logs failures instead of raising.

    Failure contracts differ per method:
    - ``get_secret_safely`` re-raises authentication failures and returns the
      caller's default for everything else.
    - ``set_secret_safely`` and ``delete_secret_safely`` return a bool.
    - ``list_secrets_safely`` returns whatever it collected before an error.

    Instances can be used as a context manager; leaving the ``with`` block
    closes the underlying client.
    """

    def __init__(self, vault_url: str):
        """Build the credential and client for ``vault_url``.

        Raises:
            Exception: Whatever credential or client construction raises; the
                error is logged and then re-raised unchanged.
        """
        try:
            credential = DefaultAzureCredential()
            self.client = SecretClient(vault_url=vault_url, credential=credential)
            self.vault_url = vault_url
            logging.info(f"Initialized SecretManager for vault: {vault_url}")
        except Exception as e:
            logging.error(f"Failed to initialize SecretManager: {e}")
            raise

    def get_secret_safely(self, name: str, default: Optional[str] = None) -> Optional[str]:
        """Return the value of secret ``name``, or ``default`` on failure.

        Args:
            name: Name of the secret to read.
            default: Value returned when the secret cannot be read.

        Returns:
            The secret's value, or ``default`` when the secret is missing,
            access is denied, or any other non-authentication error occurs.

        Raises:
            ClientAuthenticationError: Re-raised after logging; this is the
                only failure this method propagates.
        """
        try:
            secret = self.client.get_secret(name)
            return secret.value
        except ResourceNotFoundError:
            logging.warning(f"Secret '{name}' not found, using default")
            return default
        except ClientAuthenticationError as e:
            logging.error(f"Authentication failed for secret '{name}': {e}")
            raise
        except HttpResponseError as e:
            if e.status_code == 403:
                logging.error(f"Access denied for secret '{name}'")
            elif e.status_code == 404:
                # Covers a 404 that did not arrive as ResourceNotFoundError.
                logging.warning(f"Secret '{name}' not found")
                return default
            else:
                logging.error(f"HTTP error getting secret '{name}': {e}")
            return default
        except Exception as e:
            logging.error(f"Unexpected error getting secret '{name}': {e}")
            return default

    def set_secret_safely(
        self,
        name: str,
        value: str,
        **kwargs
    ) -> bool:
        """Create or update secret ``name``, logging any failure.

        Args:
            name: Name of the secret to set.
            value: Value to store.
            **kwargs: Passed through unchanged to ``SecretClient.set_secret``.

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            self.client.set_secret(name, value, **kwargs)
            logging.info(f"Successfully set secret '{name}'")
            return True
        except ClientAuthenticationError as e:
            logging.error(f"Authentication failed setting secret '{name}': {e}")
            return False
        except HttpResponseError as e:
            if e.status_code == 403:
                logging.error(f"Access denied setting secret '{name}'")
            elif e.status_code == 409:
                logging.warning(f"Conflict setting secret '{name}' - may already exist")
            else:
                logging.error(f"HTTP error setting secret '{name}': {e}")
            return False
        except Exception as e:
            logging.error(f"Unexpected error setting secret '{name}': {e}")
            return False

    def delete_secret_safely(self, name: str) -> bool:
        """Delete secret ``name``, logging any failure.

        Args:
            name: Name of the secret to delete.

        Returns:
            bool: True if successful or already deleted, False on error
        """
        try:
            poller = self.client.begin_delete_secret(name)
            # Only completion matters here; the returned deleted-secret
            # object is not used.
            poller.result()
            logging.info(f"Successfully deleted secret '{name}'")
            return True
        except ResourceNotFoundError:
            logging.info(f"Secret '{name}' already deleted or doesn't exist")
            return True  # Consider this success
        except ClientAuthenticationError as e:
            logging.error(f"Authentication failed deleting secret '{name}': {e}")
            return False
        except HttpResponseError as e:
            if e.status_code == 403:
                logging.error(f"Access denied deleting secret '{name}'")
            else:
                logging.error(f"HTTP error deleting secret '{name}': {e}")
            return False
        except Exception as e:
            logging.error(f"Unexpected error deleting secret '{name}': {e}")
            return False

    def list_secrets_safely(self) -> Dict[str, Dict[str, Any]]:
        """List the properties of every secret, logging any failure.

        Returns:
            Dict mapping secret names to their properties (``enabled``,
            ``created_on``, ``updated_on``, ``expires_on``, ``tags``). If an
            error interrupts the listing, the entries gathered so far are
            returned, so the result may be partial or empty.
        """
        secrets = {}
        try:
            for secret_props in self.client.list_properties_of_secrets():
                secrets[secret_props.name] = {
                    'enabled': secret_props.enabled,
                    'created_on': secret_props.created_on,
                    'updated_on': secret_props.updated_on,
                    'expires_on': secret_props.expires_on,
                    'tags': secret_props.tags
                }
        except ClientAuthenticationError as e:
            logging.error(f"Authentication failed listing secrets: {e}")
        except HttpResponseError as e:
            if e.status_code == 403:
                logging.error("Access denied listing secrets")
            else:
                logging.error(f"HTTP error listing secrets: {e}")
        except Exception as e:
            logging.error(f"Unexpected error listing secrets: {e}")
        return secrets

    def __enter__(self):
        """Return the manager itself for use inside a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the client; a failure to close is logged, not raised.

        Returns None, so an exception raised inside the ``with`` block is
        not suppressed.
        """
        try:
            self.client.close()
        except Exception as e:
            logging.error(f"Error closing client: {e}")


import asyncio
from azure.keyvault.secrets.aio import SecretClient
from azure.identity.aio import DefaultAzureCredential
from azure.core.exceptions import *
async def safe_async_secret_operations():
    """Demonstrate layered error handling with the async secret client.

    Reads "my-secret" (creating it when it does not exist), then lists the
    names of all secrets. Every failure is printed rather than raised; an
    authentication failure on the initial read ends the function early.

    Both the credential and the client are entered as async context
    managers so they are closed on every exit path, including the early
    return. The credential was previously never closed.
    """
    credential = DefaultAzureCredential()
    try:
        async with credential, SecretClient(
            "https://vault.vault.azure.net/",
            credential
        ) as client:
            # Try to get a secret with error handling
            try:
                secret = await client.get_secret("my-secret")
                print(f"Retrieved: {secret.name}")
            except ResourceNotFoundError:
                print("Secret not found, creating it...")
                try:
                    new_secret = await client.set_secret("my-secret", "new-value")
                    print(f"Created: {new_secret.name}")
                except HttpResponseError as e:
                    print(f"Failed to create secret: {e}")
            except ClientAuthenticationError as e:
                print(f"Authentication failed: {e}")
                return
            # List secrets with error handling
            try:
                secret_count = 0
                async for secret_props in client.list_properties_of_secrets():
                    secret_count += 1
                    print(f"Found secret: {secret_props.name}")
                print(f"Total secrets: {secret_count}")
            except Exception as e:
                print(f"Error listing secrets: {e}")
    except Exception as e:
        print(f"Failed to initialize client: {e}")
# Run the async example
# asyncio.run(safe_async_secret_operations())
from azure.core.exceptions import HttpResponseError
from typing import Optional
class KeyVaultSecretError(Exception):
    """Custom exception for Key Vault secret operations."""


class SecretAccessDeniedError(KeyVaultSecretError):
    """Raised when access is denied to a secret."""


class SecretExpiredError(KeyVaultSecretError):
    """Raised when attempting to use an expired secret."""


def convert_azure_exception(func):
    """Decorator to convert Azure exceptions to custom exceptions.

    Args:
        func: Callable whose Azure SDK exceptions should be translated.

    Returns:
        A wrapper with ``func``'s metadata that returns ``func``'s result
        unchanged on success.

    Raises (from the wrapper):
        SecretAccessDeniedError: For an HttpResponseError with status 403.
        KeyVaultSecretError: For every other failure. A KeyVaultSecretError
            (or subclass) raised by ``func`` itself passes through as-is
            rather than being re-wrapped as "Unexpected error".

    Converted exceptions are chained with ``from`` so the original Azure
    error remains available as ``__cause__``.
    """
    # Imported locally so this block does not depend on a module-level import.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except KeyVaultSecretError:
            # Already a domain error (e.g. SecretExpiredError); the generic
            # handler below would otherwise erase its specific type.
            raise
        except ResourceNotFoundError as e:
            raise KeyVaultSecretError(f"Secret not found: {e}") from e
        except ClientAuthenticationError as e:
            raise KeyVaultSecretError(f"Authentication failed: {e}") from e
        except HttpResponseError as e:
            if e.status_code == 403:
                raise SecretAccessDeniedError(f"Access denied: {e}") from e
            else:
                raise KeyVaultSecretError(f"HTTP error {e.status_code}: {e}") from e
        except Exception as e:
            raise KeyVaultSecretError(f"Unexpected error: {e}") from e
    return wrapper
@convert_azure_exception
def get_secret_with_custom_exceptions(client: SecretClient, name: str) -> str:
    """Return a secret's value, rejecting secrets past their expiry.

    Args:
        client: Client used to call ``get_secret``.
        name: Name of the secret to read.

    Returns:
        The secret's value.

    Raises:
        SecretExpiredError: Raised inside this function when the secret's
            ``expires_on`` is earlier than the current UTC time. What the
            caller finally sees depends on the ``convert_azure_exception``
            decorator applied above.
    """
    from datetime import datetime, timezone

    secret = client.get_secret(name)
    expiry = secret.properties.expires_on
    # A secret without an expiry is never treated as expired.
    if expiry and expiry < datetime.now(timezone.utc):
        raise SecretExpiredError(f"Secret '{name}' has expired")
    return secret.value


from azure.core.exceptions import (
    HttpResponseError,
    ResourceNotFoundError,
    ResourceExistsError,
    ClientAuthenticationError,
    ServiceRequestError
)
from typing import Optional, Dict, Any, Callable, TypeVar
import logging
import time
Install with Tessl CLI
npx tessl i tessl/pypi-azure-keyvault-secrets