CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-azure-keyvault-keys

Microsoft Azure Key Vault Keys client library for Python providing cryptographic key management operations

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/async-operations.md

Asynchronous Operations

Complete async/await support for all key management and cryptographic operations, enabling efficient non-blocking I/O in async applications. The azure-keyvault-keys package mirrors its synchronous API with dedicated async clients in the `azure.keyvault.keys.aio` and `azure.keyvault.keys.crypto.aio` namespaces, together with async-compatible return types.

Capabilities

Async Key Management Client

Asynchronous version of KeyClient for non-blocking key operations.

class KeyClient:
    """Asynchronous client for Azure Key Vault key operations.

    This stub lists only the lifecycle members: the client can be used with
    ``async with`` or closed explicitly by awaiting ``close()``.
    """
    
    async def __aenter__(self) -> "KeyClient":
        """Async context manager entry; annotated as returning the client."""
    
    async def __aexit__(self, *args) -> None:
        """Async context manager exit; accepts any positional arguments."""
    
    async def close(self) -> None:
        """Close the client and release resources."""

Usage Examples

from azure.keyvault.keys.aio import KeyClient
from azure.identity.aio import DefaultAzureCredential

async def main():
    """Create an RSA key with the async client and print its name."""
    # Async credentials own a transport of their own and must be closed as
    # well; closing the KeyClient does not close the credential it was given.
    async with DefaultAzureCredential() as credential:
        async with KeyClient("https://vault.vault.azure.net/", credential) as client:
            # Perform async operations
            key = await client.create_rsa_key("async-key", size=2048)
            print(f"Created key: {key.name}")

# Run with asyncio
import asyncio
asyncio.run(main())

Async Key Creation

Asynchronous key creation operations.

async def create_key(
    name: str,
    key_type: KeyType,
    **kwargs
) -> KeyVaultKey:
    """Asynchronously create a new key.

    Args:
        name: Name of the key to create.
        key_type: Type of key to create.
        **kwargs: Additional keyword options (not enumerated in this stub).

    Returns:
        The created key, as a ``KeyVaultKey``.
    """

async def create_rsa_key(
    name: str,
    *,
    size: int = None,
    hardware_protected: bool = False,
    **kwargs
) -> KeyVaultKey:
    """Asynchronously create an RSA key.

    Args:
        name: Name of the key to create.
        size: Key size, keyword-only (the usage example passes 2048).
            Defaults to None.
        hardware_protected: Keyword-only flag, defaults to False.
            NOTE(review): presumably selects an HSM-backed key; confirm
            against the library reference.
        **kwargs: Additional keyword options (not enumerated in this stub).

    Returns:
        The created key, as a ``KeyVaultKey``.
    """

async def create_ec_key(
    name: str,
    *,
    curve: KeyCurveName = None,
    hardware_protected: bool = False,
    **kwargs
) -> KeyVaultKey:
    """Asynchronously create an Elliptic Curve key.

    Args:
        name: Name of the key to create.
        curve: Keyword-only curve name (the usage example passes
            ``KeyCurveName.p_256``). Defaults to None.
        hardware_protected: Keyword-only flag, defaults to False.
            NOTE(review): presumably selects an HSM-backed key; confirm
            against the library reference.
        **kwargs: Additional keyword options (not enumerated in this stub).

    Returns:
        The created key, as a ``KeyVaultKey``.
    """

async def create_oct_key(
    name: str,
    *,
    size: int = None,
    hardware_protected: bool = False,
    **kwargs
) -> KeyVaultKey:
    """Asynchronously create a symmetric key.

    Args:
        name: Name of the key to create.
        size: Key size, keyword-only (the usage example passes 256).
            Defaults to None.
        hardware_protected: Keyword-only flag, defaults to False.
            NOTE(review): presumably selects an HSM-backed key; confirm
            against the library reference.
        **kwargs: Additional keyword options (not enumerated in this stub).

    Returns:
        The created key, as a ``KeyVaultKey``.
    """

Usage Examples

import asyncio
from azure.keyvault.keys.aio import KeyClient
from azure.keyvault.keys import KeyType, KeyCurveName
from azure.identity.aio import DefaultAzureCredential

async def create_multiple_keys():
    """Create an RSA, an EC and a symmetric key concurrently and print each one."""
    # The async credential is entered as a context manager so that its
    # transport is closed when the example finishes.
    async with DefaultAzureCredential() as credential:
        async with KeyClient("https://vault.vault.azure.net/", credential) as client:
            # The calls below only build coroutines; gather() runs them
            # concurrently and returns results in the same order.
            tasks = [
                client.create_rsa_key("async-rsa-key", size=2048),
                client.create_ec_key("async-ec-key", curve=KeyCurveName.p_256),
                client.create_oct_key("async-oct-key", size=256),
            ]

            keys = await asyncio.gather(*tasks)

            for key in keys:
                print(f"Created key: {key.name} ({key.key_type})")

asyncio.run(create_multiple_keys())

Async Key Retrieval and Management

Asynchronous key retrieval and management operations.

async def get_key(name: str, version: str = None, **kwargs) -> KeyVaultKey:
    """Asynchronously get a key from the vault.

    Args:
        name: Name of the key to retrieve.
        version: Optional key version; defaults to None.
            NOTE(review): presumably None selects the latest version; confirm.
        **kwargs: Additional keyword options (not enumerated in this stub).

    Returns:
        The requested key, as a ``KeyVaultKey``.
    """

async def update_key_properties(
    name: str,
    version: str = None,
    **kwargs
) -> KeyVaultKey:
    """Asynchronously update a key's properties.

    Args:
        name: Name of the key to update.
        version: Optional key version; defaults to None.
        **kwargs: The properties to change; the usage example passes
            ``enabled=False`` and ``tags={...}``.

    Returns:
        The updated key, as a ``KeyVaultKey``.
    """

async def delete_key(name: str, **kwargs) -> DeletedKey:
    """Asynchronously delete a key and return the resulting ``DeletedKey``.

    The async ``KeyClient`` does not expose a ``begin_delete_key`` poller
    (that method belongs to the synchronous client). Awaiting this coroutine
    yields the deleted key directly.

    Args:
        name: Name of the key to delete.
        **kwargs: Additional keyword options (not enumerated in this stub).

    Returns:
        The deleted key, as a ``DeletedKey``.
    """

async def get_deleted_key(name: str, **kwargs) -> DeletedKey:
    """Asynchronously get a deleted key.

    Args:
        name: Name of the deleted key.
        **kwargs: Additional keyword options (not enumerated in this stub).

    Returns:
        The deleted key, as a ``DeletedKey``.
    """

async def recover_deleted_key(name: str, **kwargs) -> KeyVaultKey:
    """Asynchronously recover a deleted key and return the recovered key.

    The async ``KeyClient`` does not expose a ``begin_recover_deleted_key``
    poller (that method belongs to the synchronous client). Awaiting this
    coroutine yields the recovered key directly.

    Args:
        name: Name of the deleted key to recover.
        **kwargs: Additional keyword options (not enumerated in this stub).

    Returns:
        The recovered key, as a ``KeyVaultKey``.
    """

async def purge_deleted_key(name: str, **kwargs) -> None:
    """Asynchronously purge a deleted key permanently.

    Args:
        name: Name of the deleted key to purge.
        **kwargs: Additional keyword options (not enumerated in this stub).

    Returns:
        None.
    """

Usage Examples

async def manage_key_lifecycle():
    """Demonstrate async key lifecycle management: create, update, delete, recover."""
    # Enter the async credential as a context manager so it is closed too.
    async with DefaultAzureCredential() as credential:
        async with KeyClient("https://vault.vault.azure.net/", credential) as client:
            # Create key
            key = await client.create_rsa_key("lifecycle-key")
            print(f"Created: {key.name}")

            # Update properties
            updated_key = await client.update_key_properties(
                "lifecycle-key",
                enabled=False,
                tags={"status": "disabled"},
            )
            print(f"Updated: {updated_key.name}")

            # Delete key (soft delete). The async client has no begin_*
            # pollers: delete_key is a coroutine that resolves to the
            # DeletedKey itself.
            deleted_key = await client.delete_key("lifecycle-key")
            print(f"Deleted: {deleted_key.name}")

            # Recover key. Likewise, recover_deleted_key resolves to the
            # recovered KeyVaultKey directly.
            recovered_key = await client.recover_deleted_key("lifecycle-key")
            print(f"Recovered: {recovered_key.name}")

asyncio.run(manage_key_lifecycle())

Async Paginated Operations

Handle paginated results asynchronously.

def list_properties_of_keys(**kwargs) -> AsyncItemPaged[KeyProperties]:
    """List key properties asynchronously with pagination.

    This is a plain ``def``, not a coroutine: call it without ``await`` and
    consume the result with ``async for``.

    Returns:
        An ``AsyncItemPaged`` of ``KeyProperties``.
    """

def list_properties_of_key_versions(name: str, **kwargs) -> AsyncItemPaged[KeyProperties]:
    """List key version properties asynchronously with pagination.

    This is a plain ``def``, not a coroutine: call it without ``await`` and
    consume the result with ``async for``.

    Args:
        name: Name of the key whose versions are listed.

    Returns:
        An ``AsyncItemPaged`` of ``KeyProperties``.
    """

def list_deleted_keys(**kwargs) -> AsyncItemPaged[DeletedKey]:
    """List deleted keys asynchronously with pagination.

    This is a plain ``def``, not a coroutine: call it without ``await`` and
    consume the result with ``async for``.

    Returns:
        An ``AsyncItemPaged`` of ``DeletedKey``.
    """

Usage Examples

async def list_all_keys():
    """List all keys in the vault asynchronously, item by item and page by page."""
    # Enter the async credential as a context manager so it is closed too.
    async with DefaultAzureCredential() as credential:
        async with KeyClient("https://vault.vault.azure.net/", credential) as client:
            print("All keys in vault:")

            # Iterate through all keys
            async for key_properties in client.list_properties_of_keys():
                print(f"- {key_properties.name} (enabled: {key_properties.enabled})")

            # Or process by pages
            pages = client.list_properties_of_keys().by_page()
            async for page in pages:
                # Each page is itself an async iterator: it has no len() and
                # cannot be walked with a plain `for`, so collect it first.
                names = [key_properties.name async for key_properties in page]
                print(f"Processing page with {len(names)} keys")
                for name in names:
                    print(f"  - {name}")

async def find_keys_by_tag():
    """Print the names of all keys tagged ``environment=production``."""
    # Enter the async credential as a context manager so it is closed too.
    async with DefaultAzureCredential() as credential:
        async with KeyClient("https://vault.vault.azure.net/", credential) as client:
            # `tags` may be None, so test it before calling .get().
            production_keys = [
                key_properties.name
                async for key_properties in client.list_properties_of_keys()
                if key_properties.tags
                and key_properties.tags.get("environment") == "production"
            ]

            print(f"Production keys: {production_keys}")

asyncio.run(list_all_keys())
asyncio.run(find_keys_by_tag())

Async Cryptographic Operations

Asynchronous cryptographic operations using CryptographyClient.

class CryptographyClient:
    """Asynchronous client for cryptographic operations.

    Every operation is a coroutine method and must be awaited. The methods
    take ``self`` explicitly so the stub is a valid class definition.
    """

    async def encrypt(
        self,
        algorithm: EncryptionAlgorithm,
        plaintext: bytes,
        **kwargs
    ) -> EncryptResult:
        """Asynchronously encrypt ``plaintext`` with ``algorithm``."""

    async def decrypt(
        self,
        algorithm: EncryptionAlgorithm,
        ciphertext: bytes,
        **kwargs
    ) -> DecryptResult:
        """Asynchronously decrypt ``ciphertext`` with ``algorithm``."""

    async def sign(
        self,
        algorithm: SignatureAlgorithm,
        digest: bytes,
        **kwargs
    ) -> SignResult:
        """Asynchronously sign a precomputed ``digest``."""

    async def verify(
        self,
        algorithm: SignatureAlgorithm,
        digest: bytes,
        signature: bytes,
        **kwargs
    ) -> VerifyResult:
        """Asynchronously verify ``signature`` against ``digest``."""

    async def wrap_key(
        self,
        algorithm: KeyWrapAlgorithm,
        key: bytes,
        **kwargs
    ) -> WrapResult:
        """Asynchronously wrap (encrypt) the key bytes in ``key``."""

    async def unwrap_key(
        self,
        algorithm: KeyWrapAlgorithm,
        encrypted_key: bytes,
        **kwargs
    ) -> UnwrapResult:
        """Asynchronously unwrap (decrypt) ``encrypted_key``."""

Usage Examples

from azure.keyvault.keys.crypto.aio import CryptographyClient
from azure.keyvault.keys.crypto import EncryptionAlgorithm, SignatureAlgorithm
import hashlib

async def perform_crypto_operations():
    """Run two encryptions and two signings concurrently and print the results."""
    key_id = "https://vault.vault.azure.net/keys/my-key/version"

    # Enter the async credential as a context manager so it is closed too.
    async with DefaultAzureCredential() as credential:
        async with CryptographyClient(key_id, credential) as crypto_client:
            # Prepare data; sign() takes a digest, so hash the messages locally.
            plaintext1 = b"First message"
            plaintext2 = b"Second message"
            digest1 = hashlib.sha256(plaintext1).digest()
            digest2 = hashlib.sha256(plaintext2).digest()

            # Perform operations concurrently; gather() preserves order.
            tasks = [
                crypto_client.encrypt(EncryptionAlgorithm.rsa_oaep_256, plaintext1),
                crypto_client.encrypt(EncryptionAlgorithm.rsa_oaep_256, plaintext2),
                crypto_client.sign(SignatureAlgorithm.rs256, digest1),
                crypto_client.sign(SignatureAlgorithm.rs256, digest2),
            ]

            results = await asyncio.gather(*tasks)
            encrypt_result1, encrypt_result2, sign_result1, sign_result2 = results

            print(f"Encrypted {len(encrypt_result1.ciphertext)} bytes")
            print(f"Encrypted {len(encrypt_result2.ciphertext)} bytes")
            print(f"Signature 1: {sign_result1.signature.hex()[:16]}...")
            print(f"Signature 2: {sign_result2.signature.hex()[:16]}...")

asyncio.run(perform_crypto_operations())

Async Backup and Recovery

Asynchronous backup and recovery operations.

async def backup_key(name: str, **kwargs) -> bytes:
    """Asynchronously backup a key.

    Args:
        name: Name of the key to back up.
        **kwargs: Additional keyword options (not enumerated in this stub).

    Returns:
        The backup as ``bytes``.
    """

async def restore_key_backup(backup: bytes, **kwargs) -> KeyVaultKey:
    """Asynchronously restore a key from backup.

    Args:
        backup: Backup bytes, as returned by ``backup_key``.
        **kwargs: Additional keyword options (not enumerated in this stub).

    Returns:
        The restored key, as a ``KeyVaultKey``.
    """

Usage Examples

async def backup_multiple_keys():
    """Back up several keys concurrently and write each backup to a file."""
    key_names = ["key1", "key2", "key3"]

    # Enter the async credential as a context manager so it is closed too.
    async with DefaultAzureCredential() as credential:
        async with KeyClient("https://vault.vault.azure.net/", credential) as client:
            # return_exceptions=True keeps one failed backup from cancelling
            # the others; failures come back as exception objects in place.
            backup_tasks = [client.backup_key(name) for name in key_names]
            backups = await asyncio.gather(*backup_tasks, return_exceptions=True)

    for key_name, backup_result in zip(key_names, backups):
        if isinstance(backup_result, Exception):
            print(f"Failed to backup {key_name}: {backup_result}")
            continue

        print(f"Backed up {key_name}: {len(backup_result)} bytes")
        # Save backup to file. This is a blocking write, acceptable here
        # because all network work has already finished.
        with open(f"{key_name}_backup.bin", "wb") as f:
            f.write(backup_result)

asyncio.run(backup_multiple_keys())

Async Context Management

Proper resource management with async context managers.

async def __aenter__(self) -> "KeyClient":
    """Enter async context manager; annotated as returning the client."""

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Exit async context manager and clean up resources.

    Args:
        exc_type: Exception type raised inside the ``async with`` body, if any.
        exc_val: Exception instance, if any.
        exc_tb: Traceback, if any.
    """

async def close(self) -> None:
    """Close the client and release resources.

    Await this in a ``finally`` block when the client is not used with
    ``async with``.
    """

Usage Examples

async def proper_resource_management():
    """Demonstrate the two ways of releasing an async client's resources."""
    # The credential is a resource too: closing a KeyClient does not close
    # the credential passed to it, so manage it with its own `async with`.
    async with DefaultAzureCredential() as credential:
        # Method 1: Using async context manager (recommended)
        async with KeyClient("https://vault.vault.azure.net/", credential) as client:
            key = await client.get_key("my-key")
            print(f"Got key: {key.name}")
        # Client automatically closed

        # Method 2: Manual resource management
        client = KeyClient("https://vault.vault.azure.net/", credential)
        try:
            key = await client.get_key("my-key")
            print(f"Got key: {key.name}")
        finally:
            # Runs even if get_key raises, so the transport is never leaked.
            await client.close()

async def multiple_clients_example():
    """Fetch the same key from several vaults concurrently, one client per vault."""
    # Multiple vaults
    vault_urls = [
        "https://vault1.vault.azure.net/",
        "https://vault2.vault.azure.net/",
        "https://vault3.vault.azure.net/",
    ]

    # One credential is shared by every client and closed once they are done.
    async with DefaultAzureCredential() as credential:

        async def get_key_from_vault(vault_url: str, key_name: str):
            """Return the key from ``vault_url``, or None if the lookup fails."""
            async with KeyClient(vault_url, credential) as client:
                try:
                    return await client.get_key(key_name)
                except Exception as e:
                    # Best-effort: report the failure and let the other
                    # vault lookups continue.
                    print(f"Error getting key from {vault_url}: {e}")
                    return None

        # Get same key from multiple vaults concurrently
        tasks = [get_key_from_vault(url, "shared-key") for url in vault_urls]
        keys = await asyncio.gather(*tasks)

    # gather() preserves order, so index i corresponds to vault_urls[i - 1].
    for i, key in enumerate(keys, start=1):
        if key:
            print(f"Vault {i}: Found key {key.name}")
        else:
            print(f"Vault {i}: Key not found")

asyncio.run(proper_resource_management())
asyncio.run(multiple_clients_example())

Error Handling in Async Operations

Handle errors appropriately in async contexts.

from azure.core.exceptions import ResourceNotFoundError, ServiceRequestError
import asyncio

async def robust_async_key_operations():
    """Fetch several keys concurrently, handling failures per key."""
    key_names = ["key1", "nonexistent-key", "key3"]

    # Enter the async credential as a context manager so it is closed too.
    async with DefaultAzureCredential() as credential:
        async with KeyClient("https://vault.vault.azure.net/", credential) as client:

            async def safe_get_key(name: str):
                """Return the key named ``name``, or None if it cannot be fetched."""
                try:
                    return await client.get_key(name)
                except ResourceNotFoundError:
                    print(f"Key '{name}' not found")
                except ServiceRequestError as e:
                    print(f"Service error for key '{name}': {e}")
                except Exception as e:
                    # Last-resort handler so one key cannot abort the batch.
                    print(f"Unexpected error for key '{name}': {e}")
                return None

            # Get keys with error handling
            tasks = [safe_get_key(name) for name in key_names]
            results = await asyncio.gather(*tasks)

    successful_keys = [key for key in results if key is not None]
    print(f"Successfully retrieved {len(successful_keys)} keys")

asyncio.run(robust_async_key_operations())

Types

class AsyncLROPoller:
    """Asynchronous long-running operation poller.

    All members are instance methods. ``result`` and ``wait`` are coroutines
    and take no timeout argument; to bound the wait, wrap the call in
    ``asyncio.wait_for``.
    """

    async def result(self) -> Any:
        """Asynchronously get the final result of the operation."""

    async def wait(self) -> None:
        """Asynchronously wait for the operation to complete."""

    def done(self) -> bool:
        """Check if the operation is complete."""

    def status(self) -> str:
        """Get the current status of the operation."""

class AsyncItemPaged:
    """Asynchronous paginated collection of items.

    Consume it with ``async for`` to walk every item, or call ``by_page()``
    to walk one page at a time.
    """
    
    def __aiter__(self):
        """Async iterator over all items across pages."""
    
    def by_page(self):
        """Iterate page by page asynchronously.

        NOTE(review): the real azure-core method also appears to accept an
        optional continuation token; confirm against the library reference.
        """

Best Practices for Async Operations

Concurrency and Performance

async def efficient_bulk_operations():
    """Create 50 keys with at most 10 requests in flight at any time."""
    # Enter the async credential as a context manager so it is closed too.
    async with DefaultAzureCredential() as credential:
        async with KeyClient("https://vault.vault.azure.net/", credential) as client:
            # Limit concurrency to avoid overwhelming the service
            semaphore = asyncio.Semaphore(10)  # Max 10 concurrent operations

            async def create_key_with_semaphore(name: str):
                """Create one RSA key while holding a semaphore slot."""
                async with semaphore:
                    return await client.create_rsa_key(name, size=2048)

            # Create many keys with controlled concurrency
            key_names = [f"bulk-key-{i}" for i in range(50)]
            tasks = [create_key_with_semaphore(name) for name in key_names]

            # return_exceptions=True: failures come back as exception
            # objects instead of aborting the whole batch.
            keys = await asyncio.gather(*tasks, return_exceptions=True)

    successful = sum(1 for key in keys if not isinstance(key, Exception))
    print(f"Successfully created {successful}/{len(keys)} keys")

asyncio.run(efficient_bulk_operations())

Timeout and Retry Patterns

async def resilient_async_operations():
    """Create a key with a per-attempt timeout and retries with exponential backoff."""

    async def retry_operation(operation, max_retries=3, delay=1.0):
        """Retry an async operation with exponential backoff.

        Args:
            operation: Zero-argument callable returning a fresh awaitable on
                each call (a coroutine object cannot be awaited twice).
            max_retries: Total number of attempts; must be at least 1.
            delay: Base delay in seconds; the wait after attempt ``k``
                (0-based) is ``delay * 2 ** k``.

        Returns:
            The result of the first successful attempt.

        Raises:
            ValueError: If ``max_retries`` is less than 1.
            Exception: Whatever the final attempt raised, including
                ``asyncio.TimeoutError``.
        """
        if max_retries < 1:
            # Otherwise the loop would not run and None would be returned
            # as though the operation had succeeded.
            raise ValueError("max_retries must be at least 1")

        for attempt in range(max_retries):
            try:
                return await asyncio.wait_for(operation(), timeout=30.0)
            except asyncio.TimeoutError:
                if attempt == max_retries - 1:
                    raise
                print(f"Timeout on attempt {attempt + 1}, retrying...")
            except Exception as e:
                # NOTE: this retries every error; real code should retry
                # only errors known to be transient.
                if attempt == max_retries - 1:
                    raise
                print(f"Error on attempt {attempt + 1}: {e}, retrying...")
            # Same exponential backoff for timeouts and other errors.
            await asyncio.sleep(delay * (2 ** attempt))

    # Context managers close both the client and the credential, even when
    # the final retry raises.
    async with DefaultAzureCredential() as credential:
        async with KeyClient("https://vault.vault.azure.net/", credential) as client:
            # Retry key creation with timeout
            key = await retry_operation(
                lambda: client.create_rsa_key("resilient-key", size=2048)
            )
            print(f"Created key: {key.name}")

asyncio.run(resilient_async_operations())

Install with Tessl CLI

npx tessl i tessl/pypi-azure-keyvault-keys

docs

async-operations.md

backup-recovery.md

crypto-operations.md

import-export.md

index.md

key-management.md

rotation-policies.md

tile.json