Microsoft Azure Cosmos Client Library for Python providing access to Azure Cosmos DB SQL API operations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Asynchronous operations for high-performance, non-blocking Azure Cosmos DB applications. The async API provides coroutine-based versions of all synchronous operations with additional connection management capabilities.
Asynchronous version of CosmosClient with additional connection lifecycle management.
class CosmosClient:
def __init__(self, url: str, credential: Union[str, Dict[str, Any], TokenCredential], consistency_level: str = None, **kwargs):
"""
Initialize async CosmosClient.
Parameters: Same as synchronous CosmosClient
"""
@classmethod
def from_connection_string(cls, conn_str: str, credential: str = None, consistency_level: str = None, **kwargs):
"""
Create async CosmosClient from connection string.
Parameters: Same as synchronous version
Returns:
Async CosmosClient instance
"""
async def create_database(self, id: str, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, **kwargs):
"""
Async version of create_database.
Parameters: Same as synchronous version
Returns:
Async DatabaseProxy for the created database
"""
async def create_database_if_not_exists(self, id: str, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, **kwargs):
"""
Async version of create_database_if_not_exists.
"""
def get_database_client(self, database: str):
"""
Get async database client (non-async method).
Returns:
Async DatabaseProxy instance
"""
async def list_databases(self, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs):
"""
Async version of list_databases.
"""
async def query_databases(self, query: str = None, parameters: list = None, **kwargs):
"""
Async version of query_databases.
"""
async def delete_database(self, database: str, populate_query_metrics: bool = None, **kwargs):
"""
Async version of delete_database.
"""
async def get_database_account(self, **kwargs):
"""
Async version of get_database_account.
"""
async def close(self):
"""
Close the client and clean up resources.
This method should be called when the client is no longer needed
to properly clean up connections and resources.
"""Asynchronous database-level operations.
class DatabaseProxy:
async def read(self, populate_query_metrics: bool = None, **kwargs):
"""
Async version of read database properties.
"""
async def create_container(self, id: str, partition_key: PartitionKey, **kwargs):
"""
Async version of create_container.
Returns:
Async ContainerProxy for the created container
"""
async def create_container_if_not_exists(self, id: str, partition_key: PartitionKey, **kwargs):
"""
Async version of create_container_if_not_exists.
"""
def get_container_client(self, container: str):
"""
Get async container client (non-async method).
Returns:
Async ContainerProxy instance
"""
async def list_containers(self, max_item_count: int = None, **kwargs):
"""
Async version of list_containers.
"""
async def query_containers(self, query: str = None, parameters: list = None, **kwargs):
"""
Async version of query_containers.
"""
async def replace_container(self, container: str, partition_key: PartitionKey, **kwargs):
"""
Async version of replace_container.
"""
async def delete_container(self, container: str, **kwargs):
"""
Async version of delete_container.
"""
async def get_throughput(self, **kwargs):
"""
Async version of get_throughput.
"""
async def replace_throughput(self, throughput: ThroughputProperties, **kwargs):
"""
Async version of replace_throughput.
"""Asynchronous container-level operations for items and queries.
class ContainerProxy:
async def read(self, populate_query_metrics: bool = None, **kwargs):
"""
Async version of read container properties.
"""
async def create_item(self, body: dict, **kwargs):
"""
Async version of create_item.
"""
async def read_item(self, item: str, partition_key: str, **kwargs):
"""
Async version of read_item.
"""
async def upsert_item(self, body: dict, **kwargs):
"""
Async version of upsert_item.
"""
async def replace_item(self, item: str, body: dict, **kwargs):
"""
Async version of replace_item.
"""
async def patch_item(self, item: str, partition_key: str, patch_operations: list, **kwargs):
"""
Async version of patch_item.
"""
async def delete_item(self, item: str, partition_key: str, **kwargs):
"""
Async version of delete_item.
"""
async def delete_all_items_by_partition_key(self, partition_key: str, **kwargs):
"""
Async version of delete_all_items_by_partition_key.
"""
async def query_items(self, query: str = None, parameters: list = None, **kwargs):
"""
Async version of query_items.
Returns:
Async iterable of query results
"""
async def read_all_items(self, max_item_count: int = None, **kwargs):
"""
Async version of read_all_items.
Returns:
Async iterable of all items
"""
async def query_items_change_feed(self, **kwargs):
"""
Async version of query_items_change_feed.
"""
async def execute_item_batch(self, batch_operations: list, partition_key: str, **kwargs):
"""
Async version of execute_item_batch.
"""
@property
def scripts(self):
"""
Get async scripts proxy (non-async property).
Returns:
Async ScriptsProxy instance
"""Asynchronous script operations for stored procedures, triggers, and UDFs.
class ScriptsProxy:
async def create_stored_procedure(self, body: dict, **kwargs):
"""
Async version of create_stored_procedure.
"""
async def execute_stored_procedure(self, sproc: str, partition_key: str = None, params: list = None, **kwargs):
"""
Async version of execute_stored_procedure.
"""
async def list_stored_procedures(self, max_item_count: int = None, **kwargs):
"""
Async version of list_stored_procedures.
"""
async def get_stored_procedure(self, sproc: str, **kwargs):
"""
Async version of get_stored_procedure.
"""
async def replace_stored_procedure(self, sproc: str, body: dict, **kwargs):
"""
Async version of replace_stored_procedure.
"""
async def delete_stored_procedure(self, sproc: str, **kwargs):
"""
Async version of delete_stored_procedure.
"""
async def create_trigger(self, body: dict, **kwargs):
"""
Async version of create_trigger.
"""
async def create_user_defined_function(self, body: dict, **kwargs):
"""
Async version of create_user_defined_function.
"""Asynchronous user and permission operations.
class UserProxy:
async def read(self, **kwargs):
"""
Async version of read user properties.
"""
async def create_permission(self, body: dict, **kwargs):
"""
Async version of create_permission.
"""
async def list_permissions(self, max_item_count: int = None, **kwargs):
"""
Async version of list_permissions.
"""
async def get_permission(self, permission: str, **kwargs):
"""
Async version of get_permission.
"""
async def replace_permission(self, permission: str, body: dict, **kwargs):
"""
Async version of replace_permission.
"""
async def delete_permission(self, permission: str, **kwargs):
"""
Async version of delete_permission.
"""import asyncio
from azure.cosmos.aio import CosmosClient
from azure.cosmos import ConsistencyLevel, PartitionKey
async def basic_async_operations():
# Initialize async client
async with CosmosClient(
url="https://myaccount.documents.azure.com:443/",
credential="myaccountkey==",
consistency_level=ConsistencyLevel.Session
) as client:
# Create database
database = await client.create_database_if_not_exists(
id="AsyncDatabase",
offer_throughput=400
)
# Create container
container = await database.create_container_if_not_exists(
id="AsyncContainer",
partition_key=PartitionKey(path="/category"),
offer_throughput=400
)
# Create items concurrently
items_to_create = [
{"id": f"item{i}", "category": "electronics", "name": f"Product {i}"}
for i in range(10)
]
# Create items concurrently
create_tasks = [
container.create_item(item) for item in items_to_create
]
created_items = await asyncio.gather(*create_tasks, return_exceptions=True)
successful_creates = [item for item in created_items if not isinstance(item, Exception)]
print(f"Successfully created {len(successful_creates)} items")
# Query items
query = "SELECT * FROM c WHERE c.category = @category"
parameters = [{"name": "@category", "value": "electronics"}]
items = []
async for item in container.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True
):
items.append(item)
print(f"Queried {len(items)} items")
# Run the async function
asyncio.run(basic_async_operations())async def context_manager_example():
"""Demonstrate proper resource management with async context manager."""
async with CosmosClient(
url="https://myaccount.documents.azure.com:443/",
credential="myaccountkey=="
) as client:
database = client.get_database_client("MyDatabase")
container = database.get_container_client("MyContainer")
# Perform operations
await container.create_item({
"id": "async_item",
"category": "test",
"data": "async operation"
})
item = await container.read_item(
item="async_item",
partition_key="test"
)
print(f"Read item: {item['id']}")
# Client is automatically closed when exiting context manager
print("Client resources cleaned up")
asyncio.run(context_manager_example())import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
async def bulk_operations_example():
"""Demonstrate high-performance bulk operations with async."""
async with CosmosClient(
url="https://myaccount.documents.azure.com:443/",
credential="myaccountkey=="
) as client:
database = client.get_database_client("BulkDatabase")
container = database.get_container_client("BulkContainer")
# Generate large number of items
num_items = 1000
items = [
{
"id": f"bulk_item_{i}",
"category": f"category_{i % 10}",
"data": f"Data for item {i}",
"timestamp": time.time()
}
for i in range(num_items)
]
print(f"Creating {num_items} items...")
start_time = time.time()
# Process in batches to avoid overwhelming the service
batch_size = 50
batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
for batch in batches:
# Create batch concurrently
tasks = [container.upsert_item(item) for item in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle any errors
errors = [r for r in results if isinstance(r, Exception)]
if errors:
print(f"Batch had {len(errors)} errors")
end_time = time.time()
print(f"Bulk operation completed in {end_time - start_time:.2f} seconds")
# Query performance test
print("Testing query performance...")
start_time = time.time()
query_tasks = []
for category_id in range(10):
query = "SELECT COUNT(1) as count FROM c WHERE c.category = @category"
parameters = [{"name": "@category", "value": f"category_{category_id}"}]
task = container.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True
)
query_tasks.append(task)
# Execute queries concurrently
query_results = await asyncio.gather(*[
collect_async_iterable(query_task) for query_task in query_tasks
])
end_time = time.time()
print(f"Concurrent queries completed in {end_time - start_time:.2f} seconds")
for i, results in enumerate(query_results):
if results:
print(f"Category {i}: {results[0]['count']} items")
async def collect_async_iterable(async_iterable):
"""Helper to collect results from async iterable."""
results = []
async for item in async_iterable:
results.append(item)
return results
asyncio.run(bulk_operations_example())async def change_feed_processor():
"""Process change feed asynchronously with high throughput."""
async with CosmosClient(
url="https://myaccount.documents.azure.com:443/",
credential="myaccountkey=="
) as client:
database = client.get_database_client("ChangeDatabase")
container = database.get_container_client("ChangeContainer")
# Get feed ranges for parallel processing
feed_ranges = await container.read_feed_ranges()
print(f"Processing {len(feed_ranges)} feed ranges")
async def process_feed_range(feed_range, range_id):
"""Process a single feed range."""
continuation = None
processed_count = 0
while True:
try:
changes = container.query_items_change_feed(
feed_range=feed_range,
continuation=continuation,
is_start_from_beginning=True,
max_item_count=100
)
batch_changes = []
async for change in changes:
if "_lsn" in change: # Valid change record
batch_changes.append(change)
if not batch_changes:
break
# Process changes (simulate work)
await asyncio.sleep(0.1) # Simulate processing time
processed_count += len(batch_changes)
print(f"Range {range_id}: Processed {len(batch_changes)} changes "
f"(total: {processed_count})")
# Get continuation for next batch
continuation = changes.get_continuation()
if not continuation:
break
except Exception as e:
print(f"Error processing range {range_id}: {e}")
break
return processed_count
# Process all feed ranges concurrently
tasks = [
process_feed_range(feed_range, i)
for i, feed_range in enumerate(feed_ranges)
]
results = await asyncio.gather(*tasks)
total_processed = sum(results)
print(f"Total changes processed: {total_processed}")
asyncio.run(change_feed_processor())import asyncio
import random
from azure.cosmos.exceptions import CosmosHttpResponseError
async def resilient_async_operations():
"""Demonstrate error handling and retry logic in async operations."""
async def retry_operation(operation, max_retries=3, delay=1.0):
"""Generic retry wrapper for async operations."""
for attempt in range(max_retries):
try:
return await operation()
except CosmosHttpResponseError as e:
if e.status_code == 429: # Rate limiting
wait_time = delay * (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited, waiting {wait_time:.2f}s (attempt {attempt + 1})")
await asyncio.sleep(wait_time)
else:
print(f"HTTP error {e.status_code}: {e.message}")
if attempt == max_retries - 1:
raise
except Exception as e:
print(f"Unexpected error: {e}")
if attempt == max_retries - 1:
raise
raise Exception(f"Operation failed after {max_retries} attempts")
async with CosmosClient(
url="https://myaccount.documents.azure.com:443/",
credential="myaccountkey=="
) as client:
database = client.get_database_client("ResilientDatabase")
container = database.get_container_client("ResilientContainer")
# Example: Resilient item creation
async def create_item_operation():
return await container.create_item({
"id": f"resilient_item_{random.randint(1, 1000)}",
"category": "test",
"timestamp": time.time()
})
# Use retry wrapper
try:
item = await retry_operation(create_item_operation)
print(f"Successfully created item: {item['id']}")
except Exception as e:
print(f"Failed to create item after retries: {e}")
# Parallel operations with individual error handling
async def safe_create_item(item_data):
"""Safely create an item with error handling."""
try:
return await retry_operation(
lambda: container.create_item(item_data)
)
except Exception as e:
print(f"Failed to create item {item_data['id']}: {e}")
return None
# Create multiple items with resilience
items_to_create = [
{"id": f"safe_item_{i}", "category": "batch", "data": f"Data {i}"}
for i in range(10)
]
tasks = [safe_create_item(item) for item in items_to_create]
results = await asyncio.gather(*tasks)
successful_items = [item for item in results if item is not None]
print(f"Successfully created {len(successful_items)} out of {len(items_to_create)} items")
asyncio.run(resilient_async_operations())Install with Tessl CLI
npx tessl i tessl/pypi-azure-cosmos