0
# Async Operations
1
2
Complete asynchronous operation support using async/await patterns for non-blocking I/O, providing identical functionality to synchronous clients with improved performance for concurrent scenarios.
3
4
## Capabilities
5
6
### Async Service Client
7
8
Asynchronous version of TableServiceClient for account-level operations with non-blocking I/O.
9
10
```python { .api }
11
from azure.data.tables.aio import TableServiceClient
12
13
class TableServiceClient:
14
def __init__(
15
self,
16
endpoint: str,
17
credential=None,
18
*,
19
audience: str = None,
20
api_version: str = None,
21
**kwargs
22
):
23
"""
24
Initialize async TableServiceClient.
25
26
Parameters identical to synchronous version.
27
All network operations are async and non-blocking.
28
"""
29
30
@classmethod
31
def from_connection_string(
32
cls,
33
conn_str: str,
34
**kwargs
35
) -> "TableServiceClient":
36
"""Create async client from connection string."""
37
38
async def create_table(self, table_name: str, **kwargs) -> "TableClient":
39
"""Create table asynchronously."""
40
41
async def create_table_if_not_exists(self, table_name: str, **kwargs) -> "TableClient":
42
"""Create table if not exists asynchronously."""
43
44
async def delete_table(self, table_name: str, **kwargs) -> None:
45
"""Delete table asynchronously."""
46
47
def list_tables(self, **kwargs) -> AsyncItemPaged[TableItem]:
48
"""List tables with async iteration."""
49
50
def query_tables(self, query_filter: str, **kwargs) -> AsyncItemPaged[TableItem]:
51
"""Query tables with async iteration."""
52
53
async def get_service_properties(self, **kwargs) -> Dict[str, object]:
54
"""Get service properties asynchronously."""
55
56
async def set_service_properties(self, **kwargs) -> None:
57
"""Set service properties asynchronously."""
58
59
async def get_service_stats(self, **kwargs) -> Dict[str, object]:
60
"""Get service statistics asynchronously."""
61
62
def get_table_client(self, table_name: str, **kwargs) -> "TableClient":
63
"""Get async TableClient for specific table."""
64
65
async def close(self) -> None:
66
"""Close the client and cleanup resources."""
67
68
async def __aenter__(self) -> "TableServiceClient":
69
"""Async context manager entry."""
70
71
async def __aexit__(self, *args) -> None:
72
"""Async context manager exit."""
73
```
74
75
#### Usage Example
76
77
```python
78
import asyncio
79
from azure.data.tables.aio import TableServiceClient
80
81
async def async_service_operations():
82
"""Demonstrate async service client operations."""
83
84
# Initialize async service client
85
async with TableServiceClient.from_connection_string(conn_str) as service_client:
86
87
# Create multiple tables concurrently
88
table_names = ["customers", "orders", "products", "inventory"]
89
90
# Create all tables in parallel
91
create_tasks = [
92
service_client.create_table_if_not_exists(name)
93
for name in table_names
94
]
95
96
table_clients = await asyncio.gather(*create_tasks)
97
print(f"Created {len(table_clients)} tables concurrently")
98
99
# List all tables asynchronously
100
print("Tables in account:")
101
async for table in service_client.list_tables():
102
print(f" - {table.name}")
103
104
# Query tables with filter
105
print("Tables starting with 'c':")
106
async for table in service_client.query_tables("TableName ge 'c'"):
107
print(f" - {table.name}")
108
109
# Get service properties
110
properties = await service_client.get_service_properties()
111
logging_enabled = properties.get('analytics_logging', {}).get('read', False)
112
print(f"Read logging enabled: {logging_enabled}")
113
114
# Run async operations
115
asyncio.run(async_service_operations())
116
```
117
118
### Async Table Client
119
120
Asynchronous version of TableClient for table-specific operations with non-blocking entity operations.
121
122
```python { .api }
123
from azure.data.tables.aio import TableClient
124
125
class TableClient:
126
def __init__(
127
self,
128
endpoint: str,
129
table_name: str,
130
*,
131
credential=None,
132
**kwargs
133
):
134
"""Initialize async TableClient."""
135
136
@classmethod
137
def from_connection_string(
138
cls,
139
conn_str: str,
140
table_name: str,
141
**kwargs
142
) -> "TableClient":
143
"""Create async client from connection string."""
144
145
@classmethod
146
def from_table_url(
147
cls,
148
table_url: str,
149
*,
150
credential=None,
151
**kwargs
152
) -> "TableClient":
153
"""Create async client from table URL."""
154
155
async def create_table(self, **kwargs) -> TableItem:
156
"""Create table asynchronously."""
157
158
async def delete_table(self, **kwargs) -> None:
159
"""Delete table asynchronously."""
160
161
async def create_entity(
162
self,
163
entity: Union[TableEntity, Mapping[str, Any]],
164
**kwargs
165
) -> Dict[str, Any]:
166
"""Create entity asynchronously."""
167
168
async def get_entity(
169
self,
170
partition_key: str,
171
row_key: str,
172
**kwargs
173
) -> TableEntity:
174
"""Get entity asynchronously."""
175
176
async def update_entity(
177
self,
178
entity: Union[TableEntity, Mapping[str, Any]],
179
**kwargs
180
) -> Dict[str, Any]:
181
"""Update entity asynchronously."""
182
183
async def upsert_entity(
184
self,
185
entity: Union[TableEntity, Mapping[str, Any]],
186
**kwargs
187
) -> Dict[str, Any]:
188
"""Upsert entity asynchronously."""
189
190
async def delete_entity(
191
self,
192
partition_key: str = None,
193
row_key: str = None,
194
**kwargs
195
) -> None:
196
"""Delete entity asynchronously."""
197
198
def list_entities(self, **kwargs) -> AsyncItemPaged[TableEntity]:
199
"""List entities with async iteration."""
200
201
def query_entities(self, query_filter: str, **kwargs) -> AsyncItemPaged[TableEntity]:
202
"""Query entities with async iteration."""
203
204
async def submit_transaction(
205
self,
206
operations: Iterable,
207
**kwargs
208
) -> List[Mapping[str, Any]]:
209
"""Submit batch transaction asynchronously."""
210
211
async def get_table_access_policy(self, **kwargs) -> Dict[str, Optional[TableAccessPolicy]]:
212
"""Get access policies asynchronously."""
213
214
async def set_table_access_policy(
215
self,
216
signed_identifiers: Mapping[str, Optional[TableAccessPolicy]],
217
**kwargs
218
) -> None:
219
"""Set access policies asynchronously."""
220
221
async def close(self) -> None:
222
"""Close client and cleanup resources."""
223
224
async def __aenter__(self) -> "TableClient":
225
"""Async context manager entry."""
226
227
async def __aexit__(self, *args) -> None:
228
"""Async context manager exit."""
229
```
230
231
#### Usage Example
232
233
```python
234
import asyncio
235
from azure.data.tables.aio import TableClient
236
from azure.data.tables import TableEntity
237
238
async def async_table_operations():
239
"""Demonstrate async table client operations."""
240
241
async with TableClient.from_connection_string(conn_str, "customers") as table_client:
242
243
# Create multiple entities concurrently
244
entities = [
245
TableEntity(
246
PartitionKey="vip",
247
RowKey=f"customer-{i:03d}",
248
Name=f"Customer {i}",
249
Email=f"customer{i}@example.com",
250
VipLevel="Gold"
251
)
252
for i in range(1, 11)
253
]
254
255
# Create all entities in parallel
256
create_tasks = [
257
table_client.create_entity(entity)
258
for entity in entities
259
]
260
261
results = await asyncio.gather(*create_tasks, return_exceptions=True)
262
successful_creates = sum(1 for r in results if not isinstance(r, Exception))
263
print(f"Created {successful_creates}/{len(entities)} entities concurrently")
264
265
# Query entities asynchronously
266
print("VIP customers:")
267
async for entity in table_client.query_entities("PartitionKey eq 'vip'"):
268
print(f" - {entity['Name']} ({entity['Email']})")
269
270
# Update multiple entities concurrently
271
update_tasks = []
272
async for entity in table_client.query_entities("PartitionKey eq 'vip'"):
273
entity["VipLevel"] = "Platinum"
274
entity["LastUpdated"] = "2023-12-15"
275
update_tasks.append(table_client.update_entity(entity))
276
277
if len(update_tasks) >= 5: # Process in batches
278
break
279
280
update_results = await asyncio.gather(*update_tasks)
281
print(f"Updated {len(update_results)} entities to Platinum level")
282
283
asyncio.run(async_table_operations())
284
```
285
286
### Async Iteration
287
288
Handle large result sets efficiently using async iteration patterns.
289
290
```python { .api }
291
class AsyncItemPaged:
292
"""
293
Async iterator for paged results from Azure Tables.
294
295
Supports async iteration over large result sets
296
with automatic paging and efficient memory usage.
297
"""
298
299
def __aiter__(self) -> AsyncIterator[T]:
300
"""Return async iterator."""
301
302
async def __anext__(self) -> T:
303
"""Get next item asynchronously."""
304
305
def by_page(self) -> AsyncIterator[AsyncIterator[T]]:
306
"""Iterate by pages for batch processing."""
307
```
308
309
#### Usage Example
310
311
```python
312
import asyncio
313
from azure.data.tables.aio import TableClient
314
315
async def async_iteration_patterns():
316
"""Demonstrate various async iteration patterns."""
317
318
async with TableClient.from_connection_string(conn_str, "large_dataset") as table_client:
319
320
# Basic async iteration
321
print("Processing all entities:")
322
entity_count = 0
323
async for entity in table_client.list_entities():
324
entity_count += 1
325
if entity_count % 1000 == 0:
326
print(f" Processed {entity_count} entities...")
327
328
print(f"Total entities processed: {entity_count}")
329
330
# Page-by-page processing for memory efficiency
331
print("Processing by pages:")
332
page_count = 0
333
total_entities = 0
334
335
async for page in table_client.list_entities().by_page(results_per_page=500):
336
page_count += 1
337
page_entities = []
338
339
async for entity in page:
340
page_entities.append(entity)
341
342
total_entities += len(page_entities)
343
print(f" Page {page_count}: {len(page_entities)} entities")
344
345
# Process page batch (e.g., bulk operations)
346
await process_entity_batch(page_entities)
347
348
print(f"Processed {total_entities} entities across {page_count} pages")
349
350
# Filtered async iteration with query
351
print("Processing filtered results:")
352
async for entity in table_client.query_entities(
353
"PartitionKey eq 'active' and Status eq 'pending'"
354
):
355
await process_pending_entity(entity)
356
357
async def process_entity_batch(entities):
358
"""Process a batch of entities asynchronously."""
359
# Simulate async processing
360
await asyncio.sleep(0.1)
361
print(f" Processed batch of {len(entities)} entities")
362
363
async def process_pending_entity(entity):
364
"""Process individual pending entity."""
365
# Simulate async processing
366
await asyncio.sleep(0.01)
367
368
asyncio.run(async_iteration_patterns())
369
```
370
371
### Concurrent Operations
372
373
Leverage async capabilities for high-performance concurrent operations.
374
375
#### Concurrent Entity Operations
376
377
```python
378
import asyncio
379
from azure.data.tables.aio import TableClient
380
from azure.data.tables import TableEntity
381
382
async def concurrent_entity_operations():
383
"""Demonstrate concurrent entity operations for high throughput."""
384
385
async with TableClient.from_connection_string(conn_str, "high_throughput") as table_client:
386
387
# Concurrent creates with rate limiting
388
semaphore = asyncio.Semaphore(10) # Limit concurrent operations
389
390
async def create_entity_with_limit(entity_data):
391
async with semaphore:
392
entity = TableEntity(**entity_data)
393
return await table_client.create_entity(entity)
394
395
# Generate entity data
396
entity_data_list = [
397
{
398
"PartitionKey": f"partition-{i // 100}",
399
"RowKey": f"entity-{i:06d}",
400
"Value": i,
401
"Category": f"cat-{i % 10}",
402
"Active": i % 2 == 0
403
}
404
for i in range(1000)
405
]
406
407
# Create entities concurrently with rate limiting
408
print("Creating entities with concurrency control...")
409
start_time = asyncio.get_event_loop().time()
410
411
create_tasks = [
412
create_entity_with_limit(data)
413
for data in entity_data_list
414
]
415
416
results = await asyncio.gather(*create_tasks, return_exceptions=True)
417
418
end_time = asyncio.get_event_loop().time()
419
successful_creates = sum(1 for r in results if not isinstance(r, Exception))
420
421
print(f"Created {successful_creates}/{len(entity_data_list)} entities")
422
print(f"Time: {end_time - start_time:.2f}s")
423
print(f"Throughput: {successful_creates / (end_time - start_time):.1f} entities/sec")
424
425
asyncio.run(concurrent_entity_operations())
426
```
427
428
#### Async Batch Processing
429
430
```python
431
import asyncio
432
from azure.data.tables.aio import TableClient
433
from typing import List, Dict, Any
434
435
async def async_batch_processing():
436
"""Process multiple batches concurrently across partitions."""
437
438
async with TableClient.from_connection_string(conn_str, "batch_demo") as table_client:
439
440
# Prepare batch data for different partitions
441
partition_batches = {
442
"batch-1": [
443
{"PartitionKey": "batch-1", "RowKey": f"item-{i}", "Value": i}
444
for i in range(50)
445
],
446
"batch-2": [
447
{"PartitionKey": "batch-2", "RowKey": f"item-{i}", "Value": i}
448
for i in range(50)
449
],
450
"batch-3": [
451
{"PartitionKey": "batch-3", "RowKey": f"item-{i}", "Value": i}
452
for i in range(50)
453
]
454
}
455
456
async def process_partition_batch(partition_key: str, entities: List[Dict]):
457
"""Process a single partition batch."""
458
operations = [("create", entity) for entity in entities]
459
460
try:
461
result = await table_client.submit_transaction(operations)
462
print(f"Partition {partition_key}: {len(result)} entities created")
463
return len(result)
464
except Exception as e:
465
print(f"Partition {partition_key} failed: {e}")
466
return 0
467
468
# Process all partition batches concurrently
469
batch_tasks = [
470
process_partition_batch(partition_key, entities)
471
for partition_key, entities in partition_batches.items()
472
]
473
474
results = await asyncio.gather(*batch_tasks)
475
total_processed = sum(results)
476
477
print(f"Concurrent batch processing completed: {total_processed} total entities")
478
479
asyncio.run(async_batch_processing())
480
```
481
482
### Error Handling in Async Context
483
484
Handle exceptions and errors properly in asynchronous operations.
485
486
#### Async Error Handling Patterns
487
488
```python
489
import asyncio
490
from azure.data.tables.aio import TableClient
491
from azure.data.tables import TableTransactionError
492
from azure.core.exceptions import ResourceNotFoundError, ServiceRequestError
493
494
async def async_error_handling():
495
"""Demonstrate error handling patterns in async operations."""
496
497
async with TableClient.from_connection_string(conn_str, "error_demo") as table_client:
498
499
# Retry with exponential backoff (async version)
500
async def async_retry_with_backoff(coro_func, max_retries=3):
501
for attempt in range(max_retries):
502
try:
503
return await coro_func()
504
except ServiceRequestError as e:
505
if attempt == max_retries - 1:
506
raise
507
508
delay = 2 ** attempt # Exponential backoff
509
print(f"Attempt {attempt + 1} failed, retrying in {delay}s")
510
await asyncio.sleep(delay)
511
512
# Concurrent operations with individual error handling
513
async def safe_create_entity(entity_data):
514
try:
515
return await table_client.create_entity(entity_data)
516
except Exception as e:
517
print(f"Failed to create entity {entity_data.get('RowKey')}: {e}")
518
return None
519
520
# Process entities with error isolation
521
entities = [
522
{"PartitionKey": "safe", "RowKey": f"item-{i}", "Value": i}
523
for i in range(20)
524
]
525
526
# Add some problematic entities
527
entities.extend([
528
{"PartitionKey": "safe", "RowKey": "item-5", "Value": 999}, # Duplicate
529
{"PartitionKey": "safe"}, # Missing RowKey
530
])
531
532
# Create all entities concurrently with error handling
533
create_tasks = [safe_create_entity(entity) for entity in entities]
534
results = await asyncio.gather(*create_tasks)
535
536
successful_creates = [r for r in results if r is not None]
537
print(f"Successfully created {len(successful_creates)} entities")
538
539
# Batch operations with error recovery
540
async def robust_batch_operation(operations):
541
try:
542
return await table_client.submit_transaction(operations)
543
except TableTransactionError as e:
544
print(f"Batch failed at operation {e.index}: {e.message}")
545
546
# Execute operations individually as fallback
547
results = []
548
for i, (op_type, entity) in enumerate(operations):
549
if i == e.index:
550
results.append(None) # Skip failed operation
551
continue
552
553
try:
554
if op_type == "create":
555
result = await table_client.create_entity(entity)
556
elif op_type == "update":
557
result = await table_client.update_entity(entity)
558
# Add other operation types as needed
559
560
results.append(result)
561
except Exception as individual_error:
562
print(f"Individual operation {i} also failed: {individual_error}")
563
results.append(None)
564
565
return results
566
567
# Test batch with error recovery
568
batch_operations = [
569
("create", {"PartitionKey": "batch", "RowKey": f"item-{i}", "Value": i})
570
for i in range(10)
571
]
572
573
batch_results = await robust_batch_operation(batch_operations)
574
successful_batch = [r for r in batch_results if r is not None]
575
print(f"Batch processing: {len(successful_batch)} operations succeeded")
576
577
asyncio.run(async_error_handling())
578
```
579
580
### Context Management
581
582
Proper resource management using async context managers.
583
584
#### Resource Management Patterns
585
586
```python
587
import asyncio
588
from azure.data.tables.aio import TableServiceClient, TableClient
589
590
async def resource_management_patterns():
591
"""Demonstrate proper async resource management."""
592
593
# Pattern 1: Service client context manager
594
async with TableServiceClient.from_connection_string(conn_str) as service_client:
595
596
# Create table client from service client
597
table_client = service_client.get_table_client("managed_table")
598
599
# Use table client within service client context
600
async with table_client:
601
await table_client.create_table()
602
603
entity = {"PartitionKey": "test", "RowKey": "001", "Value": "data"}
604
await table_client.create_entity(entity)
605
606
async for entity in table_client.list_entities():
607
print(f"Entity: {entity['RowKey']}")
608
609
# Pattern 2: Multiple concurrent clients with proper cleanup
610
async def managed_concurrent_operations():
611
clients = []
612
try:
613
# Create multiple clients
614
for i in range(3):
615
client = TableClient.from_connection_string(conn_str, f"table_{i}")
616
clients.append(client)
617
618
# Use clients concurrently
619
create_tasks = [
620
client.create_table()
621
for client in clients
622
]
623
624
await asyncio.gather(*create_tasks, return_exceptions=True)
625
626
# Perform operations
627
operation_tasks = []
628
for i, client in enumerate(clients):
629
entity = {"PartitionKey": "concurrent", "RowKey": f"item-{i}", "Value": i}
630
operation_tasks.append(client.create_entity(entity))
631
632
await asyncio.gather(*operation_tasks)
633
634
finally:
635
# Ensure all clients are properly closed
636
close_tasks = [client.close() for client in clients]
637
await asyncio.gather(*close_tasks, return_exceptions=True)
638
639
await managed_concurrent_operations()
640
641
# Pattern 3: Long-lived client with proper lifecycle
642
class AsyncTableManager:
643
def __init__(self, connection_string: str, table_name: str):
644
self.connection_string = connection_string
645
self.table_name = table_name
646
self.client = None
647
648
async def start(self):
649
"""Initialize the async client."""
650
self.client = TableClient.from_connection_string(
651
self.connection_string,
652
self.table_name
653
)
654
await self.client.create_table()
655
656
async def stop(self):
657
"""Cleanup the async client."""
658
if self.client:
659
await self.client.close()
660
661
async def process_entities(self, entities):
662
"""Process entities with the managed client."""
663
if not self.client:
664
raise RuntimeError("Manager not started")
665
666
tasks = [
667
self.client.create_entity(entity)
668
for entity in entities
669
]
670
671
return await asyncio.gather(*tasks, return_exceptions=True)
672
673
# Use the managed client
674
manager = AsyncTableManager(conn_str, "managed_operations")
675
try:
676
await manager.start()
677
678
entities = [
679
{"PartitionKey": "managed", "RowKey": f"item-{i}", "Value": i}
680
for i in range(10)
681
]
682
683
results = await manager.process_entities(entities)
684
successful = sum(1 for r in results if not isinstance(r, Exception))
685
print(f"Managed processing: {successful}/{len(entities)} entities")
686
687
finally:
688
await manager.stop()
689
690
asyncio.run(resource_management_patterns())
691
```
692
693
## Performance Considerations
694
695
### Async Best Practices
696
697
1. **Connection Pooling**: Async clients automatically manage connection pools
698
2. **Concurrency Limits**: Use semaphores to control concurrent operations
699
3. **Resource Management**: Always use context managers or explicit cleanup
700
4. **Error Isolation**: Handle exceptions in concurrent operations individually
701
5. **Batch Optimization**: Process multiple partitions concurrently
702
6. **Memory Management**: Use async iteration for large result sets
703
704
### Example High-Performance Pattern
705
706
```python
707
import asyncio
708
from azure.data.tables.aio import TableClient
709
from contextlib import AsyncExitStack
710
711
async def high_performance_pattern():
712
"""Optimized pattern for high-throughput async operations."""
713
714
# Configuration
715
MAX_CONCURRENT_OPERATIONS = 50
716
BATCH_SIZE = 100
717
TABLE_COUNT = 5
718
719
semaphore = asyncio.Semaphore(MAX_CONCURRENT_OPERATIONS)
720
721
async with AsyncExitStack() as stack:
722
# Create multiple table clients
723
clients = []
724
for i in range(TABLE_COUNT):
725
client = await stack.enter_async_context(
726
TableClient.from_connection_string(conn_str, f"perf_table_{i}")
727
)
728
clients.append(client)
729
730
# High-throughput entity creation
731
async def create_entity_throttled(client, entity):
732
async with semaphore:
733
return await client.create_entity(entity)
734
735
# Generate workload across multiple tables
736
all_tasks = []
737
for client_idx, client in enumerate(clients):
738
for batch_idx in range(10): # 10 batches per table
739
entities = [
740
{
741
"PartitionKey": f"perf-{batch_idx}",
742
"RowKey": f"item-{i:06d}",
743
"TableIndex": client_idx,
744
"BatchIndex": batch_idx,
745
"Value": i
746
}
747
for i in range(BATCH_SIZE)
748
]
749
750
# Add tasks for this batch
751
for entity in entities:
752
task = create_entity_throttled(client, entity)
753
all_tasks.append(task)
754
755
# Execute all operations concurrently
756
print(f"Executing {len(all_tasks)} operations across {TABLE_COUNT} tables...")
757
start_time = asyncio.get_event_loop().time()
758
759
results = await asyncio.gather(*all_tasks, return_exceptions=True)
760
761
end_time = asyncio.get_event_loop().time()
762
successful = sum(1 for r in results if not isinstance(r, Exception))
763
764
print(f"Completed: {successful}/{len(all_tasks)} operations")
765
print(f"Time: {end_time - start_time:.2f}s")
766
print(f"Throughput: {successful / (end_time - start_time):.1f} ops/sec")
767
768
# Run high-performance example
769
asyncio.run(high_performance_pattern())
770
```