Microsoft Azure Data Tables Client Library for Python

High-performance batch transaction processing supporting multiple entity operations in single atomic transactions, with comprehensive operation types and efficient bulk processing capabilities. Execute multiple entity operations atomically within a single transaction for improved performance and data consistency.
class TableClient:
    def submit_transaction(
        self,
        operations: Iterable[Union[
            Tuple[str, Union[TableEntity, Mapping[str, Any]]],
            Tuple[str, Union[TableEntity, Mapping[str, Any]], Mapping[str, Any]]
        ]],
        **kwargs
    ) -> List[Mapping[str, Any]]:
        """Submit a batch of operations as a single atomic transaction.

        Each operation is either ``(operation_type, entity)`` or
        ``(operation_type, entity, options)``, where ``operation_type`` is
        one of:

        - ``"create"``: insert a new entity
        - ``"update"``: update an existing entity
        - ``"upsert"``: insert or update an entity
        - ``"delete"``: delete an entity

        Returns:
            A list of per-operation result mappings with metadata.

        Constraints:
            - All entities must share the same PartitionKey.
            - Maximum 100 operations per transaction.
            - All operations succeed or all fail (atomic).
            - Total payload must be under 4MB.

        Raises:
            TableTransactionError: If any operation fails, the entire
                transaction is rolled back.
        """


from azure.data.tables import TableClient, UpdateMode, TableTransactionError
table_client = TableClient.from_connection_string(conn_str, "orders")

# Basic batch operations: five mixed operations in one atomic transaction.
operations = [
    # Create new entities
    ("create", {
        "PartitionKey": "2023-Q4",
        "RowKey": "order-001",
        "CustomerName": "John Doe",
        "Total": 299.99,
        "Status": "pending",
    }),
    ("create", {
        "PartitionKey": "2023-Q4",
        "RowKey": "order-002",
        "CustomerName": "Jane Smith",
        "Total": 149.99,
        "Status": "pending",
    }),
    # Update existing entity
    ("update", {
        "PartitionKey": "2023-Q4",
        "RowKey": "order-003",
        "Status": "shipped",
        "ShippedDate": "2023-12-15",
    }),
    # Upsert entity (insert or update)
    ("upsert", {
        "PartitionKey": "2023-Q4",
        "RowKey": "order-004",
        "CustomerName": "Bob Wilson",
        "Total": 75.50,
        "Status": "completed",
    }),
    # Delete entity
    ("delete", {
        "PartitionKey": "2023-Q4",
        "RowKey": "order-005",
    }),
]

try:
    results = table_client.submit_transaction(operations)
    print(f"Successfully processed {len(results)} operations")
    for i, result in enumerate(results):
        if result:  # Delete operations return None
            print(f"Operation {i}: ETag = {result.get('etag', 'N/A')}")
except TableTransactionError as e:
    print(f"Transaction failed at operation {e.index}: {e.message}")
    print(f"Error code: {e.error_code}")

# Use additional parameters for fine-grained control over batch operations.
# Extended operation format with options
TransactionOperationType = Union[
    Tuple[str, Union[TableEntity, Mapping[str, Any]]],
    Tuple[str, Union[TableEntity, Mapping[str, Any]], Mapping[str, Any]],
]

from azure.data.tables import TableClient, UpdateMode
from azure.core import MatchConditions

table_client = TableClient.from_connection_string(conn_str, "inventory")

# Advanced batch with options
advanced_operations = [
    # Create with no additional options
    ("create", {
        "PartitionKey": "electronics",
        "RowKey": "item-001",
        "Name": "Laptop",
        "Quantity": 10,
        "Price": 999.99,
    }),
    # Update with REPLACE mode
    ("update", {
        "PartitionKey": "electronics",
        "RowKey": "item-002",
        "Name": "Updated Tablet",
        "Quantity": 5,
        "Price": 299.99,
    }, {
        "mode": UpdateMode.REPLACE,
    }),
    # Update with optimistic concurrency (etag + IfNotModified)
    ("update", {
        "PartitionKey": "electronics",
        "RowKey": "item-003",
        "Quantity": 8,
        "etag": "W/\"datetime'2023-12-15T10%3A30%3A00.123Z'\"",
    }, {
        "mode": UpdateMode.MERGE,
        "match_condition": MatchConditions.IfNotModified,
    }),
    # Upsert with REPLACE mode
    ("upsert", {
        "PartitionKey": "electronics",
        "RowKey": "item-004",
        "Name": "Wireless Mouse",
        "Quantity": 25,
        "Price": 49.99,
    }, {
        "mode": UpdateMode.REPLACE,
    }),
]

try:
    results = table_client.submit_transaction(advanced_operations)
    print("Advanced batch transaction completed successfully")
except TableTransactionError as e:
    print(f"Advanced transaction failed: {e.message}")

# Comprehensive enumeration of supported batch operation types.
class TransactionOperation(Enum):
    """Batch transaction operation types.

    Defines the available operations that can be performed
    within a batch transaction.
    """
    CREATE = "create"  # Insert new entity (fails if exists)
    UPDATE = "update"  # Update existing entity (fails if not exists)
    UPSERT = "upsert"  # Insert or update entity
    DELETE = "delete"  # Delete entity (fails if not exists)


from azure.data.tables import TableClient, TransactionOperation
table_client = TableClient.from_connection_string(conn_str, "products")

# Using enum values for type safety
operations = [
    (TransactionOperation.CREATE, {
        "PartitionKey": "category-a",
        "RowKey": "prod-001",
        "Name": "Product 1",
        "Price": 19.99,
    }),
    (TransactionOperation.UPSERT, {
        "PartitionKey": "category-a",
        "RowKey": "prod-002",
        "Name": "Product 2",
        "Price": 29.99,
    }),
    (TransactionOperation.DELETE, {
        "PartitionKey": "category-a",
        "RowKey": "prod-003",
    }),
]

# Submit with enum-based operations
results = table_client.submit_transaction(operations)

# Control how entity updates are applied during batch operations.
class UpdateMode(Enum):
    """Entity update modes for controlling merge behavior."""
    REPLACE = "replace"  # Replace entire entity with new properties
    MERGE = "merge"      # Merge new properties with existing entity


from azure.data.tables import TableClient, UpdateMode
table_client = TableClient.from_connection_string(conn_str, "customers")

# Existing entity:
# {
#     "PartitionKey": "vip",
#     "RowKey": "customer-001",
#     "Name": "John Doe",
#     "Email": "john@example.com",
#     "Phone": "555-1234",
#     "VipLevel": "Gold"
# }

operations_merge = [
    # MERGE: Only updates specified properties, keeps others
    ("update", {
        "PartitionKey": "vip",
        "RowKey": "customer-001",
        "Phone": "555-9999",          # Updated
        "LastContact": "2023-12-15",  # Added
        # Name, Email, VipLevel remain unchanged
    }, {"mode": UpdateMode.MERGE}),
]

operations_replace = [
    # REPLACE: Replaces entire entity, unspecified properties are removed
    ("update", {
        "PartitionKey": "vip",
        "RowKey": "customer-001",
        "Name": "John Doe",
        "Email": "newemail@example.com",
        "VipLevel": "Platinum",
        # Phone property will be removed since not specified
    }, {"mode": UpdateMode.REPLACE}),
]

# Execute merge operation
table_client.submit_transaction(operations_merge)
print("Merge update completed - existing properties preserved")

# Execute replace operation
table_client.submit_transaction(operations_replace)
print("Replace update completed - entity fully replaced")

# Common patterns for efficient batch processing of large datasets.
from azure.data.tables import TableClient
from typing import List, Dict, Any
def process_entities_in_batches(
    table_client: TableClient,
    entities: List[Dict[str, Any]],
    operation_type: str = "create",
    batch_size: int = 100
):
    """Process a large number of entities in optimally-sized batches.

    Automatically groups entities by PartitionKey (a transaction may only
    touch one partition) and submits each group in chunks that respect the
    Azure Tables 100-operations-per-transaction limit.

    Args:
        table_client: Client whose ``submit_transaction`` accepts a list of
            ``(operation_type, entity)`` tuples.
        entities: Entities to process; each must carry a "PartitionKey".
        operation_type: Operation applied to every entity ("create",
            "update", "upsert" or "delete").
        batch_size: Desired chunk size; clamped into [1, 100].

    Returns:
        Flat list of per-operation results from the successful batches;
        failed batches are reported and skipped (best-effort bulk load).
    """
    # Azure Tables rejects transactions with more than 100 operations, and a
    # non-positive chunk size would make range() produce no (or endless) work.
    batch_size = max(1, min(batch_size, 100))

    # Group entities by PartitionKey (required for batching).
    partition_groups: Dict[str, List[Dict[str, Any]]] = {}
    for entity in entities:
        partition_groups.setdefault(entity["PartitionKey"], []).append(entity)

    results = []
    total_processed = 0
    for partition_key, partition_entities in partition_groups.items():
        print(f"Processing partition '{partition_key}': {len(partition_entities)} entities")
        # Submit this partition's entities in transaction-sized chunks.
        for i in range(0, len(partition_entities), batch_size):
            batch = partition_entities[i:i + batch_size]
            operations = [(operation_type, entity) for entity in batch]
            try:
                batch_results = table_client.submit_transaction(operations)
                results.extend(batch_results)
                total_processed += len(batch)
                print(f"  Batch {i//batch_size + 1}: {len(batch)} entities processed")
            except Exception as e:
                # Best-effort: report the failed chunk and continue with the rest.
                print(f"  Batch {i//batch_size + 1} failed: {e}")
                continue
    print(f"Total processed: {total_processed}/{len(entities)} entities")
    return results
# Usage example: 500 orders, all within the "2023-Q4" partition.
entities_to_create = [
    {"PartitionKey": "2023-Q4", "RowKey": f"order-{i:04d}", "Amount": i * 10.0}
    for i in range(1, 501)
]
results = process_entities_in_batches(
    table_client,
    entities_to_create,
    operation_type="create",
)

from azure.data.tables import TableClient
from datetime import datetime
def process_order_batch(table_client: TableClient, order_updates: Dict):
    """Process complex order updates with mixed operations.

    Builds a single atomic transaction from four kinds of change: new
    orders (create), status/field updates (update), cancellations (soft
    delete via a status update) and hard deletes (delete).

    Args:
        table_client: Client used to submit the transaction.
        order_updates: Mapping with any of the optional keys "new_orders"
            / "order_updates" (dicts keyed by order id) and
            "cancelled_orders" / "deleted_orders" (lists of order ids).

    Returns:
        Per-operation transaction results, or [] when there is nothing to do.

    Raises:
        Re-raises whatever submit_transaction raises on failure.
    """
    # datetime.utcnow() is deprecated (Python 3.12+). Derive a naive UTC
    # timestamp from an aware one so stored values keep the previous format,
    # and compute it once so every operation in the batch shares it.
    from datetime import timezone
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    timestamp = utc_now.isoformat()

    # One partition per (UTC) month so all orders in the batch share it.
    # Previously the partition used local time while timestamps used UTC,
    # which disagreed around month boundaries.
    partition_key = f"orders-{utc_now.strftime('%Y-%m')}"
    operations = []

    # Add new orders
    for order_id, order_data in order_updates.get("new_orders", {}).items():
        operations.append(("create", {
            "PartitionKey": partition_key,
            "RowKey": order_id,
            **order_data,
            "CreatedAt": timestamp,
            "Status": "pending"
        }))

    # Update existing orders
    for order_id, updates in order_updates.get("order_updates", {}).items():
        operations.append(("update", {
            "PartitionKey": partition_key,
            "RowKey": order_id,
            **updates,
            "ModifiedAt": timestamp
        }))

    # Cancel orders (soft delete by status update)
    for order_id in order_updates.get("cancelled_orders", []):
        operations.append(("update", {
            "PartitionKey": partition_key,
            "RowKey": order_id,
            "Status": "cancelled",
            "CancelledAt": timestamp
        }))

    # Hard delete orders
    for order_id in order_updates.get("deleted_orders", []):
        operations.append(("delete", {
            "PartitionKey": partition_key,
            "RowKey": order_id
        }))

    if not operations:
        print("No operations to process")
        return []
    try:
        results = table_client.submit_transaction(operations)
        print(f"Successfully processed {len(operations)} order operations")
        return results
    except Exception as e:
        print(f"Order batch processing failed: {e}")
        raise
# Usage
order_updates = {
    "new_orders": {
        "ORD-001": {"CustomerName": "Alice", "Total": 199.99},
        "ORD-002": {"CustomerName": "Bob", "Total": 299.99},
    },
    "order_updates": {
        "ORD-003": {"Status": "shipped", "TrackingNumber": "TRK123"},
        "ORD-004": {"Status": "delivered", "DeliveredAt": "2023-12-15T14:30:00"},
    },
    "cancelled_orders": ["ORD-005"],
    "deleted_orders": ["ORD-006"],
}
process_order_batch(table_client, order_updates)

# Best practices for maximizing batch operation performance.
import time
from azure.data.tables import TableClient
def find_optimal_batch_size(table_client: TableClient, sample_entities: List[Dict]):
    """Empirically determine the best batch size for this table and entity shape.

    Creates (then deletes) the first N sample entities at several batch
    sizes, measuring throughput for each, and returns the size with the
    highest observed entities/second.

    Args:
        table_client: Client used to submit the timed transactions.
        sample_entities: Disposable test entities; batch sizes larger than
            this list are skipped.

    Returns:
        The batch size with the best measured throughput.

    Raises:
        ValueError: If every tested batch size failed, so no throughput
            data is available.
    """
    test_sizes = [1, 10, 25, 50, 75, 100]  # Azure Tables max is 100
    performance_data = {}
    for batch_size in test_sizes:
        if len(sample_entities) < batch_size:
            continue
        # Time one create-transaction of this size.
        test_entities = sample_entities[:batch_size]
        operations = [("create", entity) for entity in test_entities]
        # perf_counter is monotonic; time.time() can jump (NTP) mid-measurement.
        start_time = time.perf_counter()
        try:
            table_client.submit_transaction(operations)
            # Guard against a zero reading on very fast transactions, which
            # would make the throughput division blow up.
            elapsed = max(time.perf_counter() - start_time, 1e-9)
            throughput = batch_size / elapsed
            performance_data[batch_size] = {
                "elapsed": elapsed,
                "throughput": throughput
            }
            print(f"Batch size {batch_size}: {elapsed:.3f}s, {throughput:.1f} entities/sec")
            # Clean up test entities
            cleanup_operations = [("delete", entity) for entity in test_entities]
            table_client.submit_transaction(cleanup_operations)
        except Exception as e:
            print(f"Batch size {batch_size} failed: {e}")
            performance_data[batch_size] = {"error": str(e)}
    # Find optimal size among the sizes that actually succeeded. Previously
    # this fell through to max() on an empty sequence when everything failed,
    # raising an unhelpful "max() arg is an empty sequence".
    successful = {
        size: data["throughput"]
        for size, data in performance_data.items()
        if "throughput" in data
    }
    if not successful:
        raise ValueError("No batch size succeeded; cannot determine optimal size")
    optimal_size = max(successful, key=successful.get)
    print(f"Optimal batch size: {optimal_size}")
    return optimal_size


import asyncio
from concurrent.futures import ThreadPoolExecutor
from azure.data.tables import TableClient
from azure.data.tables.aio import TableClient as AsyncTableClient
async def parallel_batch_processing(
    table_clients: List[AsyncTableClient],
    entity_groups: List[List[Dict[str, Any]]]
):
    """Run one create-batch per client concurrently and summarize results.

    Each entry of ``entity_groups`` (entities sharing one PartitionKey) is
    submitted through the matching client in ``table_clients``. All batches
    run concurrently; a failed batch yields its exception object instead of
    cancelling the rest.
    """
    async def _submit(client: AsyncTableClient, group: List[Dict]):
        # One atomic transaction of "create" operations per group.
        return await client.submit_transaction([("create", e) for e in group])

    tasks = [_submit(c, g) for c, g in zip(table_clients, entity_groups)]
    # return_exceptions=True captures per-batch failures in the result list.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successful_batches = 0
    for i, outcome in enumerate(results):
        if isinstance(outcome, Exception):
            print(f"Batch {i} failed: {outcome}")
        else:
            successful_batches += 1
            print(f"Batch {i} completed: {len(outcome)} entities")
    print(f"Parallel processing completed: {successful_batches}/{len(tasks)} batches successful")
    return results
# Usage with thread pool for synchronous clients
def parallel_sync_batches(table_client: TableClient, entity_groups: List[List[Dict]]):
    """Process batches in parallel using thread pool."""
    def _run_one(group):
        # One "create" transaction per group of same-partition entities.
        return table_client.submit_transaction([("create", e) for e in group])

    with ThreadPoolExecutor(max_workers=5) as executor:
        pending = [executor.submit(_run_one, group) for group in entity_groups]
        results = []
        for i, future in enumerate(pending):
            try:
                results.append(future.result(timeout=30))
                print(f"Batch {i} completed successfully")
            except Exception as e:
                # Keep positional correspondence with entity_groups.
                print(f"Batch {i} failed: {e}")
                results.append(None)
        return results
# Example usage: three same-partition groups of 50 entities each.
entity_groups = [
    [{"PartitionKey": "group1", "RowKey": f"item{i}", "Value": i} for i in range(50)],
    [{"PartitionKey": "group2", "RowKey": f"item{i}", "Value": i} for i in range(50)],
    [{"PartitionKey": "group3", "RowKey": f"item{i}", "Value": i} for i in range(50)],
]
results = parallel_sync_batches(table_client, entity_groups)

# Azure Tables batch transaction constraints:
#
# 1. Same PartitionKey: All entities in a batch must have the same PartitionKey
# 2. Maximum Operations: Up to 100 operations per batch
# 3. Payload Size: Total request payload must be under 4MB
# 4. Atomicity: All operations succeed or all fail
# 5. Operation Types: create, update, upsert, delete
# 6. No Query Operations: Batch transactions don't support query operations
# 7. No Cross-Partition: Cannot batch across different partitions
from azure.data.tables import TableClient
def validate_batch_constraints(operations):
    """Validate batch operations against Azure Tables constraints.

    Checks the client-enforceable transaction rules: at least one and at
    most 100 operations, well-formed operation tuples, a single shared
    PartitionKey, and a rough payload-size estimate against the 4MB limit.

    Args:
        operations: Sequence of (operation_type, entity[, options]) tuples.

    Returns:
        True when every constraint check passes.

    Raises:
        ValueError: On an empty or oversized batch, a malformed operation,
            a missing PartitionKey, or mixed partition keys.
    """
    if not operations:
        raise ValueError("No operations provided")
    if len(operations) > 100:
        raise ValueError(f"Too many operations: {len(operations)} (max 100)")
    # Check same partition key constraint
    partition_keys = set()
    for operation in operations:
        # A malformed tuple previously crashed with IndexError/AttributeError;
        # report it as a validation failure instead.
        if len(operation) not in (2, 3):
            raise ValueError(f"Malformed operation (expected 2- or 3-tuple): {operation!r}")
        operation_type, entity = operation[0], operation[1]
        partition_key = entity.get("PartitionKey")
        if not partition_key:
            raise ValueError("All entities must have PartitionKey")
        partition_keys.add(partition_key)
    if len(partition_keys) > 1:
        raise ValueError(f"Multiple partition keys not allowed: {partition_keys}")
    # Estimate payload size (rough approximation; the real 4MB limit applies
    # to the encoded request body, so only warn here).
    estimated_size = sum(len(str(operation[1])) for operation in operations)
    if estimated_size > 4 * 1024 * 1024:  # 4MB
        print(f"Warning: Estimated payload size {estimated_size} bytes may exceed 4MB limit")
    print(f"Batch validation passed: {len(operations)} operations, partition '{next(iter(partition_keys))}'")
    return True
# Usage
operations = [
    ("create", {"PartitionKey": "test", "RowKey": f"item{i}", "Data": f"value{i}"})
    for i in range(50)
]
validate_batch_constraints(operations)
table_client.submit_transaction(operations)

# Important type aliases used throughout batch operations for type safety and flexibility.
# Entity representation types
EntityType = Union[TableEntity, Mapping[str, Any]]
"""Entity can be TableEntity or any mapping (dict-like) object with PartitionKey and RowKey"""

# Operation type specifications
OperationType = Union[TransactionOperation, str]
"""Operation type can be TransactionOperation enum value or string"""

# Complete transaction operation specification
TransactionOperationType = Union[
    Tuple[OperationType, EntityType],
    Tuple[OperationType, EntityType, Mapping[str, Any]],
]
"""
Transaction operation specification supporting:
- (operation_type, entity) for basic operations
- (operation_type, entity, options) for operations with additional parameters
"""

# Install with Tessl CLI
npx tessl i tessl/pypi-azure-data-tables

docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10