Ctrl+K
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-azure-cosmos

Microsoft Azure Cosmos Client Library for Python providing access to Azure Cosmos DB SQL API operations

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/async-operations.md

Async Operations

Asynchronous operations for high-performance, non-blocking Azure Cosmos DB applications. The async API provides coroutine-based versions of all synchronous operations with additional connection management capabilities.

Capabilities

Async Client Operations

Asynchronous version of CosmosClient with additional connection lifecycle management.

class CosmosClient:
    def __init__(self, url: str, credential: Union[str, Dict[str, Any], TokenCredential], consistency_level: str = None, **kwargs):
        """
        Initialize async CosmosClient.

        Parameters: Same as synchronous CosmosClient — the account
        endpoint `url`, a `credential` (account key string, resource
        token dict, or AAD TokenCredential), and an optional
        `consistency_level` override.

        The client supports use as an async context manager
        (`async with CosmosClient(...) as client:`); otherwise call
        `close()` explicitly when done.
        """
    
    @classmethod
    def from_connection_string(cls, conn_str: str, credential: str = None, consistency_level: str = None, **kwargs):
        """
        Create async CosmosClient from connection string.

        Parameters: Same as synchronous version — `conn_str` carries the
        endpoint and key; `credential` may override the key embedded in
        the connection string.

        Returns:
        Async CosmosClient instance
        """
    
    async def create_database(self, id: str, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, **kwargs):
        """
        Async version of create_database.

        Parameters: Same as synchronous version — `id` names the new
        database; `offer_throughput` optionally provisions throughput.

        Returns:
        Async DatabaseProxy for the created database
        """
    
    async def create_database_if_not_exists(self, id: str, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, **kwargs):
        """
        Async version of create_database_if_not_exists.

        Idempotent variant of create_database: returns an async
        DatabaseProxy whether the database was just created or
        already existed.
        """
    
    def get_database_client(self, database: str):
        """
        Get async database client (non-async method).

        Purely a local accessor — no coroutine is involved, so it can be
        called without `await`.

        Returns:
        Async DatabaseProxy instance
        """
    
    async def list_databases(self, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs):
        """
        Async version of list_databases.

        Enumerates all databases in the account; `max_item_count` caps
        the page size.
        """
    
    async def query_databases(self, query: str = None, parameters: list = None, **kwargs):
        """
        Async version of query_databases.

        Runs a SQL query over database metadata; `parameters` is a list
        of {"name": ..., "value": ...} dicts.
        """
    
    async def delete_database(self, database: str, populate_query_metrics: bool = None, **kwargs):
        """
        Async version of delete_database.

        `database` may be the database id (see usage elsewhere in this
        API for accepted forms).
        """
    
    async def get_database_account(self, **kwargs):
        """
        Async version of get_database_account.

        Retrieves account-level properties for the connected endpoint.
        """
    
    async def close(self):
        """
        Close the client and clean up resources.
        
        This method should be called when the client is no longer needed
        to properly clean up connections and resources. Exiting the
        `async with` block calls it automatically.
        """

Async Database Operations

Asynchronous database-level operations.

class DatabaseProxy:
    async def read(self, populate_query_metrics: bool = None, **kwargs):
        """
        Async version of read database properties.

        Fetches and returns the database's current properties from the
        service.
        """
    
    async def create_container(self, id: str, partition_key: PartitionKey, **kwargs):
        """
        Async version of create_container.

        `id` names the new container; `partition_key` defines its
        partitioning scheme (e.g. PartitionKey(path="/category")).

        Returns:
        Async ContainerProxy for the created container
        """
    
    async def create_container_if_not_exists(self, id: str, partition_key: PartitionKey, **kwargs):
        """
        Async version of create_container_if_not_exists.

        Idempotent variant of create_container: returns an async
        ContainerProxy whether the container was just created or
        already existed.
        """
    
    def get_container_client(self, container: str):
        """
        Get async container client (non-async method).

        Purely a local accessor — call without `await`.

        Returns:
        Async ContainerProxy instance
        """
    
    async def list_containers(self, max_item_count: int = None, **kwargs):
        """
        Async version of list_containers.

        Enumerates the containers in this database; `max_item_count`
        caps the page size.
        """
    
    async def query_containers(self, query: str = None, parameters: list = None, **kwargs):
        """
        Async version of query_containers.

        Runs a SQL query over container metadata; `parameters` is a
        list of {"name": ..., "value": ...} dicts.
        """
    
    async def replace_container(self, container: str, partition_key: PartitionKey, **kwargs):
        """
        Async version of replace_container.

        Replaces the container's properties; the partition key of an
        existing container is supplied via `partition_key`.
        """
    
    async def delete_container(self, container: str, **kwargs):
        """
        Async version of delete_container.

        Deletes the named container and all of its items.
        """
    
    async def get_throughput(self, **kwargs):
        """
        Async version of get_throughput.

        Reads the database-level provisioned throughput offer.
        """
    
    async def replace_throughput(self, throughput: ThroughputProperties, **kwargs):
        """
        Async version of replace_throughput.

        Updates the database-level provisioned throughput to the
        supplied ThroughputProperties.
        """

Async Container Operations

Asynchronous container-level operations for items and queries.

class ContainerProxy:
    async def read(self, populate_query_metrics: bool = None, **kwargs):
        """
        Async version of read container properties.

        Fetches and returns the container's current properties from the
        service.
        """
    
    async def create_item(self, body: dict, **kwargs):
        """
        Async version of create_item.

        `body` is the full item document; it must include an "id" and
        the container's partition-key path value.
        """
    
    async def read_item(self, item: str, partition_key: str, **kwargs):
        """
        Async version of read_item.

        Point-reads a single item by its id and partition-key value.
        """
    
    async def upsert_item(self, body: dict, **kwargs):
        """
        Async version of upsert_item.

        Inserts `body` if no item with its id exists, otherwise
        replaces the existing item.
        """
    
    async def replace_item(self, item: str, body: dict, **kwargs):
        """
        Async version of replace_item.

        Replaces the item identified by `item` with the new `body`.
        """
    
    async def patch_item(self, item: str, partition_key: str, patch_operations: list, **kwargs):
        """
        Async version of patch_item.

        Applies the list of partial-update `patch_operations` to the
        item identified by id and partition-key value.
        """
    
    async def delete_item(self, item: str, partition_key: str, **kwargs):
        """
        Async version of delete_item.

        Deletes a single item by its id and partition-key value.
        """
    
    async def delete_all_items_by_partition_key(self, partition_key: str, **kwargs):
        """
        Async version of delete_all_items_by_partition_key.

        Deletes every item stored under the given partition-key value.
        """
    
    async def query_items(self, query: str = None, parameters: list = None, **kwargs):
        """
        Async version of query_items.

        `query` is SQL text; `parameters` is a list of
        {"name": ..., "value": ...} dicts bound to @-placeholders.

        NOTE(review): the usage examples in this document iterate the
        result directly (`async for item in container.query_items(...)`)
        without awaiting the call — confirm against the installed SDK
        whether this method is a coroutine or returns the async
        iterable synchronously.

        Returns:
        Async iterable of query results
        """
    
    async def read_all_items(self, max_item_count: int = None, **kwargs):
        """
        Async version of read_all_items.

        Enumerates every item in the container; `max_item_count` caps
        the page size.

        Returns:
        Async iterable of all items
        """
    
    async def query_items_change_feed(self, **kwargs):
        """
        Async version of query_items_change_feed.

        Reads the container's change feed; see the change-feed usage
        example below for feed_range/continuation keyword usage.
        """
    
    async def execute_item_batch(self, batch_operations: list, partition_key: str, **kwargs):
        """
        Async version of execute_item_batch.

        Executes the list of `batch_operations` transactionally within
        the single logical partition named by `partition_key`.
        """
    
    @property
    def scripts(self):
        """
        Get async scripts proxy (non-async property).

        Plain property access — no `await` needed.

        Returns:
        Async ScriptsProxy instance
        """

Async Script Operations

Asynchronous script operations for stored procedures, triggers, and UDFs.

class ScriptsProxy:
    async def create_stored_procedure(self, body: dict, **kwargs):
        """
        Async version of create_stored_procedure.

        `body` holds the stored procedure definition (id and JavaScript
        source).
        """
    
    async def execute_stored_procedure(self, sproc: str, partition_key: str = None, params: list = None, **kwargs):
        """
        Async version of execute_stored_procedure.

        Executes the stored procedure named by `sproc` against the
        partition identified by `partition_key`, passing `params` as
        its arguments.
        """
    
    async def list_stored_procedures(self, max_item_count: int = None, **kwargs):
        """
        Async version of list_stored_procedures.

        Enumerates stored procedures; `max_item_count` caps the page
        size.
        """
    
    async def get_stored_procedure(self, sproc: str, **kwargs):
        """
        Async version of get_stored_procedure.

        Reads a single stored procedure definition by id.
        """
    
    async def replace_stored_procedure(self, sproc: str, body: dict, **kwargs):
        """
        Async version of replace_stored_procedure.

        Replaces the existing stored procedure `sproc` with `body`.
        """
    
    async def delete_stored_procedure(self, sproc: str, **kwargs):
        """
        Async version of delete_stored_procedure.

        Deletes the stored procedure identified by `sproc`.
        """
    
    async def create_trigger(self, body: dict, **kwargs):
        """
        Async version of create_trigger.

        `body` holds the trigger definition.
        """
    
    async def create_user_defined_function(self, body: dict, **kwargs):
        """
        Async version of create_user_defined_function.

        `body` holds the UDF definition.
        """

Async User Management

Asynchronous user and permission operations.

class UserProxy:
    async def read(self, **kwargs):
        """
        Async version of read user properties.

        Fetches and returns this user's current properties.
        """
    
    async def create_permission(self, body: dict, **kwargs):
        """
        Async version of create_permission.

        `body` holds the permission definition to create for this user.
        """
    
    async def list_permissions(self, max_item_count: int = None, **kwargs):
        """
        Async version of list_permissions.

        Enumerates this user's permissions; `max_item_count` caps the
        page size.
        """
    
    async def get_permission(self, permission: str, **kwargs):
        """
        Async version of get_permission.

        Reads a single permission by id.
        """
    
    async def replace_permission(self, permission: str, body: dict, **kwargs):
        """
        Async version of replace_permission.

        Replaces the existing permission `permission` with `body`.
        """
    
    async def delete_permission(self, permission: str, **kwargs):
        """
        Async version of delete_permission.

        Deletes the permission identified by `permission`.
        """

Usage Examples

Basic Async Operations

import asyncio
from azure.cosmos.aio import CosmosClient
from azure.cosmos import ConsistencyLevel, PartitionKey

async def basic_async_operations():
    """Provision a database/container, insert items concurrently, then query them."""
    async with CosmosClient(
        url="https://myaccount.documents.azure.com:443/",
        credential="myaccountkey==",
        consistency_level=ConsistencyLevel.Session
    ) as client:

        # Idempotent provisioning of the target database and container.
        database = await client.create_database_if_not_exists(
            id="AsyncDatabase",
            offer_throughput=400
        )
        container = await database.create_container_if_not_exists(
            id="AsyncContainer",
            partition_key=PartitionKey(path="/category"),
            offer_throughput=400
        )

        # Ten sample documents sharing one partition-key value.
        docs = [
            {"id": f"item{i}", "category": "electronics", "name": f"Product {i}"}
            for i in range(10)
        ]

        # Fire all inserts at once; capture per-item failures instead of raising.
        outcomes = await asyncio.gather(
            *(container.create_item(doc) for doc in docs),
            return_exceptions=True
        )
        successful_creates = [o for o in outcomes if not isinstance(o, Exception)]
        print(f"Successfully created {len(successful_creates)} items")

        # Parameterized query, consumed via async iteration.
        query = "SELECT * FROM c WHERE c.category = @category"
        parameters = [{"name": "@category", "value": "electronics"}]

        items = []
        async for item in container.query_items(
            query=query,
            parameters=parameters,
            enable_cross_partition_query=True
        ):
            items.append(item)

        print(f"Queried {len(items)} items")

# Run the async function
asyncio.run(basic_async_operations())

Context Manager Pattern

async def context_manager_example():
    """Demonstrate proper resource management with async context manager."""
    async with CosmosClient(
        url="https://myaccount.documents.azure.com:443/",
        credential="myaccountkey=="
    ) as client:

        # Accessors are plain (non-async) methods, so they chain directly.
        container = client.get_database_client("MyDatabase") \
                          .get_container_client("MyContainer")

        # Write one document, then point-read it back.
        await container.create_item({
            "id": "async_item",
            "category": "test",
            "data": "async operation"
        })

        fetched = await container.read_item(
            item="async_item",
            partition_key="test"
        )
        print(f"Read item: {fetched['id']}")

    # Client is automatically closed when exiting context manager
    print("Client resources cleaned up")

asyncio.run(context_manager_example())

High-Performance Bulk Operations

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

async def bulk_operations_example():
    """Demonstrate high-performance bulk operations with async."""
    async with CosmosClient(
        url="https://myaccount.documents.azure.com:443/",
        credential="myaccountkey=="
    ) as client:

        container = client.get_database_client("BulkDatabase") \
                          .get_container_client("BulkContainer")

        # Build the whole workload up front.
        num_items = 1000
        workload = [
            {
                "id": f"bulk_item_{i}",
                "category": f"category_{i % 10}",
                "data": f"Data for item {i}",
                "timestamp": time.time()
            }
            for i in range(num_items)
        ]

        print(f"Creating {num_items} items...")
        start_time = time.time()

        # Upsert in fixed-size waves so the service isn't flooded.
        batch_size = 50
        for offset in range(0, len(workload), batch_size):
            wave = workload[offset:offset + batch_size]
            outcomes = await asyncio.gather(
                *(container.upsert_item(doc) for doc in wave),
                return_exceptions=True
            )
            # Surface per-wave failures without aborting the run.
            errors = [o for o in outcomes if isinstance(o, Exception)]
            if errors:
                print(f"Batch had {len(errors)} errors")

        end_time = time.time()
        print(f"Bulk operation completed in {end_time - start_time:.2f} seconds")

        # Query performance test
        print("Testing query performance...")
        start_time = time.time()

        # One COUNT query per category, all issued together.
        query_tasks = []
        for category_id in range(10):
            query_tasks.append(container.query_items(
                query="SELECT COUNT(1) as count FROM c WHERE c.category = @category",
                parameters=[{"name": "@category", "value": f"category_{category_id}"}],
                enable_cross_partition_query=True
            ))

        # Drain every async result set concurrently.
        query_results = await asyncio.gather(
            *(collect_async_iterable(pager) for pager in query_tasks)
        )

        end_time = time.time()
        print(f"Concurrent queries completed in {end_time - start_time:.2f} seconds")

        for i, results in enumerate(query_results):
            if results:
                print(f"Category {i}: {results[0]['count']} items")

async def collect_async_iterable(async_iterable):
    """Drain an async iterable completely and return its items as a list."""
    return [item async for item in async_iterable]

asyncio.run(bulk_operations_example())

Change Feed Processing with Async

async def change_feed_processor():
    """Process change feed asynchronously with high throughput.

    Splits the container's change feed into its feed ranges and drains
    each range concurrently, paging via continuation tokens.
    """
    
    async with CosmosClient(
        url="https://myaccount.documents.azure.com:443/",
        credential="myaccountkey=="
    ) as client:
        
        database = client.get_database_client("ChangeDatabase")
        container = database.get_container_client("ChangeContainer")
        
        # Get feed ranges for parallel processing
        feed_ranges = await container.read_feed_ranges()
        print(f"Processing {len(feed_ranges)} feed ranges")
        
        async def process_feed_range(feed_range, range_id):
            """Process a single feed range.

            Reads the range from the beginning in pages of up to 100
            changes, resuming with continuation tokens until the feed
            is drained or an error occurs. Returns the number of
            changes processed.
            """
            continuation = None
            processed_count = 0
            
            while True:
                try:
                    changes = container.query_items_change_feed(
                        feed_range=feed_range,
                        continuation=continuation,
                        is_start_from_beginning=True,
                        max_item_count=100
                    )
                    
                    batch_changes = []
                    async for change in changes:
                        if "_lsn" in change:  # Valid change record
                            batch_changes.append(change)
                    
                    # An empty page means this range is fully drained.
                    if not batch_changes:
                        break
                    
                    # Process changes (simulate work)
                    await asyncio.sleep(0.1)  # Simulate processing time
                    processed_count += len(batch_changes)
                    
                    print(f"Range {range_id}: Processed {len(batch_changes)} changes "
                          f"(total: {processed_count})")
                    
                    # Get continuation for next batch
                    # NOTE(review): assumes the change-feed iterator exposes
                    # get_continuation(); confirm against the installed SDK version.
                    continuation = changes.get_continuation()
                    if not continuation:
                        break
                        
                except Exception as e:
                    # Broad catch keeps one failing range from aborting the others.
                    print(f"Error processing range {range_id}: {e}")
                    break
                    
            return processed_count
        
        # Process all feed ranges concurrently
        tasks = [
            process_feed_range(feed_range, i) 
            for i, feed_range in enumerate(feed_ranges)
        ]
        
        results = await asyncio.gather(*tasks)
        total_processed = sum(results)
        print(f"Total changes processed: {total_processed}")

asyncio.run(change_feed_processor())

Error Handling and Retry Logic

import asyncio
import random
from azure.cosmos.exceptions import CosmosHttpResponseError

async def resilient_async_operations():
    """Demonstrate error handling and retry logic in async operations.

    Defines a generic exponential-backoff retry wrapper for coroutine
    factories, then applies it to a single item creation and to a batch
    of parallel creations with per-item error isolation.
    """
    # Fix: time.time() is used below but this example's header imports
    # only asyncio/random/exceptions — import locally so it runs standalone.
    import time

    async def retry_operation(operation, max_retries=3, delay=1.0):
        """Generic retry wrapper for async operations.

        `operation` must be a zero-argument callable that returns a
        fresh coroutine on each call (a coroutine object can only be
        awaited once). 429 (rate limiting) is retried with exponential
        backoff plus jitter; other errors are retried up to
        `max_retries` attempts, then re-raised.
        """
        for attempt in range(max_retries):
            try:
                return await operation()
            except CosmosHttpResponseError as e:
                if e.status_code == 429:  # Rate limiting
                    # Exponential backoff with jitter to avoid thundering herd.
                    wait_time = delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"Rate limited, waiting {wait_time:.2f}s (attempt {attempt + 1})")
                    await asyncio.sleep(wait_time)
                else:
                    print(f"HTTP error {e.status_code}: {e.message}")
                    if attempt == max_retries - 1:
                        raise
            except Exception as e:
                print(f"Unexpected error: {e}")
                if attempt == max_retries - 1:
                    raise

        # Reached only if the final attempt was a 429 that we slept through.
        raise Exception(f"Operation failed after {max_retries} attempts")

    async with CosmosClient(
        url="https://myaccount.documents.azure.com:443/",
        credential="myaccountkey=="
    ) as client:

        database = client.get_database_client("ResilientDatabase")
        container = database.get_container_client("ResilientContainer")

        # Example: Resilient item creation
        async def create_item_operation():
            """Coroutine factory for retry_operation: creates one random item."""
            return await container.create_item({
                "id": f"resilient_item_{random.randint(1, 1000)}",
                "category": "test",
                "timestamp": time.time()
            })

        # Use retry wrapper
        try:
            item = await retry_operation(create_item_operation)
            print(f"Successfully created item: {item['id']}")
        except Exception as e:
            print(f"Failed to create item after retries: {e}")

        # Parallel operations with individual error handling
        async def safe_create_item(item_data):
            """Safely create an item with error handling; returns None on failure."""
            try:
                return await retry_operation(
                    lambda: container.create_item(item_data)
                )
            except Exception as e:
                print(f"Failed to create item {item_data['id']}: {e}")
                return None

        # Create multiple items with resilience
        items_to_create = [
            {"id": f"safe_item_{i}", "category": "batch", "data": f"Data {i}"}
            for i in range(10)
        ]

        tasks = [safe_create_item(item) for item in items_to_create]
        results = await asyncio.gather(*tasks)

        successful_items = [item for item in results if item is not None]
        print(f"Successfully created {len(successful_items)} out of {len(items_to_create)} items")

asyncio.run(resilient_async_operations())

Install with Tessl CLI

npx tessl i tessl/pypi-azure-cosmos

docs

async-operations.md

client-operations.md

container-operations.md

database-operations.md

index.md

script-operations.md

user-management.md

tile.json