CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-data-tables

Microsoft Azure Data Tables Client Library for Python

90

0.96x
Overview
Eval results
Files

batch-operations.mddocs/

Batch Operations

High-performance batch transaction processing that executes multiple entity operations in a single atomic transaction, supporting the full set of operation types (create, update, upsert, delete) for efficient bulk workloads.

Capabilities

Transaction Operations

Execute multiple entity operations atomically within a single transaction for improved performance and data consistency.

class TableClient:
    def submit_transaction(
        self,
        operations: Iterable[Union[
            Tuple[str, Union[TableEntity, Mapping[str, Any]]],
            Tuple[str, Union[TableEntity, Mapping[str, Any]], Mapping[str, Any]]
        ]],
        **kwargs
    ) -> List[Mapping[str, Any]]:
        """
        Submit a batch of operations as a single atomic transaction.

        Parameters:
        - operations: Iterable of operation tuples in format:
          - (operation_type, entity) for basic operations
          - (operation_type, entity, options) for operations with additional
            parameters (e.g. {"mode": UpdateMode.REPLACE} or a
            "match_condition" for optimistic concurrency)

        Operation Types:
        - "create": Insert new entity
        - "update": Update existing entity
        - "upsert": Insert or update entity
        - "delete": Delete entity

        Returns:
        List of operation results with metadata, one entry per submitted
        operation, in submission order.

        Constraints:
        - All entities must have the same PartitionKey
        - Maximum 100 operations per transaction
        - All operations succeed or all fail (atomic)
        - Total payload must be under 4MB

        Raises:
        TableTransactionError: If any operation fails, entire transaction is rolled back
        """

Usage Example

from azure.data.tables import TableClient, UpdateMode, TableTransactionError

table_client = TableClient.from_connection_string(conn_str, "orders")

# Assemble the batch incrementally rather than as a single literal list.
operations = []

# Two brand-new orders for the quarter.
operations.append(("create", {
    "PartitionKey": "2023-Q4",
    "RowKey": "order-001",
    "CustomerName": "John Doe",
    "Total": 299.99,
    "Status": "pending"
}))
operations.append(("create", {
    "PartitionKey": "2023-Q4",
    "RowKey": "order-002",
    "CustomerName": "Jane Smith",
    "Total": 149.99,
    "Status": "pending"
}))

# Mark an existing order as shipped.
operations.append(("update", {
    "PartitionKey": "2023-Q4",
    "RowKey": "order-003",
    "Status": "shipped",
    "ShippedDate": "2023-12-15"
}))

# Insert-or-update semantics for order-004.
operations.append(("upsert", {
    "PartitionKey": "2023-Q4",
    "RowKey": "order-004",
    "CustomerName": "Bob Wilson",
    "Total": 75.50,
    "Status": "completed"
}))

# Remove an obsolete order entirely.
operations.append(("delete", {
    "PartitionKey": "2023-Q4",
    "RowKey": "order-005"
}))

try:
    results = table_client.submit_transaction(operations)
    print(f"Successfully processed {len(results)} operations")

    for i, result in enumerate(results):
        # Delete operations produce an empty/None result entry.
        if result:
            print(f"Operation {i}: ETag = {result.get('etag', 'N/A')}")

except TableTransactionError as e:
    print(f"Transaction failed at operation {e.index}: {e.message}")
    print(f"Error code: {e.error_code}")

Advanced Transaction Options

Use additional parameters for fine-grained control over batch operations.

# Extended operation format with options: either a 2-tuple
# (operation_type, entity) or a 3-tuple (operation_type, entity, options)
# where options carries per-operation keyword arguments such as "mode".
TransactionOperationType = Union[
    Tuple[str, Union[TableEntity, Mapping[str, Any]]],
    Tuple[str, Union[TableEntity, Mapping[str, Any]], Mapping[str, Any]]
]

Usage Example

# Fix: TableTransactionError is caught in the except clause below but was
# missing from the import list, causing a NameError on the failure path.
from azure.data.tables import TableClient, UpdateMode, TableTransactionError
from azure.core import MatchConditions

table_client = TableClient.from_connection_string(conn_str, "inventory")

# Reusable option dictionary keeps the operation list readable.
REPLACE_OPTS = {"mode": UpdateMode.REPLACE}

# Advanced batch with options
advanced_operations = [
    # Create with no additional options
    ("create", {
        "PartitionKey": "electronics",
        "RowKey": "item-001",
        "Name": "Laptop",
        "Quantity": 10,
        "Price": 999.99
    }),

    # Update with REPLACE mode
    ("update", {
        "PartitionKey": "electronics",
        "RowKey": "item-002",
        "Name": "Updated Tablet",
        "Quantity": 5,
        "Price": 299.99
    }, REPLACE_OPTS),

    # Update with optimistic concurrency: only applied when the stored
    # etag still matches (entity unmodified since it was read).
    ("update", {
        "PartitionKey": "electronics",
        "RowKey": "item-003",
        "Quantity": 8,
        "etag": "W/\"datetime'2023-12-15T10%3A30%3A00.123Z'\""
    }, {
        "mode": UpdateMode.MERGE,
        "match_condition": MatchConditions.IfNotModified
    }),

    # Upsert with REPLACE mode
    ("upsert", {
        "PartitionKey": "electronics",
        "RowKey": "item-004",
        "Name": "Wireless Mouse",
        "Quantity": 25,
        "Price": 49.99
    }, REPLACE_OPTS)
]

try:
    results = table_client.submit_transaction(advanced_operations)
    print("Advanced batch transaction completed successfully")
except TableTransactionError as e:
    print(f"Advanced transaction failed: {e.message}")

Transaction Operation Types

Comprehensive enumeration of supported batch operation types.

class TransactionOperation(Enum):
    """
    Batch transaction operation types.

    Defines the available operations that can be performed
    within a batch transaction. Members may be used in place of the
    equivalent string literals when building operation tuples.
    """

    CREATE = "create"    # Insert new entity (fails if exists)
    UPDATE = "update"    # Update existing entity (fails if not exists)
    UPSERT = "upsert"    # Insert or update entity
    DELETE = "delete"    # Delete entity (fails if not exists)

Usage Example

from azure.data.tables import TableClient, TransactionOperation

table_client = TableClient.from_connection_string(conn_str, "products")

# Enum members instead of raw strings give type safety; build each
# operation as a named tuple-valued variable, then collect them.
create_op = (TransactionOperation.CREATE, {
    "PartitionKey": "category-a",
    "RowKey": "prod-001",
    "Name": "Product 1",
    "Price": 19.99
})

upsert_op = (TransactionOperation.UPSERT, {
    "PartitionKey": "category-a",
    "RowKey": "prod-002",
    "Name": "Product 2",
    "Price": 29.99
})

delete_op = (TransactionOperation.DELETE, {
    "PartitionKey": "category-a",
    "RowKey": "prod-003"
})

operations = [create_op, upsert_op, delete_op]

# Submit with enum-based operations
results = table_client.submit_transaction(operations)

Update Modes

Control how entity updates are applied during batch operations.

class UpdateMode(Enum):
    """
    Entity update modes for controlling merge behavior.

    Passed as the "mode" option of an update/upsert operation tuple.
    """

    REPLACE = "replace"  # Replace entire entity; unspecified properties are removed
    MERGE = "merge"      # Merge new properties with existing entity; others are kept

Usage Example

from azure.data.tables import TableClient, UpdateMode

table_client = TableClient.from_connection_string(conn_str, "customers")

# Existing entity:
# {
#   "PartitionKey": "vip",
#   "RowKey": "customer-001",
#   "Name": "John Doe",
#   "Email": "john@example.com",
#   "Phone": "555-1234",
#   "VipLevel": "Gold"
# }

# MERGE: only the listed properties change; everything else survives.
merge_entity = {
    "PartitionKey": "vip",
    "RowKey": "customer-001",
    "Phone": "555-9999",          # Updated
    "LastContact": "2023-12-15"   # Added
    # Name, Email, VipLevel remain unchanged
}
operations_merge = [("update", merge_entity, {"mode": UpdateMode.MERGE})]

# REPLACE: the stored entity becomes exactly this dict; omitted
# properties (here, Phone) are dropped from the table.
replace_entity = {
    "PartitionKey": "vip",
    "RowKey": "customer-001",
    "Name": "John Doe",
    "Email": "newemail@example.com",
    "VipLevel": "Platinum"
}
operations_replace = [("update", replace_entity, {"mode": UpdateMode.REPLACE})]

# Execute merge operation
table_client.submit_transaction(operations_merge)
print("Merge update completed - existing properties preserved")

# Execute replace operation
table_client.submit_transaction(operations_replace)
print("Replace update completed - entity fully replaced")

Batch Processing Patterns

Common patterns for efficient batch processing of large datasets.

Chunked Batch Processing

from azure.data.tables import TableClient
from typing import List, Dict, Any

def process_entities_in_batches(
    table_client: TableClient,
    entities: List[Dict[str, Any]],
    operation_type: str = "create",
    batch_size: int = 100
):
    """
    Process large number of entities in optimally-sized batches.

    Automatically groups entities by PartitionKey (a batch transaction
    may only touch one partition) and submits each group in chunks that
    respect the Azure Tables 100-operations-per-transaction limit.

    Parameters:
    - table_client: client bound to the target table
    - entities: entities to process; each must carry "PartitionKey"
    - operation_type: batch operation applied to every entity
    - batch_size: requested chunk size; clamped to the 1..100 range
      allowed by the service

    Returns:
    List of per-operation result mappings from the successful batches.
    """

    # Azure Tables rejects transactions with more than 100 operations,
    # so clamp oversized (or non-positive) requests instead of letting
    # every batch fail server-side.
    batch_size = max(1, min(batch_size, 100))

    # Group entities by PartitionKey (required for batching)
    partition_groups: Dict[str, List[Dict[str, Any]]] = {}
    for entity in entities:
        partition_groups.setdefault(entity["PartitionKey"], []).append(entity)

    results = []
    total_processed = 0

    for partition_key, partition_entities in partition_groups.items():
        print(f"Processing partition '{partition_key}': {len(partition_entities)} entities")

        # Process partition in batches
        for i in range(0, len(partition_entities), batch_size):
            batch = partition_entities[i:i + batch_size]

            operations = [(operation_type, entity) for entity in batch]

            try:
                batch_results = table_client.submit_transaction(operations)
                results.extend(batch_results)
                total_processed += len(batch)

                print(f"  Batch {i//batch_size + 1}: {len(batch)} entities processed")

            except Exception as e:
                # Deliberate best-effort: report the failed chunk and
                # keep going with the remaining batches.
                print(f"  Batch {i//batch_size + 1} failed: {e}")
                continue

    print(f"Total processed: {total_processed}/{len(entities)} entities")
    return results

# Usage example: 500 entities across potentially multiple partitions.
entities_to_create = []
for i in range(1, 501):
    entities_to_create.append(
        {"PartitionKey": "2023-Q4", "RowKey": f"order-{i:04d}", "Amount": i * 10.0}
    )

results = process_entities_in_batches(
    table_client,
    entities_to_create,
    operation_type="create"
)

Mixed Operation Batches

from azure.data.tables import TableClient
from datetime import datetime

def process_order_batch(table_client: TableClient, order_updates: Dict):
    """
    Process complex order updates with mixed operations.

    Builds one atomic transaction from four optional sections of
    *order_updates*: "new_orders", "order_updates", "cancelled_orders"
    (soft delete via status change) and "deleted_orders" (hard delete).
    """

    # All orders for the current month share one partition.
    partition_key = f"orders-{datetime.now().strftime('%Y-%m')}"
    operations = []

    def _now() -> str:
        # Timestamp helper; evaluated once per operation, like the original.
        return datetime.utcnow().isoformat()

    # Add new orders
    for order_id, order_data in order_updates.get("new_orders", {}).items():
        entity = {"PartitionKey": partition_key, "RowKey": order_id}
        entity.update(order_data)
        entity["CreatedAt"] = _now()
        entity["Status"] = "pending"
        operations.append(("create", entity))

    # Update existing orders
    for order_id, updates in order_updates.get("order_updates", {}).items():
        entity = {"PartitionKey": partition_key, "RowKey": order_id}
        entity.update(updates)
        entity["ModifiedAt"] = _now()
        operations.append(("update", entity))

    # Cancel orders (soft delete by status update)
    for order_id in order_updates.get("cancelled_orders", []):
        operations.append(("update", {
            "PartitionKey": partition_key,
            "RowKey": order_id,
            "Status": "cancelled",
            "CancelledAt": _now()
        }))

    # Hard delete orders
    for order_id in order_updates.get("deleted_orders", []):
        operations.append(("delete", {
            "PartitionKey": partition_key,
            "RowKey": order_id
        }))

    if not operations:
        print("No operations to process")
        return []

    try:
        results = table_client.submit_transaction(operations)
        print(f"Successfully processed {len(operations)} order operations")
        return results

    except Exception as e:
        print(f"Order batch processing failed: {e}")
        raise

# Usage: assemble the update specification section by section.
order_updates = {}
order_updates["new_orders"] = {
    "ORD-001": {"CustomerName": "Alice", "Total": 199.99},
    "ORD-002": {"CustomerName": "Bob", "Total": 299.99},
}
order_updates["order_updates"] = {
    "ORD-003": {"Status": "shipped", "TrackingNumber": "TRK123"},
    "ORD-004": {"Status": "delivered", "DeliveredAt": "2023-12-15T14:30:00"},
}
order_updates["cancelled_orders"] = ["ORD-005"]
order_updates["deleted_orders"] = ["ORD-006"]

process_order_batch(table_client, order_updates)

Performance Optimization

Best practices for maximizing batch operation performance.

Batch Size Optimization

import time
from azure.data.tables import TableClient

def find_optimal_batch_size(table_client: TableClient, sample_entities: List[Dict]):
    """
    Determine optimal batch size based on entity size and performance.

    Creates (and then deletes) sample entities at several batch sizes,
    measures throughput for each size, and returns the fastest one.

    Raises:
    - ValueError: if no batch size completed successfully.
    """

    test_sizes = [1, 10, 25, 50, 75, 100]  # Azure Tables max is 100
    performance_data = {}

    for batch_size in test_sizes:
        if len(sample_entities) < batch_size:
            continue

        # Test batch performance
        test_entities = sample_entities[:batch_size]
        operations = [("create", entity) for entity in test_entities]

        # perf_counter is monotonic, so the measurement cannot go
        # negative if the wall clock is adjusted mid-test.
        start_time = time.perf_counter()
        try:
            table_client.submit_transaction(operations)
            elapsed = time.perf_counter() - start_time

            # Calculate throughput
            throughput = batch_size / elapsed
            performance_data[batch_size] = {
                "elapsed": elapsed,
                "throughput": throughput
            }

            print(f"Batch size {batch_size}: {elapsed:.3f}s, {throughput:.1f} entities/sec")

            # Clean up test entities
            cleanup_operations = [("delete", entity) for entity in test_entities]
            table_client.submit_transaction(cleanup_operations)

        except Exception as e:
            print(f"Batch size {batch_size} failed: {e}")
            performance_data[batch_size] = {"error": str(e)}

    # Fix: the original max() raised a cryptic "max() arg is an empty
    # sequence" ValueError when every trial failed; fail explicitly.
    successful = {
        size: data for size, data in performance_data.items()
        if "throughput" in data
    }
    if not successful:
        raise ValueError("No batch size completed successfully; cannot determine optimum")

    optimal_size = max(successful, key=lambda size: successful[size]["throughput"])

    print(f"Optimal batch size: {optimal_size}")
    return optimal_size

Parallel Batch Processing

import asyncio
from concurrent.futures import ThreadPoolExecutor
from azure.data.tables import TableClient
from azure.data.tables.aio import TableClient as AsyncTableClient

async def parallel_batch_processing(
    table_clients: List[AsyncTableClient],
    entity_groups: List[List[Dict[str, Any]]]
):
    """
    Process multiple batches in parallel using async clients.

    Each batch must contain entities with the same PartitionKey.
    """

    async def _submit(client: AsyncTableClient, group: List[Dict]):
        # One "create" operation per entity in this group.
        ops = [("create", item) for item in group]
        return await client.submit_transaction(ops)

    # Fan out: one coroutine per (client, group) pair.
    tasks = [_submit(c, g) for c, g in zip(table_clients, entity_groups)]

    # return_exceptions=True keeps one failure from cancelling the rest;
    # failed batches come back as exception objects in the result list.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successful_batches = 0
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Batch {i} failed: {result}")
        else:
            successful_batches += 1
            print(f"Batch {i} completed: {len(result)} entities")

    print(f"Parallel processing completed: {successful_batches}/{len(tasks)} batches successful")
    return results

# Usage with thread pool for synchronous clients
def parallel_sync_batches(table_client: TableClient, entity_groups: List[List[Dict]]):
    """Process batches in parallel using thread pool."""
    
    def process_sync_batch(entities):
        operations = [("create", entity) for entity in entities]
        return table_client.submit_transaction(operations)
    
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(process_sync_batch, entities)
            for entities in entity_groups
        ]
        
        results = []
        for i, future in enumerate(futures):
            try:
                result = future.result(timeout=30)
                results.append(result)
                print(f"Batch {i} completed successfully")
            except Exception as e:
                print(f"Batch {i} failed: {e}")
                results.append(None)
        
        return results

# Example usage: one 50-entity group per partition.
entity_groups = [
    [{"PartitionKey": pk, "RowKey": f"item{i}", "Value": i} for i in range(50)]
    for pk in ("group1", "group2", "group3")
]

results = parallel_sync_batches(table_client, entity_groups)

Transaction Constraints and Limitations

# Azure Tables batch transaction constraints:
#
# 1. Same PartitionKey: All entities in a batch must have the same PartitionKey
# 2. Maximum Operations: Up to 100 operations per batch
# 3. Payload Size: Total request payload must be under 4MB
# 4. Atomicity: All operations succeed or all fail
# 5. Operation Types: create, update, upsert, delete
# 6. No Query Operations: Batch transactions don't support query operations
# 7. No Cross-Partition: Cannot batch across different partitions

Constraint Validation

from azure.data.tables import TableClient

def validate_batch_constraints(operations):
    """Validate batch operations against Azure Tables constraints.

    Parameters:
    - operations: sequence of (operation_type, entity[, options]) tuples,
      where each entity is a mapping carrying "PartitionKey".

    Returns:
    True when every constraint check passes.

    Raises:
    - ValueError: empty batch, more than 100 operations, an entity with
      a missing/empty PartitionKey, or entities spanning partitions.
    """

    if not operations:
        raise ValueError("No operations provided")

    if len(operations) > 100:
        raise ValueError(f"Too many operations: {len(operations)} (max 100)")

    # Check same partition key constraint
    partition_keys = set()
    for operation in operations:
        entity = operation[1]
        partition_key = entity.get("PartitionKey")

        # Rejects both a missing key and an empty-string key.
        if not partition_key:
            raise ValueError("All entities must have PartitionKey")

        partition_keys.add(partition_key)

    if len(partition_keys) > 1:
        raise ValueError(f"Multiple partition keys not allowed: {partition_keys}")

    # Estimate payload size (rough approximation via repr length)
    estimated_size = sum(
        len(str(operation[1])) for operation in operations
    )

    if estimated_size > 4 * 1024 * 1024:  # 4MB
        print(f"Warning: Estimated payload size {estimated_size} bytes may exceed 4MB limit")

    print(f"Batch validation passed: {len(operations)} operations, partition '{next(iter(partition_keys))}'")
    return True

# Usage: validate locally before paying for a round trip to the service.
operations = []
for i in range(50):
    operations.append(
        ("create", {"PartitionKey": "test", "RowKey": f"item{i}", "Data": f"value{i}"})
    )

validate_batch_constraints(operations)
table_client.submit_transaction(operations)

Type Aliases

Important type aliases used throughout batch operations for type safety and flexibility.

# Entity representation types
EntityType = Union[TableEntity, Mapping[str, Any]]
"""Entity can be TableEntity or any mapping (dict-like) object with PartitionKey and RowKey"""

# Operation type specifications
OperationType = Union[TransactionOperation, str]
"""Operation type can be TransactionOperation enum value or string"""

# Complete transaction operation specification: composes the two
# aliases above into the tuple shapes accepted by submit_transaction.
TransactionOperationType = Union[
    Tuple[OperationType, EntityType],
    Tuple[OperationType, EntityType, Mapping[str, Any]]
]
"""
Transaction operation specification supporting:
- (operation_type, entity) for basic operations
- (operation_type, entity, options) for operations with additional parameters
"""

Install with Tessl CLI

npx tessl i tessl/pypi-azure-data-tables

docs

async-operations.md

batch-operations.md

entity-data-types.md

error-handling.md

index.md

security-access-control.md

service-management.md

table-operations.md

tile.json