Microsoft Azure Data Tables Client Library for Python
90
Complete asynchronous operation support using async/await patterns for non-blocking I/O, providing identical functionality to synchronous clients with improved performance for concurrent scenarios.
Asynchronous version of TableServiceClient for account-level operations with non-blocking I/O.
from typing import Any, AsyncIterator, Dict, Iterable, List, Mapping, Optional, Union

from azure.data.tables.aio import TableServiceClient
class TableServiceClient:
    """Async account-level client for the Azure Tables service.

    Mirrors the synchronous ``TableServiceClient`` API surface; every
    network operation is a coroutine and must be awaited.
    """

    def __init__(
        self,
        endpoint: str,
        credential=None,
        *,
        audience: Optional[str] = None,
        api_version: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize async TableServiceClient.

        Parameters identical to synchronous version.
        All network operations are async and non-blocking.

        :param endpoint: URL of the table service endpoint.
        :param credential: Credential used to authenticate requests.
        :keyword audience: Optional authentication audience (defaults to the
            service's public-cloud audience when omitted).
        :keyword api_version: Optional service API version override.
        """

    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        **kwargs
    ) -> "TableServiceClient":
        """Create async client from connection string."""

    async def create_table(self, table_name: str, **kwargs) -> "TableClient":
        """Create table asynchronously."""

    async def create_table_if_not_exists(self, table_name: str, **kwargs) -> "TableClient":
        """Create table if not exists asynchronously."""

    async def delete_table(self, table_name: str, **kwargs) -> None:
        """Delete table asynchronously."""

    def list_tables(self, **kwargs) -> AsyncItemPaged[TableItem]:
        """List tables with async iteration."""

    def query_tables(self, query_filter: str, **kwargs) -> AsyncItemPaged[TableItem]:
        """Query tables with async iteration."""

    async def get_service_properties(self, **kwargs) -> Dict[str, object]:
        """Get service properties asynchronously."""

    async def set_service_properties(self, **kwargs) -> None:
        """Set service properties asynchronously."""

    async def get_service_stats(self, **kwargs) -> Dict[str, object]:
        """Get service statistics asynchronously."""

    def get_table_client(self, table_name: str, **kwargs) -> "TableClient":
        """Get async TableClient for specific table."""

    async def close(self) -> None:
        """Close the client and cleanup resources."""

    async def __aenter__(self) -> "TableServiceClient":
        """Async context manager entry."""

    async def __aexit__(self, *args) -> None:
        """Async context manager exit."""


import asyncio
from azure.data.tables.aio import TableServiceClient


async def async_service_operations():
    """Demonstrate async service client operations."""
    # Initialize async service client
    async with TableServiceClient.from_connection_string(conn_str) as service_client:
        # Create multiple tables concurrently
        table_names = ["customers", "orders", "products", "inventory"]
        # Create all tables in parallel
        create_tasks = [
            service_client.create_table_if_not_exists(name)
            for name in table_names
        ]
        table_clients = await asyncio.gather(*create_tasks)
        print(f"Created {len(table_clients)} tables concurrently")
        # List all tables asynchronously
        print("Tables in account:")
        async for table in service_client.list_tables():
            print(f" - {table.name}")
        # Query tables with a prefix filter. OData has no "startswith"
        # operator for table names, so a prefix match is expressed as a
        # half-open range: >= 'c' AND < 'd'. (A bare "ge 'c'" would also
        # return "orders", "products", etc.)
        print("Tables starting with 'c':")
        async for table in service_client.query_tables("TableName ge 'c' and TableName lt 'd'"):
            print(f" - {table.name}")
        # Get service properties
        properties = await service_client.get_service_properties()
        logging_enabled = properties.get('analytics_logging', {}).get('read', False)
        print(f"Read logging enabled: {logging_enabled}")


# Run async operations
asyncio.run(async_service_operations())
# Asynchronous version of TableClient for table-specific operations with
# non-blocking entity operations.
from azure.data.tables.aio import TableClient
class TableClient:
    """Async table-scoped client for entity-level operations.

    Mirrors the synchronous ``TableClient`` API surface; every network
    operation is a coroutine and must be awaited.
    """

    def __init__(
        self,
        endpoint: str,
        table_name: str,
        *,
        credential=None,
        **kwargs
    ):
        """Initialize async TableClient.

        :param endpoint: URL of the table service endpoint.
        :param table_name: Name of the table this client targets.
        :keyword credential: Credential used to authenticate requests.
        """

    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        table_name: str,
        **kwargs
    ) -> "TableClient":
        """Create async client from connection string."""

    @classmethod
    def from_table_url(
        cls,
        table_url: str,
        *,
        credential=None,
        **kwargs
    ) -> "TableClient":
        """Create async client from table URL."""

    async def create_table(self, **kwargs) -> TableItem:
        """Create table asynchronously."""

    async def delete_table(self, **kwargs) -> None:
        """Delete table asynchronously."""

    async def create_entity(
        self,
        entity: Union[TableEntity, Mapping[str, Any]],
        **kwargs
    ) -> Dict[str, Any]:
        """Create entity asynchronously."""

    async def get_entity(
        self,
        partition_key: str,
        row_key: str,
        **kwargs
    ) -> TableEntity:
        """Get entity asynchronously."""

    async def update_entity(
        self,
        entity: Union[TableEntity, Mapping[str, Any]],
        **kwargs
    ) -> Dict[str, Any]:
        """Update entity asynchronously."""

    async def upsert_entity(
        self,
        entity: Union[TableEntity, Mapping[str, Any]],
        **kwargs
    ) -> Dict[str, Any]:
        """Upsert entity asynchronously."""

    async def delete_entity(
        self,
        partition_key: Optional[str] = None,
        row_key: Optional[str] = None,
        **kwargs
    ) -> None:
        """Delete entity asynchronously."""

    def list_entities(self, **kwargs) -> AsyncItemPaged[TableEntity]:
        """List entities with async iteration."""

    def query_entities(self, query_filter: str, **kwargs) -> AsyncItemPaged[TableEntity]:
        """Query entities with async iteration."""

    async def submit_transaction(
        self,
        operations: Iterable,
        **kwargs
    ) -> List[Mapping[str, Any]]:
        """Submit batch transaction asynchronously."""

    async def get_table_access_policy(self, **kwargs) -> Dict[str, Optional[TableAccessPolicy]]:
        """Get access policies asynchronously."""

    async def set_table_access_policy(
        self,
        signed_identifiers: Mapping[str, Optional[TableAccessPolicy]],
        **kwargs
    ) -> None:
        """Set access policies asynchronously."""

    async def close(self) -> None:
        """Close client and cleanup resources."""

    async def __aenter__(self) -> "TableClient":
        """Async context manager entry."""

    async def __aexit__(self, *args) -> None:
        """Async context manager exit."""


import asyncio
from azure.data.tables.aio import TableClient
from azure.data.tables import TableEntity


async def async_table_operations():
    """Demonstrate async table client operations."""
    async with TableClient.from_connection_string(conn_str, "customers") as table_client:
        # Build ten VIP customer entities up front.
        customer_entities = []
        for idx in range(1, 11):
            customer_entities.append(
                TableEntity(
                    PartitionKey="vip",
                    RowKey=f"customer-{idx:03d}",
                    Name=f"Customer {idx}",
                    Email=f"customer{idx}@example.com",
                    VipLevel="Gold",
                )
            )
        # Fire all creates at once; return_exceptions keeps one failure
        # from cancelling the rest.
        outcomes = await asyncio.gather(
            *(table_client.create_entity(item) for item in customer_entities),
            return_exceptions=True,
        )
        ok_count = len([o for o in outcomes if not isinstance(o, Exception)])
        print(f"Created {ok_count}/{len(customer_entities)} entities concurrently")
        # Query entities asynchronously
        print("VIP customers:")
        async for record in table_client.query_entities("PartitionKey eq 'vip'"):
            print(f" - {record['Name']} ({record['Email']})")
        # Promote up to five VIPs concurrently.
        pending_updates = []
        async for record in table_client.query_entities("PartitionKey eq 'vip'"):
            record["VipLevel"] = "Platinum"
            record["LastUpdated"] = "2023-12-15"
            pending_updates.append(table_client.update_entity(record))
            if len(pending_updates) >= 5:  # Process in batches
                break
        completed = await asyncio.gather(*pending_updates)
        print(f"Updated {len(completed)} entities to Platinum level")


asyncio.run(async_table_operations())
# Handle large result sets efficiently using async iteration patterns.
class AsyncItemPaged:
    """
    Async iterator for paged results from Azure Tables.

    Supports async iteration over large result sets
    with automatic paging and efficient memory usage.
    """

    def __aiter__(self) -> AsyncIterator[T]:
        """Return async iterator."""

    async def __anext__(self) -> T:
        """Get next item asynchronously."""

    def by_page(self, continuation_token: Optional[str] = None) -> AsyncIterator[AsyncIterator[T]]:
        """Iterate by pages for batch processing.

        :param continuation_token: Resume paging from a previously returned
            continuation token (matches azure-core's pager interface).
            Page-size options such as ``results_per_page`` belong on the
            call that produced this pager, not here.
        """


import asyncio
from azure.data.tables.aio import TableClient


async def async_iteration_patterns():
    """Demonstrate various async iteration patterns."""
    async with TableClient.from_connection_string(conn_str, "large_dataset") as table_client:
        # Basic async iteration
        print("Processing all entities:")
        entity_count = 0
        async for entity in table_client.list_entities():
            entity_count += 1
            if entity_count % 1000 == 0:
                print(f" Processed {entity_count} entities...")
        print(f"Total entities processed: {entity_count}")
        # Page-by-page processing for memory efficiency.
        # FIX: results_per_page is an option of list_entities();
        # by_page() itself only accepts a continuation token.
        print("Processing by pages:")
        page_count = 0
        total_entities = 0
        async for page in table_client.list_entities(results_per_page=500).by_page():
            page_count += 1
            page_entities = []
            async for entity in page:
                page_entities.append(entity)
            total_entities += len(page_entities)
            print(f" Page {page_count}: {len(page_entities)} entities")
            # Process page batch (e.g., bulk operations)
            await process_entity_batch(page_entities)
        print(f"Processed {total_entities} entities across {page_count} pages")
        # Filtered async iteration with query
        print("Processing filtered results:")
        async for entity in table_client.query_entities(
            "PartitionKey eq 'active' and Status eq 'pending'"
        ):
            await process_pending_entity(entity)


async def process_entity_batch(entities):
    """Process a batch of entities asynchronously."""
    # Simulate async processing
    await asyncio.sleep(0.1)
    print(f" Processed batch of {len(entities)} entities")


async def process_pending_entity(entity):
    """Process individual pending entity."""
    # Simulate async processing
    await asyncio.sleep(0.01)


asyncio.run(async_iteration_patterns())
# Leverage async capabilities for high-performance concurrent operations.
import asyncio
from azure.data.tables.aio import TableClient
from azure.data.tables import TableEntity


async def concurrent_entity_operations():
    """Demonstrate concurrent entity operations for high throughput."""
    async with TableClient.from_connection_string(conn_str, "high_throughput") as table_client:
        # Concurrent creates with rate limiting
        semaphore = asyncio.Semaphore(10)  # Limit concurrent operations

        async def create_entity_with_limit(entity_data):
            async with semaphore:
                entity = TableEntity(**entity_data)
                return await table_client.create_entity(entity)

        # Generate entity data
        entity_data_list = [
            {
                "PartitionKey": f"partition-{i // 100}",
                "RowKey": f"entity-{i:06d}",
                "Value": i,
                "Category": f"cat-{i % 10}",
                "Active": i % 2 == 0,
            }
            for i in range(1000)
        ]
        # Create entities concurrently with rate limiting
        print("Creating entities with concurrency control...")
        # FIX: use get_running_loop() — calling get_event_loop() from inside
        # a coroutine is deprecated in modern Python.
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        create_tasks = [
            create_entity_with_limit(data)
            for data in entity_data_list
        ]
        results = await asyncio.gather(*create_tasks, return_exceptions=True)
        end_time = loop.time()
        successful_creates = sum(1 for r in results if not isinstance(r, Exception))
        print(f"Created {successful_creates}/{len(entity_data_list)} entities")
        print(f"Time: {end_time - start_time:.2f}s")
        print(f"Throughput: {successful_creates / (end_time - start_time):.1f} entities/sec")


asyncio.run(concurrent_entity_operations())


import asyncio
from azure.data.tables.aio import TableClient
from typing import List, Dict, Any


async def async_batch_processing():
    """Process multiple batches concurrently across partitions."""
    async with TableClient.from_connection_string(conn_str, "batch_demo") as table_client:
        # Prepare batch data for different partitions
        partition_batches = {
            key: [
                {"PartitionKey": key, "RowKey": f"item-{n}", "Value": n}
                for n in range(50)
            ]
            for key in ("batch-1", "batch-2", "batch-3")
        }

        async def process_partition_batch(partition_key: str, entities: List[Dict]):
            """Process a single partition batch."""
            operations = [("create", item) for item in entities]
            try:
                outcome = await table_client.submit_transaction(operations)
            except Exception as e:
                print(f"Partition {partition_key} failed: {e}")
                return 0
            print(f"Partition {partition_key}: {len(outcome)} entities created")
            return len(outcome)

        # Fan out one transaction per partition and await them together.
        per_partition_counts = await asyncio.gather(
            *(
                process_partition_batch(key, rows)
                for key, rows in partition_batches.items()
            )
        )
        total_processed = sum(per_partition_counts)
        print(f"Concurrent batch processing completed: {total_processed} total entities")


asyncio.run(async_batch_processing())
# Handle exceptions and errors properly in asynchronous operations.
import asyncio
from azure.data.tables.aio import TableClient
from azure.data.tables import TableTransactionError
from azure.core.exceptions import ResourceNotFoundError, ServiceRequestError


async def async_error_handling():
    """Demonstrate error handling patterns in async operations."""
    async with TableClient.from_connection_string(conn_str, "error_demo") as table_client:
        # Retry with exponential backoff (async version)
        async def async_retry_with_backoff(coro_func, max_retries=3):
            for attempt in range(max_retries):
                try:
                    return await coro_func()
                except ServiceRequestError:
                    if attempt == max_retries - 1:
                        raise
                    delay = 2 ** attempt  # Exponential backoff
                    print(f"Attempt {attempt + 1} failed, retrying in {delay}s")
                    await asyncio.sleep(delay)

        # Concurrent operations with individual error handling
        async def safe_create_entity(entity_data):
            try:
                return await table_client.create_entity(entity_data)
            except Exception as e:
                print(f"Failed to create entity {entity_data.get('RowKey')}: {e}")
                return None

        # Process entities with error isolation
        entities = [
            {"PartitionKey": "safe", "RowKey": f"item-{i}", "Value": i}
            for i in range(20)
        ]
        # Add some problematic entities
        entities.extend([
            {"PartitionKey": "safe", "RowKey": "item-5", "Value": 999},  # Duplicate
            {"PartitionKey": "safe"},  # Missing RowKey
        ])
        # Create all entities concurrently with error handling
        create_tasks = [safe_create_entity(entity) for entity in entities]
        results = await asyncio.gather(*create_tasks)
        successful_creates = [r for r in results if r is not None]
        print(f"Successfully created {len(successful_creates)} entities")

        # Batch operations with error recovery
        async def robust_batch_operation(operations):
            try:
                return await table_client.submit_transaction(operations)
            except TableTransactionError as e:
                print(f"Batch failed at operation {e.index}: {e.message}")
                # Execute operations individually as fallback
                results = []
                for i, (op_type, entity) in enumerate(operations):
                    if i == e.index:
                        results.append(None)  # Skip failed operation
                        continue
                    try:
                        if op_type == "create":
                            result = await table_client.create_entity(entity)
                        elif op_type == "update":
                            result = await table_client.update_entity(entity)
                        else:
                            # FIX: previously `result` stayed unbound (NameError)
                            # for any other operation type; record it as skipped.
                            result = None
                        results.append(result)
                    except Exception as individual_error:
                        print(f"Individual operation {i} also failed: {individual_error}")
                        results.append(None)
                return results

        # Test batch with error recovery
        batch_operations = [
            ("create", {"PartitionKey": "batch", "RowKey": f"item-{i}", "Value": i})
            for i in range(10)
        ]
        batch_results = await robust_batch_operation(batch_operations)
        successful_batch = [r for r in batch_results if r is not None]
        print(f"Batch processing: {len(successful_batch)} operations succeeded")


asyncio.run(async_error_handling())
# Proper resource management using async context managers.
import asyncio
from azure.data.tables.aio import TableServiceClient, TableClient


async def resource_management_patterns():
    """Demonstrate proper async resource management."""
    # Pattern 1: Service client context manager
    async with TableServiceClient.from_connection_string(conn_str) as service_client:
        # Create table client from service client
        table_client = service_client.get_table_client("managed_table")
        # Use table client within service client context
        async with table_client:
            await table_client.create_table()
            await table_client.create_entity(
                {"PartitionKey": "test", "RowKey": "001", "Value": "data"}
            )
            async for entity in table_client.list_entities():
                print(f"Entity: {entity['RowKey']}")

    # Pattern 2: Multiple concurrent clients with proper cleanup
    async def managed_concurrent_operations():
        clients = []
        try:
            # Create multiple clients
            for n in range(3):
                clients.append(TableClient.from_connection_string(conn_str, f"table_{n}"))
            # Create every table, tolerating individual failures.
            await asyncio.gather(
                *(c.create_table() for c in clients),
                return_exceptions=True,
            )
            # One insert per client, all in flight at once.
            await asyncio.gather(
                *(
                    c.create_entity(
                        {"PartitionKey": "concurrent", "RowKey": f"item-{n}", "Value": n}
                    )
                    for n, c in enumerate(clients)
                )
            )
        finally:
            # Ensure all clients are properly closed
            await asyncio.gather(
                *(c.close() for c in clients),
                return_exceptions=True,
            )

    await managed_concurrent_operations()

    # Pattern 3: Long-lived client with proper lifecycle
    class AsyncTableManager:
        def __init__(self, connection_string: str, table_name: str):
            self.connection_string = connection_string
            self.table_name = table_name
            self.client = None

        async def start(self):
            """Initialize the async client."""
            self.client = TableClient.from_connection_string(
                self.connection_string,
                self.table_name,
            )
            await self.client.create_table()

        async def stop(self):
            """Cleanup the async client."""
            if self.client:
                await self.client.close()

        async def process_entities(self, entities):
            """Process entities with the managed client."""
            if not self.client:
                raise RuntimeError("Manager not started")
            return await asyncio.gather(
                *(self.client.create_entity(item) for item in entities),
                return_exceptions=True,
            )

    # Use the managed client
    manager = AsyncTableManager(conn_str, "managed_operations")
    try:
        await manager.start()
        workload = [
            {"PartitionKey": "managed", "RowKey": f"item-{n}", "Value": n}
            for n in range(10)
        ]
        results = await manager.process_entities(workload)
        successful = sum(1 for r in results if not isinstance(r, Exception))
        print(f"Managed processing: {successful}/{len(workload)} entities")
    finally:
        await manager.stop()


asyncio.run(resource_management_patterns())


import asyncio
from azure.data.tables.aio import TableClient
from contextlib import AsyncExitStack


async def high_performance_pattern():
    """Optimized pattern for high-throughput async operations."""
    # Configuration
    MAX_CONCURRENT_OPERATIONS = 50
    BATCH_SIZE = 100
    TABLE_COUNT = 5
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_OPERATIONS)
    async with AsyncExitStack() as stack:
        # Create multiple table clients
        clients = []
        for i in range(TABLE_COUNT):
            client = await stack.enter_async_context(
                TableClient.from_connection_string(conn_str, f"perf_table_{i}")
            )
            clients.append(client)

        # High-throughput entity creation
        async def create_entity_throttled(client, entity):
            async with semaphore:
                return await client.create_entity(entity)

        # Generate workload across multiple tables
        all_tasks = []
        for client_idx, client in enumerate(clients):
            for batch_idx in range(10):  # 10 batches per table
                entities = [
                    {
                        "PartitionKey": f"perf-{batch_idx}",
                        "RowKey": f"item-{i:06d}",
                        "TableIndex": client_idx,
                        "BatchIndex": batch_idx,
                        "Value": i,
                    }
                    for i in range(BATCH_SIZE)
                ]
                # Add tasks for this batch
                for entity in entities:
                    all_tasks.append(create_entity_throttled(client, entity))

        # Execute all operations concurrently
        print(f"Executing {len(all_tasks)} operations across {TABLE_COUNT} tables...")
        # FIX: use get_running_loop() — calling get_event_loop() from inside
        # a coroutine is deprecated in modern Python.
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        results = await asyncio.gather(*all_tasks, return_exceptions=True)
        end_time = loop.time()
        successful = sum(1 for r in results if not isinstance(r, Exception))
        print(f"Completed: {successful}/{len(all_tasks)} operations")
        print(f"Time: {end_time - start_time:.2f}s")
        print(f"Throughput: {successful / (end_time - start_time):.1f} ops/sec")


# Run high-performance example
asyncio.run(high_performance_pattern())
# Install with Tessl CLI
npx tessl i tessl/pypi-azure-data-tables
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10