CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-keyvault-secrets

Microsoft Azure Key Vault secrets client library for Python providing secure storage and management of sensitive information

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

error-handling.mddocs/

Error Handling and Exceptions

This page describes error-handling patterns and exception management for robust secret operations, including authentication and network failures. The Azure Key Vault Secrets library raises the standard Azure Core exceptions, so error handling is consistent across Azure SDKs.

Capabilities

Core Exception Classes

Standard Azure Core exceptions used throughout the Key Vault Secrets library.

# From azure.core.exceptions

class HttpResponseError(Exception):
    """
    Base exception for HTTP response errors.

    Declared first in this listing because the more specific errors
    below subclass it; catch it after them in an ``except`` chain.

    Properties:
    - status_code (int): HTTP status code
    - reason (str): HTTP reason phrase
    - response (HttpResponse): Full HTTP response object
    - message (str): Error message
    """


class ResourceNotFoundError(HttpResponseError):
    """
    Raised when a requested secret does not exist.

    Common scenarios:
    - Getting a secret that doesn't exist
    - Updating a non-existent secret
    - Operating on a deleted secret that has been purged
    """


class ResourceExistsError(HttpResponseError):
    """
    Raised when attempting to create a resource that already exists.

    Common scenarios:
    - Restoring a secret when one with the same name already exists
    - Conflicting operations during secret creation
    """


class ClientAuthenticationError(HttpResponseError):
    """
    Raised when authentication with Azure Key Vault fails.

    Common scenarios:
    - Invalid or expired credentials
    - Invalid credentials configuration

    Not covered by this exception:
    - Insufficient permissions for the operation: these surface as
      HttpResponseError with a 403 status (see "Permission and Access
      Errors").
    - Network connectivity problems: these surface as
      ServiceRequestError.
    """


class ServiceRequestError(Exception):
    """
    Raised when there's an error in the service request.

    This is not an HttpResponseError: no HTTP response was received,
    so there is no status code to inspect.

    Common scenarios:
    - Network connectivity issues
    - DNS resolution problems
    - Connection timeouts
    """

Authentication Errors

Specific patterns and handling for authentication-related failures.

# Authentication error scenarios and handling

def handle_authentication_error(error: ClientAuthenticationError) -> None:
    """
    Handle authentication errors with appropriate remediation.

    This is a signature-only documentation stub: the body consists of
    this docstring alone, so calling it performs no work and returns
    None.

    Args:
        error: The authentication failure to handle.

    Common causes:
    - Expired tokens or certificates
    - Insufficient Key Vault permissions
    - Invalid credentials configuration
    - Network firewall blocking authentication endpoints

    NOTE(review): elsewhere on this page permission failures are said
    to surface as HttpResponseError with a 403 status, so the
    "Insufficient Key Vault permissions" cause may not reach this
    handler -- confirm.
    """

Permission and Access Errors

Handling authorization failures and access policy issues.

# Permission error patterns (manifested as HttpResponseError with 403 status)

def handle_permission_error(error: HttpResponseError) -> None:
    """
    Handle permission and authorization errors.

    This is a signature-only documentation stub: the body consists of
    this docstring alone, so calling it performs no work and returns
    None.

    Args:
        error: The HTTP error to handle. Per the comment preceding this
            stub, permission failures manifest as HttpResponseError
            with a 403 status.

    Common causes:
    - Missing Key Vault access policies
    - Insufficient RBAC roles
    - Operations not allowed by access policy
    - Vault access restricted by network rules
    """

Resource State Errors

Handling errors related to secret state and lifecycle.

def handle_secret_state_error(error: HttpResponseError) -> None:
    """
    Handle errors related to secret state and lifecycle.

    This is a signature-only documentation stub: the body consists of
    this docstring alone, so calling it performs no work and returns
    None.

    Args:
        error: The HTTP error describing the state conflict.

    Common scenarios:
    - Operating on disabled secrets
    - Accessing expired secrets
    - Conflicts with secret deletion state
    - Version-specific operation errors
    """

Error Handling Patterns

Basic Exception Handling

from typing import Optional

from azure.core.exceptions import (
    ResourceNotFoundError,
    ClientAuthenticationError,
    HttpResponseError
)
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

def safe_get_secret(client: SecretClient, name: str) -> Optional[str]:
    """
    Fetch a secret's value, reporting any failure on stdout.

    Args:
        client: Client used to call ``get_secret``.
        name: Name of the secret to retrieve.

    Returns:
        The secret's value on success; None after any handled failure
        (a message describing the failure is printed first).
    """
    try:
        return client.get_secret(name).value

    # The two specific handlers come before HttpResponseError so that
    # they are tried first.
    except ResourceNotFoundError:
        print(f"Secret '{name}' does not exist")

    except ClientAuthenticationError as auth_err:
        # Handle credential refresh or re-authentication
        print(f"Authentication failed: {auth_err}")

    except HttpResponseError as http_err:
        status = http_err.status_code
        if status == 403:
            print(f"Access denied for secret '{name}': {http_err}")
        elif status == 429:
            retry_after = http_err.response.headers.get('Retry-After', 'unknown')
            print(f"Rate limited. Retry after: {retry_after}")
        else:
            print(f"HTTP error {status}: {http_err}")

    except Exception as unexpected:
        print(f"Unexpected error: {unexpected}")

    # Every handled failure falls through to the same "no value" result.
    return None

Retry Logic for Transient Failures

import time
from typing import Optional, Callable, TypeVar
from azure.core.exceptions import HttpResponseError, ServiceRequestError

T = TypeVar('T')


def retry_operation(
    operation: Callable[[], T],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_multiplier: float = 2.0
) -> Optional[T]:
    """
    Retry an operation with exponential backoff for transient failures.

    Args:
        operation: Zero-argument callable to invoke.
        max_retries: Total number of attempts. Values below 1 are
            treated as 1 so the operation is always tried at least once
            (previously such values skipped the call and returned None).
        initial_delay: Seconds to sleep after the first failed attempt.
        backoff_multiplier: Factor applied to the delay after each
            further failed attempt.

    Returns:
        Whatever ``operation`` returns on its first successful attempt.

    Raises:
        ServiceRequestError, HttpResponseError: Re-raised unchanged when
            the error is a non-retryable 4xx (anything but 429) or when
            the final attempt fails.
        Exception: Any other exception from ``operation`` propagates
            immediately without a retry.
    """
    attempts = max(1, max_retries)

    for attempt in range(attempts):
        try:
            return operation()

        except (ServiceRequestError, HttpResponseError) as e:
            # ServiceRequestError has no status code, and an
            # HttpResponseError built without a response reports None;
            # both are treated as retryable instead of being compared
            # against integers (which would raise TypeError).
            status_code = getattr(e, "status_code", None)

            # Don't retry client errors (4xx) except rate limiting (429)
            if (
                status_code is not None
                and 400 <= status_code < 500
                and status_code != 429
            ):
                raise

            if attempt == attempts - 1:
                raise  # Last attempt, re-raise the exception

            delay = initial_delay * (backoff_multiplier ** attempt)
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
            time.sleep(delay)

    # Not reached: every iteration either returns or raises. Kept so the
    # declared Optional return type stays accurate for type checkers.
    return None

# Usage example
def get_secret_with_retry(client: SecretClient, name: str) -> Optional[str]:
    """
    Fetch a secret's value through ``retry_operation``.

    Args:
        client: Client used to call ``get_secret``.
        name: Name of the secret to retrieve.

    Returns:
        The value produced by ``retry_operation``, or None when any
        exception escapes it (the failure is printed first).
    """
    def fetch_value():
        # Deferred so retry_operation can invoke the lookup repeatedly.
        return client.get_secret(name).value

    try:
        return retry_operation(fetch_value)
    except Exception as failure:
        print(f"Failed to get secret after retries: {failure}")
    return None

Comprehensive Secret Management with Error Handling

from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
from azure.core.exceptions import *
from typing import Optional, Dict, Any
import logging

class SecureSecretManager:
    """
    Wrapper around SecretClient that logs failures instead of raising.

    Most methods catch Azure and unexpected exceptions, log them through
    the ``logging`` module, and return a fallback value (the supplied
    default, False, or whatever was collected so far). The exceptions
    that still propagate are initialization failures and authentication
    failures in ``get_secret_safely``.

    Usable as a context manager; leaving the ``with`` block calls
    ``close()``.
    """

    def __init__(self, vault_url: str):
        """
        Create the credential and client for ``vault_url``.

        Raises:
            Exception: Any error raised while building the credential or
                client is logged and re-raised unchanged.
        """
        try:
            credential = DefaultAzureCredential()
            self.client = SecretClient(vault_url=vault_url, credential=credential)
            self.vault_url = vault_url
            # Retained so close() can release the credential this
            # manager created; the client is not relied on to do so.
            self._credential = credential
            logging.info(f"Initialized SecretManager for vault: {vault_url}")
        except Exception as e:
            logging.error(f"Failed to initialize SecretManager: {e}")
            raise

    def get_secret_safely(self, name: str, default: Optional[str] = None) -> Optional[str]:
        """
        Safely retrieve a secret with fallback to default value.

        Args:
            name: Name of the secret to retrieve.
            default: Value returned when the secret cannot be read.

        Returns:
            The secret's value, or ``default`` after a not-found error,
            any other HTTP error, or an unexpected exception.

        Raises:
            ClientAuthenticationError: Logged and re-raised, so a
                credential problem is not mistaken for a missing secret.
        """
        try:
            secret = self.client.get_secret(name)
            return secret.value

        except ResourceNotFoundError:
            logging.warning(f"Secret '{name}' not found, using default")
            return default

        except ClientAuthenticationError as e:
            logging.error(f"Authentication failed for secret '{name}': {e}")
            raise

        except HttpResponseError as e:
            # All branches only differ in what they log; the fallback
            # value is returned once below.
            if e.status_code == 403:
                logging.error(f"Access denied for secret '{name}'")
            elif e.status_code == 404:
                logging.warning(f"Secret '{name}' not found")
            else:
                logging.error(f"HTTP error getting secret '{name}': {e}")
            return default

        except Exception as e:
            logging.error(f"Unexpected error getting secret '{name}': {e}")
            return default

    def set_secret_safely(
        self,
        name: str,
        value: str,
        **kwargs
    ) -> bool:
        """
        Safely set a secret with error handling.

        Args:
            name: Name of the secret to set.
            value: Value to store.
            **kwargs: Forwarded unchanged to ``client.set_secret``.

        Returns:
        bool: True if successful, False otherwise
        """
        try:
            self.client.set_secret(name, value, **kwargs)
            logging.info(f"Successfully set secret '{name}'")
            return True

        except ClientAuthenticationError as e:
            logging.error(f"Authentication failed setting secret '{name}': {e}")
            return False

        except HttpResponseError as e:
            if e.status_code == 403:
                logging.error(f"Access denied setting secret '{name}'")
            elif e.status_code == 409:
                logging.warning(f"Conflict setting secret '{name}' - may already exist")
            else:
                logging.error(f"HTTP error setting secret '{name}': {e}")
            return False

        except Exception as e:
            logging.error(f"Unexpected error setting secret '{name}': {e}")
            return False

    def delete_secret_safely(self, name: str) -> bool:
        """
        Safely delete a secret with error handling.

        Waits on the poller returned by ``begin_delete_secret`` before
        reporting success.

        Returns:
        bool: True if successful or already deleted, False on error
        """
        try:
            poller = self.client.begin_delete_secret(name)
            # Only completion matters here; the returned object is unused.
            poller.result()
            logging.info(f"Successfully deleted secret '{name}'")
            return True

        except ResourceNotFoundError:
            logging.info(f"Secret '{name}' already deleted or doesn't exist")
            return True  # Consider this success

        except ClientAuthenticationError as e:
            logging.error(f"Authentication failed deleting secret '{name}': {e}")
            return False

        except HttpResponseError as e:
            if e.status_code == 403:
                logging.error(f"Access denied deleting secret '{name}'")
            else:
                logging.error(f"HTTP error deleting secret '{name}': {e}")
            return False

        except Exception as e:
            logging.error(f"Unexpected error deleting secret '{name}': {e}")
            return False

    def list_secrets_safely(self) -> Dict[str, Dict[str, Any]]:
        """
        Safely list all secrets with error handling.

        If iteration fails part-way, the error is logged and the entries
        gathered up to that point are still returned, so an empty or
        short result does not by itself prove the vault is empty.

        Returns:
        Dict mapping secret names to their properties
        """
        secrets = {}

        try:
            # Filled one entry at a time (not via a comprehension) so a
            # mid-iteration failure keeps what was already collected.
            for secret_props in self.client.list_properties_of_secrets():
                secrets[secret_props.name] = {
                    'enabled': secret_props.enabled,
                    'created_on': secret_props.created_on,
                    'updated_on': secret_props.updated_on,
                    'expires_on': secret_props.expires_on,
                    'tags': secret_props.tags
                }

        except ClientAuthenticationError as e:
            logging.error(f"Authentication failed listing secrets: {e}")

        except HttpResponseError as e:
            if e.status_code == 403:
                logging.error("Access denied listing secrets")
            else:
                logging.error(f"HTTP error listing secrets: {e}")

        except Exception as e:
            logging.error(f"Unexpected error listing secrets: {e}")

        return secrets

    def close(self) -> None:
        """
        Close the client and the credential created in ``__init__``.

        Errors from either close call are logged and suppressed so that
        cleanup never masks an exception already in flight.
        """
        try:
            self.client.close()
        except Exception as e:
            logging.error(f"Error closing client: {e}")

        # getattr: instances built without __init__ have no credential.
        credential = getattr(self, "_credential", None)
        if credential is not None:
            try:
                credential.close()
            except Exception as e:
                logging.error(f"Error closing credential: {e}")

    def __enter__(self):
        """Return the manager itself for use inside a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release resources; exceptions from the block are not suppressed."""
        self.close()

Async Error Handling

import asyncio
from azure.keyvault.secrets.aio import SecretClient
from azure.identity.aio import DefaultAzureCredential
from azure.core.exceptions import *

async def safe_async_secret_operations():
    """
    Example of comprehensive async error handling.

    Reads "my-secret" (creating it when it does not exist), then lists
    the vault's secrets, printing the outcome of each step. Both the
    credential and the client are entered as async context managers so
    they are closed on every exit path, including the early return
    after an authentication failure.

    Returns:
        None. All results and errors are reported via ``print``.
    """
    credential = DefaultAzureCredential()

    try:
        # The credential is entered alongside the client: the original
        # snippet closed only the client and left the credential open.
        async with credential, SecretClient(
            "https://vault.vault.azure.net/",
            credential
        ) as client:

            # Try to get a secret with error handling
            try:
                secret = await client.get_secret("my-secret")
                print(f"Retrieved: {secret.name}")

            except ResourceNotFoundError:
                print("Secret not found, creating it...")
                try:
                    new_secret = await client.set_secret("my-secret", "new-value")
                    print(f"Created: {new_secret.name}")
                except HttpResponseError as e:
                    print(f"Failed to create secret: {e}")

            except ClientAuthenticationError as e:
                print(f"Authentication failed: {e}")
                return

            # List secrets with error handling
            try:
                secret_count = 0
                async for secret_props in client.list_properties_of_secrets():
                    secret_count += 1
                    print(f"Found secret: {secret_props.name}")

                print(f"Total secrets: {secret_count}")

            except Exception as e:
                print(f"Error listing secrets: {e}")

    except Exception as e:
        # Also reached by any error not handled above (for example a
        # non-404 HTTP error from get_secret), not only setup failures.
        print(f"Failed to initialize client: {e}")

# Run the async example
# asyncio.run(safe_async_secret_operations())

Custom Exception Handling

from azure.core.exceptions import HttpResponseError
from typing import Optional

class KeyVaultSecretError(Exception):
    """Base class for the custom Key Vault secret errors defined here."""


class SecretAccessDeniedError(KeyVaultSecretError):
    """Signals that access to a secret was denied."""


class SecretExpiredError(KeyVaultSecretError):
    """Signals an attempt to use a secret that has already expired."""

def convert_azure_exception(func):
    """
    Decorator to convert Azure exceptions to custom exceptions.

    Mapping applied to exceptions raised by ``func``:
    - KeyVaultSecretError (and subclasses): re-raised unchanged, so a
      specific error such as SecretExpiredError raised inside ``func``
      keeps its type instead of being re-wrapped as "Unexpected error".
    - ResourceNotFoundError, ClientAuthenticationError: KeyVaultSecretError
    - HttpResponseError with status 403: SecretAccessDeniedError
    - Any other HttpResponseError or Exception: KeyVaultSecretError

    Converted exceptions are chained with ``from`` so the original
    Azure error stays available as ``__cause__``.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper with the same call signature and metadata as ``func``.
    """
    import functools  # local import keeps this snippet self-contained

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except KeyVaultSecretError:
            # Already a domain error; must precede the generic handlers.
            raise
        except ResourceNotFoundError as e:
            raise KeyVaultSecretError(f"Secret not found: {e}") from e
        except ClientAuthenticationError as e:
            raise KeyVaultSecretError(f"Authentication failed: {e}") from e
        except HttpResponseError as e:
            if e.status_code == 403:
                raise SecretAccessDeniedError(f"Access denied: {e}") from e
            else:
                raise KeyVaultSecretError(f"HTTP error {e.status_code}: {e}") from e
        except Exception as e:
            raise KeyVaultSecretError(f"Unexpected error: {e}") from e

    return wrapper

@convert_azure_exception
def get_secret_with_custom_exceptions(client: SecretClient, name: str) -> str:
    """
    Return a secret's value, rejecting secrets whose expiry has passed.

    Args:
        client: Client used to call ``get_secret``.
        name: Name of the secret to retrieve.

    Returns:
        The secret's value.

    Raises:
        SecretExpiredError: Raised in this body when ``expires_on`` is
            set and earlier than the current UTC time. Exceptions
            leaving this function pass through the
            ``convert_azure_exception`` decorator.
    """
    from datetime import datetime, timezone

    secret = client.get_secret(name)
    expires_on = secret.properties.expires_on

    # A falsy expires_on skips the comparison entirely.
    if expires_on and expires_on < datetime.now(timezone.utc):
        raise SecretExpiredError(f"Secret '{name}' has expired")

    return secret.value

Required Imports

from azure.core.exceptions import (
    HttpResponseError,
    ResourceNotFoundError,
    ResourceExistsError,
    ClientAuthenticationError,
    ServiceRequestError
)
from typing import Optional, Dict, Any, Callable, TypeVar
import logging
import time

Install with Tessl CLI

npx tessl i tessl/pypi-azure-keyvault-secrets

docs

async-client.md

error-handling.md

index.md

models.md

sync-client.md

tile.json