Microsoft Azure Key Vault Keys client library for Python providing cryptographic key management operations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete async/await support for all key management and cryptographic operations, enabling efficient non-blocking I/O in async applications. The azure-keyvault-keys package provides full async implementations parallel to synchronous APIs through dedicated async clients and async-compatible return types.
Asynchronous version of KeyClient for non-blocking key operations.
class KeyClient:
    """Asynchronous client for Azure Key Vault key operations.

    Signature listing only: each method body is just its docstring.
    """

    async def __aenter__(self) -> "KeyClient":
        """Async context manager entry."""

    async def __aexit__(self, *args) -> None:
        """Async context manager exit."""

    async def close(self) -> None:
        """Close the client and release resources."""


# Usage example
from azure.keyvault.keys.aio import KeyClient
from azure.identity.aio import DefaultAzureCredential
async def main():
    """Create an RSA key with the async client and print the key's name."""
    # Create async client
    credential = DefaultAzureCredential()
    # The async credential must be closed as well as the client; entering both
    # in one `async with` closes the client first and then the credential.
    async with credential, KeyClient("https://vault.vault.azure.net/", credential) as client:
        # Perform async operations
        key = await client.create_rsa_key("async-key", size=2048)
        print(f"Created key: {key.name}")
# Run with asyncio
import asyncio
asyncio.run(main())

Asynchronous key creation operations.
# Signature listing for the key-creation coroutines; bodies are docstring-only.
async def create_key(
    name: str,
    key_type: KeyType,
    **kwargs
) -> KeyVaultKey:
    """Asynchronously create a new key."""


async def create_rsa_key(
    name: str,
    *,
    size: int = None,
    hardware_protected: bool = False,
    **kwargs
) -> KeyVaultKey:
    """Asynchronously create an RSA key."""


async def create_ec_key(
    name: str,
    *,
    curve: KeyCurveName = None,
    hardware_protected: bool = False,
    **kwargs
) -> KeyVaultKey:
    """Asynchronously create an Elliptic Curve key."""


async def create_oct_key(
    name: str,
    *,
    size: int = None,
    hardware_protected: bool = False,
    **kwargs
) -> KeyVaultKey:
    """Asynchronously create a symmetric key."""


# Usage example
import asyncio
from azure.keyvault.keys.aio import KeyClient
from azure.keyvault.keys import KeyType, KeyCurveName
from azure.identity.aio import DefaultAzureCredential
async def create_multiple_keys():
    """Create multiple keys concurrently.

    Starts one RSA, one EC and one symmetric key creation, awaits them
    together with ``asyncio.gather`` and prints each key's name and type.
    """
    credential = DefaultAzureCredential()
    # Close the async credential together with the client so neither leaks.
    async with credential, KeyClient("https://vault.vault.azure.net/", credential) as client:
        # Create keys concurrently
        tasks = [
            client.create_rsa_key("async-rsa-key", size=2048),
            client.create_ec_key("async-ec-key", curve=KeyCurveName.p_256),
            client.create_oct_key("async-oct-key", size=256),
        ]
        # gather() returns results in the same order as the awaitables passed in.
        keys = await asyncio.gather(*tasks)
        for key in keys:
            print(f"Created key: {key.name} ({key.key_type})")
asyncio.run(create_multiple_keys())

Asynchronous key retrieval and management operations.
# Signature listing for key retrieval and management; bodies are docstring-only.
async def get_key(name: str, version: str = None, **kwargs) -> KeyVaultKey:
    """Asynchronously get a key from the vault."""


async def update_key_properties(
    name: str,
    version: str = None,
    **kwargs
) -> KeyVaultKey:
    """Asynchronously update a key's properties."""


# The async KeyClient has no begin_* poller methods: deletion and recovery are
# plain coroutines that return the resulting object directly.
async def delete_key(name: str, **kwargs) -> DeletedKey:
    """Asynchronously delete a key."""


async def get_deleted_key(name: str, **kwargs) -> DeletedKey:
    """Asynchronously get a deleted key."""


async def recover_deleted_key(name: str, **kwargs) -> KeyVaultKey:
    """Asynchronously recover a deleted key."""


async def purge_deleted_key(name: str, **kwargs) -> None:
    """Asynchronously purge a deleted key permanently."""


# Usage example
async def manage_key_lifecycle():
    """Demonstrate async key lifecycle management.

    Creates a key, disables and tags it, deletes it, then recovers it,
    printing the key name after each step.
    """
    credential = DefaultAzureCredential()
    # Close the async credential together with the client so neither leaks.
    async with credential, KeyClient("https://vault.vault.azure.net/", credential) as client:
        # Create key
        key = await client.create_rsa_key("lifecycle-key")
        print(f"Created: {key.name}")

        # Update properties
        updated_key = await client.update_key_properties(
            "lifecycle-key",
            enabled=False,
            tags={"status": "disabled"},
        )
        print(f"Updated: {updated_key.name}")

        # Delete key (soft delete); the coroutine returns the DeletedKey itself.
        deleted_key = await client.delete_key("lifecycle-key")
        print(f"Deleted: {deleted_key.name}")

        # Recover key; the coroutine returns the recovered KeyVaultKey itself.
        recovered_key = await client.recover_deleted_key("lifecycle-key")
        print(f"Recovered: {recovered_key.name}")
asyncio.run(manage_key_lifecycle())

Handle paginated results asynchronously.
# Signature listing for the paged list operations. These are plain (non-async)
# functions whose return value is iterated with `async for`.
def list_properties_of_keys(**kwargs) -> AsyncItemPaged[KeyProperties]:
    """List key properties asynchronously with pagination."""


def list_properties_of_key_versions(name: str, **kwargs) -> AsyncItemPaged[KeyProperties]:
    """List key version properties asynchronously with pagination."""


def list_deleted_keys(**kwargs) -> AsyncItemPaged[DeletedKey]:
    """List deleted keys asynchronously with pagination."""


# Usage example
async def list_all_keys():
    """List all keys in the vault asynchronously.

    Prints every key once by iterating items directly, then again by
    iterating page by page.
    """
    credential = DefaultAzureCredential()
    # Close the async credential together with the client so neither leaks.
    async with credential, KeyClient("https://vault.vault.azure.net/", credential) as client:
        print("All keys in vault:")
        # Iterate through all keys
        async for key_properties in client.list_properties_of_keys():
            print(f"- {key_properties.name} (enabled: {key_properties.enabled})")

        # Or process by pages
        pages = client.list_properties_of_keys().by_page()
        async for page in pages:
            # Each page is itself an async iterator: it has no len() and cannot
            # be used in a plain `for`, so collect its items before counting.
            page_names = [key_properties.name async for key_properties in page]
            print(f"Processing page with {len(page_names)} keys")
            for key_name in page_names:
                print(f" - {key_name}")
async def find_keys_by_tag():
    """Find keys with specific tags.

    Collects the names of all keys whose ``environment`` tag equals
    ``"production"`` and prints the resulting list.
    """
    credential = DefaultAzureCredential()
    # Close the async credential together with the client so neither leaks.
    async with credential, KeyClient("https://vault.vault.azure.net/", credential) as client:
        production_keys = [
            key_properties.name
            async for key_properties in client.list_properties_of_keys()
            # The truthiness check on tags comes first so keys without tags are skipped.
            if key_properties.tags and key_properties.tags.get("environment") == "production"
        ]
        print(f"Production keys: {production_keys}")
asyncio.run(list_all_keys())
asyncio.run(find_keys_by_tag())

Asynchronous cryptographic operations using CryptographyClient.
class CryptographyClient:
    """Asynchronous client for cryptographic operations.

    Signature listing only: each method body is just its docstring.
    """

    async def encrypt(
        self,
        algorithm: EncryptionAlgorithm,
        plaintext: bytes,
        **kwargs
    ) -> EncryptResult:
        """Asynchronously encrypt data."""

    async def decrypt(
        self,
        algorithm: EncryptionAlgorithm,
        ciphertext: bytes,
        **kwargs
    ) -> DecryptResult:
        """Asynchronously decrypt data."""

    async def sign(self, algorithm: SignatureAlgorithm, digest: bytes, **kwargs) -> SignResult:
        """Asynchronously sign a digest."""

    async def verify(
        self,
        algorithm: SignatureAlgorithm,
        digest: bytes,
        signature: bytes,
        **kwargs
    ) -> VerifyResult:
        """Asynchronously verify a signature."""

    async def wrap_key(self, algorithm: KeyWrapAlgorithm, key: bytes, **kwargs) -> WrapResult:
        """Asynchronously wrap a key."""

    async def unwrap_key(self, algorithm: KeyWrapAlgorithm, encrypted_key: bytes, **kwargs) -> UnwrapResult:
        """Asynchronously unwrap a key."""


# Usage example
from azure.keyvault.keys.crypto.aio import CryptographyClient
from azure.keyvault.keys.crypto import EncryptionAlgorithm, SignatureAlgorithm
import hashlib
async def perform_crypto_operations():
    """Perform multiple cryptographic operations concurrently.

    Encrypts two messages and signs their SHA-256 digests with a single
    key, awaiting all four operations together, then prints the ciphertext
    lengths and the first 16 hex characters of each signature.
    """
    credential = DefaultAzureCredential()
    key_id = "https://vault.vault.azure.net/keys/my-key/version"
    # Close the async credential together with the client so neither leaks.
    async with credential, CryptographyClient(key_id, credential) as crypto_client:
        # Prepare data
        plaintext1 = b"First message"
        plaintext2 = b"Second message"
        # sign() is given digests, so hash the messages locally first.
        digest1 = hashlib.sha256(plaintext1).digest()
        digest2 = hashlib.sha256(plaintext2).digest()

        # Perform operations concurrently
        tasks = [
            crypto_client.encrypt(EncryptionAlgorithm.rsa_oaep_256, plaintext1),
            crypto_client.encrypt(EncryptionAlgorithm.rsa_oaep_256, plaintext2),
            crypto_client.sign(SignatureAlgorithm.rs256, digest1),
            crypto_client.sign(SignatureAlgorithm.rs256, digest2),
        ]
        # gather() preserves input order, which the unpacking below relies on.
        results = await asyncio.gather(*tasks)
        encrypt_result1, encrypt_result2, sign_result1, sign_result2 = results

        print(f"Encrypted {len(encrypt_result1.ciphertext)} bytes")
        print(f"Encrypted {len(encrypt_result2.ciphertext)} bytes")
        print(f"Signature 1: {sign_result1.signature.hex()[:16]}...")
        print(f"Signature 2: {sign_result2.signature.hex()[:16]}...")
asyncio.run(perform_crypto_operations())

Asynchronous backup and recovery operations.
# Signature listing for backup and restore; bodies are docstring-only.
async def backup_key(name: str, **kwargs) -> bytes:
    """Asynchronously backup a key."""


async def restore_key_backup(backup: bytes, **kwargs) -> KeyVaultKey:
    """Asynchronously restore a key from backup."""


# Usage example
async def backup_multiple_keys():
    """Backup multiple keys concurrently.

    Requests a backup of each key in parallel. A failed backup is reported
    and skipped; a successful one is written to ``<key name>_backup.bin``
    in the current working directory.
    """
    credential = DefaultAzureCredential()
    # Close the async credential together with the client so neither leaks.
    async with credential, KeyClient("https://vault.vault.azure.net/", credential) as client:
        key_names = ["key1", "key2", "key3"]

        # Backup all keys concurrently
        backup_tasks = [client.backup_key(name) for name in key_names]
        # return_exceptions=True keeps one failure from discarding the other
        # results; each slot holds either the backup bytes or the exception.
        backups = await asyncio.gather(*backup_tasks, return_exceptions=True)

        for key_name, backup_result in zip(key_names, backups):
            if isinstance(backup_result, Exception):
                print(f"Failed to backup {key_name}: {backup_result}")
            else:
                print(f"Backed up {key_name}: {len(backup_result)} bytes")
                # Save backup to file
                with open(f"{key_name}_backup.bin", "wb") as f:
                    f.write(backup_result)
asyncio.run(backup_multiple_keys())

Proper resource management with async context managers.
# Signature listing for the client's resource-management methods; bodies are
# docstring-only.
async def __aenter__(self) -> "KeyClient":
    """Enter async context manager."""


async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Exit async context manager and clean up resources."""


async def close(self) -> None:
    """Close the client and release resources."""


# Usage example
async def proper_resource_management():
    """Demonstrate proper async resource management.

    Shows two equivalent ways to release a client: an ``async with`` block
    and an explicit ``close()`` in ``finally``. The credential is shared by
    both clients, so it is closed once after both are done.
    """
    credential = DefaultAzureCredential()
    # The async credential must be closed too; scope it around both methods.
    async with credential:
        # Method 1: Using async context manager (recommended)
        async with KeyClient("https://vault.vault.azure.net/", credential) as client:
            key = await client.get_key("my-key")
            print(f"Got key: {key.name}")
        # Client automatically closed

        # Method 2: Manual resource management
        client = KeyClient("https://vault.vault.azure.net/", credential)
        try:
            key = await client.get_key("my-key")
            print(f"Got key: {key.name}")
        finally:
            # Runs even when get_key raises, so the client is always closed.
            await client.close()
async def multiple_clients_example():
    """Use multiple clients concurrently.

    Fetches the key named ``shared-key`` from three vaults in parallel, one
    client per vault, and reports per vault whether it was found.
    """
    credential = DefaultAzureCredential()

    # Multiple vaults
    vault_urls = [
        "https://vault1.vault.azure.net/",
        "https://vault2.vault.azure.net/",
        "https://vault3.vault.azure.net/",
    ]

    async def get_key_from_vault(vault_url: str, key_name: str):
        """Return the key from one vault, or None if the lookup fails."""
        async with KeyClient(vault_url, credential) as client:
            try:
                return await client.get_key(key_name)
            except Exception as e:
                # Deliberately best-effort: one failing vault must not abort
                # the lookups against the others.
                print(f"Error getting key from {vault_url}: {e}")
                return None

    # All clients share the credential, so close it once after every lookup
    # has finished.
    async with credential:
        # Get same key from multiple vaults concurrently
        tasks = [get_key_from_vault(url, "shared-key") for url in vault_urls]
        keys = await asyncio.gather(*tasks)

    for vault_number, key in enumerate(keys, start=1):
        if key:
            print(f"Vault {vault_number}: Found key {key.name}")
        else:
            print(f"Vault {vault_number}: Key not found")
asyncio.run(proper_resource_management())
asyncio.run(multiple_clients_example())

Handle errors appropriately in async contexts.
from azure.core.exceptions import ResourceNotFoundError, ServiceRequestError
import asyncio
async def robust_async_key_operations():
    """Demonstrate robust error handling in async operations.

    Fetches several keys concurrently. Each lookup handles its own errors
    and yields ``None`` on failure, so one missing key does not cancel the
    rest. Prints how many keys were retrieved.
    """
    credential = DefaultAzureCredential()
    # Close the async credential together with the client so neither leaks.
    async with credential, KeyClient("https://vault.vault.azure.net/", credential) as client:
        key_names = ["key1", "nonexistent-key", "key3"]

        async def safe_get_key(name: str):
            """Return the named key, or None after reporting any error."""
            try:
                return await client.get_key(name)
            except ResourceNotFoundError:
                print(f"Key '{name}' not found")
                return None
            except ServiceRequestError as e:
                print(f"Service error for key '{name}': {e}")
                return None
            except Exception as e:
                # Catch-all last, so the specific handlers above take priority.
                print(f"Unexpected error for key '{name}': {e}")
                return None

        # Get keys with error handling
        tasks = [safe_get_key(name) for name in key_names]
        results = await asyncio.gather(*tasks)

        successful_keys = [key for key in results if key is not None]
        print(f"Successfully retrieved {len(successful_keys)} keys")
asyncio.run(robust_async_key_operations())


class AsyncLROPoller:
    """Asynchronous long-running operation poller.

    Signature listing only: each method body is just its docstring.
    """

    # NOTE(review): the `timeout` parameters are kept as originally listed;
    # confirm them against the azure-core AsyncLROPoller reference.
    async def result(self, timeout: int = None) -> Any:
        """Asynchronously get the final result of the operation."""

    async def wait(self, timeout: int = None) -> None:
        """Asynchronously wait for the operation to complete."""

    def done(self) -> bool:
        """Check if the operation is complete."""

    def status(self) -> str:
        """Get the current status of the operation."""
class AsyncItemPaged:
    """Asynchronous paginated collection of items.

    Signature listing only: each method body is just its docstring.
    """

    def __aiter__(self):
        """Async iterator over all items across pages."""

    def by_page(self):
        """Iterate page by page asynchronously."""


# Usage example
async def efficient_bulk_operations():
    """Demonstrate efficient patterns for bulk operations.

    Creates 50 RSA keys with at most 10 creations in flight at a time and
    prints how many succeeded.
    """
    credential = DefaultAzureCredential()
    # Close the async credential together with the client so neither leaks.
    async with credential, KeyClient("https://vault.vault.azure.net/", credential) as client:
        # Limit concurrency to avoid overwhelming the service
        semaphore = asyncio.Semaphore(10)  # Max 10 concurrent operations

        async def create_key_with_semaphore(name: str):
            """Create one RSA key once a semaphore slot is free."""
            async with semaphore:
                return await client.create_rsa_key(name, size=2048)

        # Create many keys with controlled concurrency
        key_names = [f"bulk-key-{i}" for i in range(50)]
        tasks = [create_key_with_semaphore(name) for name in key_names]
        # return_exceptions=True so failures are counted instead of aborting
        # the whole batch.
        keys = await asyncio.gather(*tasks, return_exceptions=True)

        successful = sum(1 for key in keys if not isinstance(key, Exception))
        print(f"Successfully created {successful}/{len(keys)} keys")
asyncio.run(efficient_bulk_operations())


async def resilient_async_operations():
    """Implement timeout and retry patterns.

    Creates an RSA key through a retry helper that bounds each attempt with
    a timeout and backs off exponentially between attempts.
    """

    async def retry_operation(operation, max_retries=3, delay=1.0, timeout=30.0):
        """Retry an async operation with exponential backoff.

        Args:
            operation: Zero-argument callable returning a fresh awaitable on
                every call (an awaitable cannot be awaited twice).
            max_retries: Total number of attempts; must be at least 1.
            delay: Base delay in seconds; attempt ``k`` (0-based) is followed
                by a sleep of ``delay * 2 ** k``.
            timeout: Per-attempt timeout in seconds.

        Returns:
            Whatever the awaited operation returns.

        Raises:
            ValueError: If ``max_retries`` is less than 1.
            Exception: The last attempt's exception is re-raised unchanged,
                ``asyncio.TimeoutError`` included.
        """
        if max_retries < 1:
            # Otherwise the loop body never runs and None is returned silently.
            raise ValueError("max_retries must be at least 1")

        for attempt in range(max_retries):
            try:
                return await asyncio.wait_for(operation(), timeout=timeout)
            except asyncio.TimeoutError:
                if attempt == max_retries - 1:
                    raise
                print(f"Timeout on attempt {attempt + 1}, retrying...")
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                print(f"Error on attempt {attempt + 1}: {e}, retrying...")
            # Same exponential backoff for timeouts and other errors, as the
            # docstring promises.
            await asyncio.sleep(delay * (2 ** attempt))

    credential = DefaultAzureCredential()
    client = KeyClient("https://vault.vault.azure.net/", credential)
    try:
        # Retry key creation with timeout. The lambda makes each attempt
        # build a new coroutine.
        key = await retry_operation(
            lambda: client.create_rsa_key("resilient-key", size=2048)
        )
        print(f"Created key: {key.name}")
    finally:
        await client.close()
        # The async credential must be closed as well as the client.
        await credential.close()
asyncio.run(resilient_async_operations())

Install with Tessl CLI
npx tessl i tessl/pypi-azure-keyvault-keys