CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-data-tables

Microsoft Azure Data Tables Client Library for Python

90

0.96x
Overview
Eval results
Files

async-operations.mddocs/

Async Operations

Complete asynchronous operation support using async/await patterns for non-blocking I/O, providing functionality identical to the synchronous clients, with improved performance in concurrent scenarios.

Capabilities

Async Service Client

Asynchronous version of TableServiceClient for account-level operations with non-blocking I/O.

from azure.data.tables.aio import TableServiceClient

class TableServiceClient:
    """Async account-level client for an Azure Tables service account.

    Mirrors the synchronous ``TableServiceClient`` surface; every network
    operation is a coroutine and must be awaited. Intended to be used as an
    async context manager so the underlying transport is closed on exit.
    """

    def __init__(
        self,
        endpoint: str,
        credential=None,
        *,
        audience: Optional[str] = None,
        api_version: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize async TableServiceClient.

        :param endpoint: URL of the table service account endpoint.
        :param credential: Credential used to authenticate requests.
            NOTE(review): accepted credential types are not shown here —
            presumably the same set as the sync client; confirm.
        :keyword audience: Optional audience to use when requesting tokens.
        :keyword api_version: Optional service API version override.

        Parameters identical to synchronous version.
        All network operations are async and non-blocking.
        """

    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        **kwargs
    ) -> "TableServiceClient":
        """Create async client from connection string."""

    async def create_table(self, table_name: str, **kwargs) -> "TableClient":
        """Create table asynchronously; returns a client for the new table."""

    async def create_table_if_not_exists(self, table_name: str, **kwargs) -> "TableClient":
        """Create table if it does not exist asynchronously (no error if it does)."""

    async def delete_table(self, table_name: str, **kwargs) -> None:
        """Delete table asynchronously."""

    def list_tables(self, **kwargs) -> AsyncItemPaged[TableItem]:
        """List tables; returns an async pageable consumed with ``async for``."""

    def query_tables(self, query_filter: str, **kwargs) -> AsyncItemPaged[TableItem]:
        """Query tables with an OData filter; returns an async pageable."""

    async def get_service_properties(self, **kwargs) -> Dict[str, object]:
        """Get service properties (logging/metrics/CORS) asynchronously."""

    async def set_service_properties(self, **kwargs) -> None:
        """Set service properties asynchronously."""

    async def get_service_stats(self, **kwargs) -> Dict[str, object]:
        """Get service statistics asynchronously."""

    def get_table_client(self, table_name: str, **kwargs) -> "TableClient":
        """Get async TableClient for a specific table (no network call)."""

    async def close(self) -> None:
        """Close the client and release transport resources."""

    async def __aenter__(self) -> "TableServiceClient":
        """Async context manager entry; returns self."""

    async def __aexit__(self, *args) -> None:
        """Async context manager exit; closes the client."""

Usage Example

import asyncio
from azure.data.tables.aio import TableServiceClient

async def async_service_operations():
    """Demonstrate async service client operations.

    Creates several tables concurrently, lists and queries them, then
    reads the service properties. Expects a ``conn_str`` connection
    string to be defined by the surrounding scope.
    """

    # Initialize async service client; the context manager guarantees the
    # transport is closed even if an operation raises.
    async with TableServiceClient.from_connection_string(conn_str) as service_client:

        # Create multiple tables concurrently
        table_names = ["customers", "orders", "products", "inventory"]

        # Create all tables in parallel
        create_tasks = [
            service_client.create_table_if_not_exists(name)
            for name in table_names
        ]

        table_clients = await asyncio.gather(*create_tasks)
        print(f"Created {len(table_clients)} tables concurrently")

        # List all tables asynchronously
        print("Tables in account:")
        async for table in service_client.list_tables():
            print(f"  - {table.name}")

        # Query tables whose names start with 'c'.
        # FIX: "TableName ge 'c'" alone matches every table sorted at or
        # after 'c' (e.g. "orders", "products"); bounding the range with
        # "lt 'd'" limits results to names actually starting with 'c'.
        print("Tables starting with 'c':")
        async for table in service_client.query_tables("TableName ge 'c' and TableName lt 'd'"):
            print(f"  - {table.name}")

        # Get service properties
        properties = await service_client.get_service_properties()
        logging_enabled = properties.get('analytics_logging', {}).get('read', False)
        print(f"Read logging enabled: {logging_enabled}")

# Run async operations
asyncio.run(async_service_operations())

Async Table Client

Asynchronous version of TableClient for table-specific operations with non-blocking entity operations.

from azure.data.tables.aio import TableClient

class TableClient:
    """Async client scoped to a single Azure table.

    Provides non-blocking entity CRUD, queries, batch transactions, and
    access-policy management. Use as an async context manager so the
    underlying transport is closed on exit.
    """

    def __init__(
        self,
        endpoint: str,
        table_name: str,
        *,
        credential=None,
        **kwargs
    ):
        """Initialize async TableClient for ``table_name`` at ``endpoint``."""

    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        table_name: str,
        **kwargs
    ) -> "TableClient":
        """Create async client from a storage connection string."""

    @classmethod
    def from_table_url(
        cls,
        table_url: str,
        *,
        credential=None,
        **kwargs
    ) -> "TableClient":
        """Create async client from a full table URL."""

    async def create_table(self, **kwargs) -> TableItem:
        """Create the table asynchronously."""

    async def delete_table(self, **kwargs) -> None:
        """Delete the table asynchronously."""

    async def create_entity(
        self,
        entity: Union[TableEntity, Mapping[str, Any]],
        **kwargs
    ) -> Dict[str, Any]:
        """Insert a new entity asynchronously; fails if the entity exists."""

    async def get_entity(
        self,
        partition_key: str,
        row_key: str,
        **kwargs
    ) -> TableEntity:
        """Fetch a single entity by its partition key and row key."""

    async def update_entity(
        self,
        entity: Union[TableEntity, Mapping[str, Any]],
        **kwargs
    ) -> Dict[str, Any]:
        """Update an existing entity asynchronously."""

    async def upsert_entity(
        self,
        entity: Union[TableEntity, Mapping[str, Any]],
        **kwargs
    ) -> Dict[str, Any]:
        """Insert-or-update an entity asynchronously."""

    async def delete_entity(
        self,
        partition_key: Optional[str] = None,
        row_key: Optional[str] = None,
        **kwargs
    ) -> None:
        """Delete an entity asynchronously.

        Keys are optional in the signature — presumably an entity mapping
        can be supplied via kwargs instead; confirm against the caller.
        """

    def list_entities(self, **kwargs) -> AsyncItemPaged[TableEntity]:
        """List entities; returns an async pageable consumed with ``async for``."""

    def query_entities(self, query_filter: str, **kwargs) -> AsyncItemPaged[TableEntity]:
        """Query entities with an OData filter; returns an async pageable."""

    async def submit_transaction(
        self,
        operations: Iterable,
        **kwargs
    ) -> List[Mapping[str, Any]]:
        """Submit a batch transaction (single partition) asynchronously."""

    async def get_table_access_policy(self, **kwargs) -> Dict[str, Optional[TableAccessPolicy]]:
        """Get the table's stored access policies asynchronously."""

    async def set_table_access_policy(
        self,
        signed_identifiers: Mapping[str, Optional[TableAccessPolicy]],
        **kwargs
    ) -> None:
        """Set the table's stored access policies asynchronously."""

    async def close(self) -> None:
        """Close the client and release transport resources."""

    async def __aenter__(self) -> "TableClient":
        """Async context manager entry; returns self."""

    async def __aexit__(self, *args) -> None:
        """Async context manager exit; closes the client."""

Usage Example

import asyncio
from azure.data.tables.aio import TableClient
from azure.data.tables import TableEntity

async def async_table_operations():
    """Demonstrate async table client operations.

    Inserts a batch of VIP customers concurrently, queries them back,
    then promotes the first five to Platinum in parallel. Requires a
    ``conn_str`` connection string in the surrounding scope.
    """

    async with TableClient.from_connection_string(conn_str, "customers") as table_client:

        # Build ten VIP customer entities up front.
        vip_customers = []
        for idx in range(1, 11):
            vip_customers.append(
                TableEntity(
                    PartitionKey="vip",
                    RowKey=f"customer-{idx:03d}",
                    Name=f"Customer {idx}",
                    Email=f"customer{idx}@example.com",
                    VipLevel="Gold",
                )
            )

        # Fire all inserts at once; return_exceptions keeps one failure
        # from cancelling the rest.
        outcomes = await asyncio.gather(
            *(table_client.create_entity(customer) for customer in vip_customers),
            return_exceptions=True,
        )
        ok_count = sum(1 for outcome in outcomes if not isinstance(outcome, Exception))
        print(f"Created {ok_count}/{len(vip_customers)} entities concurrently")

        # Stream the VIP partition back and show each customer.
        print("VIP customers:")
        async for customer in table_client.query_entities("PartitionKey eq 'vip'"):
            print(f"  - {customer['Name']} ({customer['Email']})")

        # Collect up to five update tasks, then run them concurrently.
        pending_updates = []
        async for customer in table_client.query_entities("PartitionKey eq 'vip'"):
            customer["VipLevel"] = "Platinum"
            customer["LastUpdated"] = "2023-12-15"
            pending_updates.append(table_client.update_entity(customer))
            if len(pending_updates) >= 5:  # Process in batches
                break

        done = await asyncio.gather(*pending_updates)
        print(f"Updated {len(done)} entities to Platinum level")

asyncio.run(async_table_operations())

Async Iteration

Handle large result sets efficiently using async iteration patterns.

class AsyncItemPaged:
    """
    Async iterator for paged results from Azure Tables.

    Supports async iteration over large result sets with automatic
    paging: pages are fetched lazily as iteration advances, keeping
    memory usage bounded regardless of result-set size.
    """

    def __aiter__(self) -> AsyncIterator[T]:
        """Return an async iterator over individual items across all pages."""

    async def __anext__(self) -> T:
        """Get the next item, transparently fetching the next page when needed."""

    def by_page(self) -> AsyncIterator[AsyncIterator[T]]:
        """Iterate page-by-page; each yielded page is itself async-iterable."""

Usage Example

import asyncio
from azure.data.tables.aio import TableClient

async def async_iteration_patterns():
    """Demonstrate various async iteration patterns.

    Shows item-at-a-time iteration, page-by-page batch processing, and
    filtered queries. Requires a ``conn_str`` connection string in the
    surrounding scope.
    """

    async with TableClient.from_connection_string(conn_str, "large_dataset") as table_client:

        # Basic async iteration — pages are fetched lazily, so memory
        # stays bounded even over very large tables.
        print("Processing all entities:")
        entity_count = 0
        async for entity in table_client.list_entities():
            entity_count += 1
            if entity_count % 1000 == 0:
                print(f"  Processed {entity_count} entities...")

        print(f"Total entities processed: {entity_count}")

        # Page-by-page processing for memory efficiency.
        # FIX: results_per_page is a keyword of list_entities(), not of
        # by_page() — by_page() only accepts a continuation token.
        print("Processing by pages:")
        page_count = 0
        total_entities = 0

        async for page in table_client.list_entities(results_per_page=500).by_page():
            page_count += 1
            page_entities = []

            async for entity in page:
                page_entities.append(entity)

            total_entities += len(page_entities)
            print(f"  Page {page_count}: {len(page_entities)} entities")

            # Process page batch (e.g., bulk operations)
            await process_entity_batch(page_entities)

        print(f"Processed {total_entities} entities across {page_count} pages")

        # Filtered async iteration: the server applies the OData filter,
        # so only matching entities cross the wire.
        print("Processing filtered results:")
        async for entity in table_client.query_entities(
            "PartitionKey eq 'active' and Status eq 'pending'"
        ):
            await process_pending_entity(entity)

async def process_entity_batch(entities):
    """Process a batch of entities asynchronously (placeholder)."""
    # Simulate async processing
    await asyncio.sleep(0.1)
    print(f"    Processed batch of {len(entities)} entities")

async def process_pending_entity(entity):
    """Process an individual pending entity (placeholder)."""
    # Simulate async processing
    await asyncio.sleep(0.01)

asyncio.run(async_iteration_patterns())

Concurrent Operations

Leverage async capabilities for high-performance concurrent operations.

Concurrent Entity Operations

import asyncio
from azure.data.tables.aio import TableClient
from azure.data.tables import TableEntity

async def concurrent_entity_operations():
    """Demonstrate concurrent entity operations for high throughput.

    Creates 1000 entities with a semaphore capping in-flight requests,
    then reports elapsed time and throughput. Requires a ``conn_str``
    connection string in the surrounding scope.
    """

    async with TableClient.from_connection_string(conn_str, "high_throughput") as table_client:

        # Cap concurrency so we don't flood the service or exhaust sockets.
        semaphore = asyncio.Semaphore(10)  # Limit concurrent operations

        async def create_entity_with_limit(entity_data):
            # Acquire a slot before issuing the request; released on exit.
            async with semaphore:
                entity = TableEntity(**entity_data)
                return await table_client.create_entity(entity)

        # Generate entity data: 100 entities per partition, 10 categories.
        entity_data_list = [
            {
                "PartitionKey": f"partition-{i // 100}",
                "RowKey": f"entity-{i:06d}",
                "Value": i,
                "Category": f"cat-{i % 10}",
                "Active": i % 2 == 0
            }
            for i in range(1000)
        ]

        # Create entities concurrently with rate limiting.
        # FIX: asyncio.get_event_loop() inside a coroutine is deprecated
        # (Python 3.10+); get_running_loop() is the supported way to read
        # the loop's monotonic clock here.
        print("Creating entities with concurrency control...")
        start_time = asyncio.get_running_loop().time()

        create_tasks = [
            create_entity_with_limit(data)
            for data in entity_data_list
        ]

        results = await asyncio.gather(*create_tasks, return_exceptions=True)

        end_time = asyncio.get_running_loop().time()
        successful_creates = sum(1 for r in results if not isinstance(r, Exception))

        print(f"Created {successful_creates}/{len(entity_data_list)} entities")
        print(f"Time: {end_time - start_time:.2f}s")
        # Network round-trips guarantee a non-zero elapsed time in practice.
        print(f"Throughput: {successful_creates / (end_time - start_time):.1f} entities/sec")

asyncio.run(concurrent_entity_operations())

Async Batch Processing

import asyncio
from azure.data.tables.aio import TableClient
from typing import List, Dict, Any

async def async_batch_processing():
    """Process multiple batches concurrently across partitions.

    Builds three 50-entity partitions and submits one transaction per
    partition in parallel (Azure batch transactions are scoped to a
    single partition). Requires ``conn_str`` in the surrounding scope.
    """

    async with TableClient.from_connection_string(conn_str, "batch_demo") as table_client:

        # One 50-entity payload per partition key.
        partition_batches = {
            pk: [
                {"PartitionKey": pk, "RowKey": f"item-{i}", "Value": i}
                for i in range(50)
            ]
            for pk in ("batch-1", "batch-2", "batch-3")
        }

        async def submit_one_partition(partition_key: str, entities: List[Dict]):
            """Submit a single partition's entities as one transaction."""
            ops = [("create", item) for item in entities]
            try:
                outcome = await table_client.submit_transaction(ops)
                print(f"Partition {partition_key}: {len(outcome)} entities created")
                return len(outcome)
            except Exception as e:
                print(f"Partition {partition_key} failed: {e}")
                return 0

        # Fan the transactions out concurrently and tally the results.
        per_partition_counts = await asyncio.gather(
            *(
                submit_one_partition(pk, batch)
                for pk, batch in partition_batches.items()
            )
        )
        total_processed = sum(per_partition_counts)

        print(f"Concurrent batch processing completed: {total_processed} total entities")

asyncio.run(async_batch_processing())

Error Handling in Async Context

Handle exceptions and errors properly in asynchronous operations.

Async Error Handling Patterns

import asyncio
from azure.data.tables.aio import TableClient
from azure.data.tables import TableTransactionError
from azure.core.exceptions import ResourceNotFoundError, ServiceRequestError

async def async_error_handling():
    """Demonstrate error handling patterns in async operations.

    Shows retry-with-backoff, per-task error isolation under
    asyncio.gather, and per-operation fallback after a failed batch
    transaction. Requires ``conn_str`` in the surrounding scope.
    """

    async with TableClient.from_connection_string(conn_str, "error_demo") as table_client:

        # Retry with exponential backoff (async version). Retries only
        # transport-level failures; other exceptions propagate immediately.
        async def async_retry_with_backoff(coro_func, max_retries=3):
            for attempt in range(max_retries):
                try:
                    return await coro_func()
                except ServiceRequestError as e:
                    if attempt == max_retries - 1:
                        raise

                    delay = 2 ** attempt  # Exponential backoff
                    print(f"Attempt {attempt + 1} failed, retrying in {delay}s")
                    await asyncio.sleep(delay)

        # Concurrent operations with individual error handling: a failure
        # is reported and converted to None instead of failing the gather.
        async def safe_create_entity(entity_data):
            try:
                return await table_client.create_entity(entity_data)
            except Exception as e:
                print(f"Failed to create entity {entity_data.get('RowKey')}: {e}")
                return None

        # Process entities with error isolation
        entities = [
            {"PartitionKey": "safe", "RowKey": f"item-{i}", "Value": i}
            for i in range(20)
        ]

        # Add some problematic entities
        entities.extend([
            {"PartitionKey": "safe", "RowKey": "item-5", "Value": 999},  # Duplicate
            {"PartitionKey": "safe"},  # Missing RowKey
        ])

        # Create all entities concurrently with error handling
        create_tasks = [safe_create_entity(entity) for entity in entities]
        results = await asyncio.gather(*create_tasks)

        successful_creates = [r for r in results if r is not None]
        print(f"Successfully created {len(successful_creates)} entities")

        # Batch operations with error recovery: if the atomic transaction
        # fails, nothing was committed, so each operation (except the one
        # that failed) is retried individually.
        async def robust_batch_operation(operations):
            try:
                return await table_client.submit_transaction(operations)
            except TableTransactionError as e:
                # e.index identifies the failing operation within the batch.
                print(f"Batch failed at operation {e.index}: {e.message}")

                # Execute operations individually as fallback
                results = []
                for i, (op_type, entity) in enumerate(operations):
                    if i == e.index:
                        results.append(None)  # Skip failed operation
                        continue

                    try:
                        # FIX: initialize result each iteration — previously
                        # an unhandled op_type appended the stale result of
                        # the prior iteration (or raised NameError first time).
                        result = None
                        if op_type == "create":
                            result = await table_client.create_entity(entity)
                        elif op_type == "update":
                            result = await table_client.update_entity(entity)
                        # Add other operation types as needed

                        results.append(result)
                    except Exception as individual_error:
                        print(f"Individual operation {i} also failed: {individual_error}")
                        results.append(None)

                return results

        # Test batch with error recovery
        batch_operations = [
            ("create", {"PartitionKey": "batch", "RowKey": f"item-{i}", "Value": i})
            for i in range(10)
        ]

        batch_results = await robust_batch_operation(batch_operations)
        successful_batch = [r for r in batch_results if r is not None]
        print(f"Batch processing: {len(successful_batch)} operations succeeded")

asyncio.run(async_error_handling())

Context Management

Proper resource management using async context managers.

Resource Management Patterns

import asyncio
from azure.data.tables.aio import TableServiceClient, TableClient

async def resource_management_patterns():
    """Demonstrate proper async resource management.

    Covers three lifecycles: nested async context managers, a
    try/finally pool of short-lived clients, and a long-lived manager
    object with explicit start/stop. Requires ``conn_str`` in the
    surrounding scope.
    """

    # Pattern 1: Service client context manager — the transport is closed
    # automatically when the block exits, even on error.
    async with TableServiceClient.from_connection_string(conn_str) as service_client:

        # Create table client from service client (no network call here).
        table_client = service_client.get_table_client("managed_table")

        # Use table client within service client context; the inner
        # context closes the table client before the service client.
        async with table_client:
            await table_client.create_table()

            entity = {"PartitionKey": "test", "RowKey": "001", "Value": "data"}
            await table_client.create_entity(entity)

            async for entity in table_client.list_entities():
                print(f"Entity: {entity['RowKey']}")

    # Pattern 2: Multiple concurrent clients with proper cleanup
    async def managed_concurrent_operations():
        clients = []
        try:
            # Create multiple clients (construction does no I/O).
            for i in range(3):
                client = TableClient.from_connection_string(conn_str, f"table_{i}")
                clients.append(client)

            # Use clients concurrently; return_exceptions keeps one
            # failed create_table from cancelling the others.
            create_tasks = [
                client.create_table()
                for client in clients
            ]

            await asyncio.gather(*create_tasks, return_exceptions=True)

            # Perform operations
            operation_tasks = []
            for i, client in enumerate(clients):
                entity = {"PartitionKey": "concurrent", "RowKey": f"item-{i}", "Value": i}
                operation_tasks.append(client.create_entity(entity))

            await asyncio.gather(*operation_tasks)

        finally:
            # Ensure all clients are properly closed, even if an operation
            # above raised; close failures are swallowed via return_exceptions.
            close_tasks = [client.close() for client in clients]
            await asyncio.gather(*close_tasks, return_exceptions=True)

    await managed_concurrent_operations()

    # Pattern 3: Long-lived client with proper lifecycle
    class AsyncTableManager:
        """Owns one TableClient across many operations; call start()/stop()."""

        def __init__(self, connection_string: str, table_name: str):
            # Connection details are stored; the client is created lazily
            # in start() so construction itself cannot fail on I/O.
            self.connection_string = connection_string
            self.table_name = table_name
            self.client = None

        async def start(self):
            """Initialize the async client and ensure the table exists."""
            self.client = TableClient.from_connection_string(
                self.connection_string, 
                self.table_name
            )
            await self.client.create_table()

        async def stop(self):
            """Cleanup the async client; safe to call even if start() failed."""
            if self.client:
                await self.client.close()

        async def process_entities(self, entities):
            """Create the given entities concurrently with the managed client.

            Returns the gather() results list; failed creates appear as
            Exception instances because return_exceptions=True.
            """
            if not self.client:
                raise RuntimeError("Manager not started")

            tasks = [
                self.client.create_entity(entity) 
                for entity in entities
            ]

            return await asyncio.gather(*tasks, return_exceptions=True)

    # Use the managed client; finally guarantees stop() runs.
    manager = AsyncTableManager(conn_str, "managed_operations")
    try:
        await manager.start()

        entities = [
            {"PartitionKey": "managed", "RowKey": f"item-{i}", "Value": i}
            for i in range(10)
        ]

        results = await manager.process_entities(entities)
        successful = sum(1 for r in results if not isinstance(r, Exception))
        print(f"Managed processing: {successful}/{len(entities)} entities")

    finally:
        await manager.stop()

asyncio.run(resource_management_patterns())

Performance Considerations

Async Best Practices

  1. Connection Pooling: Async clients automatically manage connection pools
  2. Concurrency Limits: Use semaphores to control concurrent operations
  3. Resource Management: Always use context managers or explicit cleanup
  4. Error Isolation: Handle exceptions in concurrent operations individually
  5. Batch Optimization: Process multiple partitions concurrently
  6. Memory Management: Use async iteration for large result sets

Example High-Performance Pattern

import asyncio
from azure.data.tables.aio import TableClient
from contextlib import AsyncExitStack

async def high_performance_pattern():
    """Optimized pattern for high-throughput async operations.

    Spreads 5000 entity creates across five tables with a shared
    semaphore capping in-flight requests, then reports throughput.
    Requires ``conn_str`` in the surrounding scope.
    """

    # Configuration
    MAX_CONCURRENT_OPERATIONS = 50
    BATCH_SIZE = 100
    TABLE_COUNT = 5

    # One semaphore shared by all tables bounds total in-flight requests.
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_OPERATIONS)

    async with AsyncExitStack() as stack:
        # Create multiple table clients; the exit stack closes every one
        # of them (in reverse order) when the block exits.
        clients = []
        for i in range(TABLE_COUNT):
            client = await stack.enter_async_context(
                TableClient.from_connection_string(conn_str, f"perf_table_{i}")
            )
            clients.append(client)

        # High-throughput entity creation, throttled by the semaphore.
        async def create_entity_throttled(client, entity):
            async with semaphore:
                return await client.create_entity(entity)

        # Generate workload across multiple tables: 10 batches per table,
        # one partition per batch index, BATCH_SIZE rows per partition.
        all_tasks = []
        for client_idx, client in enumerate(clients):
            for batch_idx in range(10):  # 10 batches per table
                entities = [
                    {
                        "PartitionKey": f"perf-{batch_idx}",
                        "RowKey": f"item-{i:06d}",
                        "TableIndex": client_idx,
                        "BatchIndex": batch_idx,
                        "Value": i
                    }
                    for i in range(BATCH_SIZE)
                ]

                # Add tasks for this batch
                for entity in entities:
                    task = create_entity_throttled(client, entity)
                    all_tasks.append(task)

        # Execute all operations concurrently.
        # FIX: asyncio.get_event_loop() inside a coroutine is deprecated
        # (Python 3.10+); get_running_loop() is the supported way to read
        # the loop's monotonic clock.
        print(f"Executing {len(all_tasks)} operations across {TABLE_COUNT} tables...")
        start_time = asyncio.get_running_loop().time()

        results = await asyncio.gather(*all_tasks, return_exceptions=True)

        end_time = asyncio.get_running_loop().time()
        successful = sum(1 for r in results if not isinstance(r, Exception))

        print(f"Completed: {successful}/{len(all_tasks)} operations")
        print(f"Time: {end_time - start_time:.2f}s")
        # Network round-trips guarantee a non-zero elapsed time in practice.
        print(f"Throughput: {successful / (end_time - start_time):.1f} ops/sec")

# Run high-performance example
asyncio.run(high_performance_pattern())

Install with Tessl CLI

npx tessl i tessl/pypi-azure-data-tables

docs

async-operations.md

batch-operations.md

entity-data-types.md

error-handling.md

index.md

security-access-control.md

service-management.md

table-operations.md

tile.json