0
# Async Operations
1
2
Asynchronous operations for high-performance, non-blocking Azure Cosmos DB applications. The async API provides coroutine-based versions of all synchronous operations with additional connection management capabilities.
3
4
## Capabilities
5
6
### Async Client Operations
7
8
Asynchronous version of CosmosClient with additional connection lifecycle management.
9
10
```python { .api }
11
class CosmosClient:
12
def __init__(self, url: str, credential: Union[str, Dict[str, Any], TokenCredential], consistency_level: str = None, **kwargs):
13
"""
14
Initialize async CosmosClient.
15
16
Parameters: Same as synchronous CosmosClient
17
"""
18
19
@classmethod
20
def from_connection_string(cls, conn_str: str, credential: str = None, consistency_level: str = None, **kwargs):
21
"""
22
Create async CosmosClient from connection string.
23
24
Parameters: Same as synchronous version
25
26
Returns:
27
Async CosmosClient instance
28
"""
29
30
async def create_database(self, id: str, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, **kwargs):
31
"""
32
Async version of create_database.
33
34
Parameters: Same as synchronous version
35
36
Returns:
37
Async DatabaseProxy for the created database
38
"""
39
40
async def create_database_if_not_exists(self, id: str, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, **kwargs):
41
"""
42
Async version of create_database_if_not_exists.
43
"""
44
45
def get_database_client(self, database: str):
46
"""
47
Get async database client (non-async method).
48
49
Returns:
50
Async DatabaseProxy instance
51
"""
52
53
async def list_databases(self, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs):
54
"""
55
Async version of list_databases.
56
"""
57
58
async def query_databases(self, query: str = None, parameters: list = None, **kwargs):
59
"""
60
Async version of query_databases.
61
"""
62
63
async def delete_database(self, database: str, populate_query_metrics: bool = None, **kwargs):
64
"""
65
Async version of delete_database.
66
"""
67
68
async def get_database_account(self, **kwargs):
69
"""
70
Async version of get_database_account.
71
"""
72
73
async def close(self):
74
"""
75
Close the client and clean up resources.
76
77
This method should be called when the client is no longer needed
78
to properly clean up connections and resources.
79
"""
80
```
81
82
### Async Database Operations
83
84
Asynchronous database-level operations.
85
86
```python { .api }
87
class DatabaseProxy:
88
async def read(self, populate_query_metrics: bool = None, **kwargs):
89
"""
90
Async version of read database properties.
91
"""
92
93
async def create_container(self, id: str, partition_key: PartitionKey, **kwargs):
94
"""
95
Async version of create_container.
96
97
Returns:
98
Async ContainerProxy for the created container
99
"""
100
101
async def create_container_if_not_exists(self, id: str, partition_key: PartitionKey, **kwargs):
102
"""
103
Async version of create_container_if_not_exists.
104
"""
105
106
def get_container_client(self, container: str):
107
"""
108
Get async container client (non-async method).
109
110
Returns:
111
Async ContainerProxy instance
112
"""
113
114
async def list_containers(self, max_item_count: int = None, **kwargs):
115
"""
116
Async version of list_containers.
117
"""
118
119
async def query_containers(self, query: str = None, parameters: list = None, **kwargs):
120
"""
121
Async version of query_containers.
122
"""
123
124
async def replace_container(self, container: str, partition_key: PartitionKey, **kwargs):
125
"""
126
Async version of replace_container.
127
"""
128
129
async def delete_container(self, container: str, **kwargs):
130
"""
131
Async version of delete_container.
132
"""
133
134
async def get_throughput(self, **kwargs):
135
"""
136
Async version of get_throughput.
137
"""
138
139
async def replace_throughput(self, throughput: ThroughputProperties, **kwargs):
140
"""
141
Async version of replace_throughput.
142
"""
143
```
144
145
### Async Container Operations
146
147
Asynchronous container-level operations for items and queries.
148
149
```python { .api }
150
class ContainerProxy:
151
async def read(self, populate_query_metrics: bool = None, **kwargs):
152
"""
153
Async version of read container properties.
154
"""
155
156
async def create_item(self, body: dict, **kwargs):
157
"""
158
Async version of create_item.
159
"""
160
161
async def read_item(self, item: str, partition_key: str, **kwargs):
162
"""
163
Async version of read_item.
164
"""
165
166
async def upsert_item(self, body: dict, **kwargs):
167
"""
168
Async version of upsert_item.
169
"""
170
171
async def replace_item(self, item: str, body: dict, **kwargs):
172
"""
173
Async version of replace_item.
174
"""
175
176
async def patch_item(self, item: str, partition_key: str, patch_operations: list, **kwargs):
177
"""
178
Async version of patch_item.
179
"""
180
181
async def delete_item(self, item: str, partition_key: str, **kwargs):
182
"""
183
Async version of delete_item.
184
"""
185
186
async def delete_all_items_by_partition_key(self, partition_key: str, **kwargs):
187
"""
188
Async version of delete_all_items_by_partition_key.
189
"""
190
191
async def query_items(self, query: str = None, parameters: list = None, **kwargs):
192
"""
193
Async version of query_items.
194
195
Returns:
196
Async iterable of query results
197
"""
198
199
async def read_all_items(self, max_item_count: int = None, **kwargs):
200
"""
201
Async version of read_all_items.
202
203
Returns:
204
Async iterable of all items
205
"""
206
207
async def query_items_change_feed(self, **kwargs):
208
"""
209
Async version of query_items_change_feed.
210
"""
211
212
async def execute_item_batch(self, batch_operations: list, partition_key: str, **kwargs):
213
"""
214
Async version of execute_item_batch.
215
"""
216
217
@property
218
def scripts(self):
219
"""
220
Get async scripts proxy (non-async property).
221
222
Returns:
223
Async ScriptsProxy instance
224
"""
225
```
226
227
### Async Script Operations
228
229
Asynchronous script operations for stored procedures, triggers, and UDFs.
230
231
```python { .api }
232
class ScriptsProxy:
233
async def create_stored_procedure(self, body: dict, **kwargs):
234
"""
235
Async version of create_stored_procedure.
236
"""
237
238
async def execute_stored_procedure(self, sproc: str, partition_key: str = None, params: list = None, **kwargs):
239
"""
240
Async version of execute_stored_procedure.
241
"""
242
243
async def list_stored_procedures(self, max_item_count: int = None, **kwargs):
244
"""
245
Async version of list_stored_procedures.
246
"""
247
248
async def get_stored_procedure(self, sproc: str, **kwargs):
249
"""
250
Async version of get_stored_procedure.
251
"""
252
253
async def replace_stored_procedure(self, sproc: str, body: dict, **kwargs):
254
"""
255
Async version of replace_stored_procedure.
256
"""
257
258
async def delete_stored_procedure(self, sproc: str, **kwargs):
259
"""
260
Async version of delete_stored_procedure.
261
"""
262
263
async def create_trigger(self, body: dict, **kwargs):
264
"""
265
Async version of create_trigger.
266
"""
267
268
async def create_user_defined_function(self, body: dict, **kwargs):
269
"""
270
Async version of create_user_defined_function.
271
"""
272
```
273
274
### Async User Management
275
276
Asynchronous user and permission operations.
277
278
```python { .api }
279
class UserProxy:
280
async def read(self, **kwargs):
281
"""
282
Async version of read user properties.
283
"""
284
285
async def create_permission(self, body: dict, **kwargs):
286
"""
287
Async version of create_permission.
288
"""
289
290
async def list_permissions(self, max_item_count: int = None, **kwargs):
291
"""
292
Async version of list_permissions.
293
"""
294
295
async def get_permission(self, permission: str, **kwargs):
296
"""
297
Async version of get_permission.
298
"""
299
300
async def replace_permission(self, permission: str, body: dict, **kwargs):
301
"""
302
Async version of replace_permission.
303
"""
304
305
async def delete_permission(self, permission: str, **kwargs):
306
"""
307
Async version of delete_permission.
308
"""
309
```
310
311
## Usage Examples
312
313
### Basic Async Operations
314
315
```python
316
import asyncio
317
from azure.cosmos.aio import CosmosClient
318
from azure.cosmos import ConsistencyLevel, PartitionKey
319
320
async def basic_async_operations():
321
# Initialize async client
322
async with CosmosClient(
323
url="https://myaccount.documents.azure.com:443/",
324
credential="myaccountkey==",
325
consistency_level=ConsistencyLevel.Session
326
) as client:
327
328
# Create database
329
database = await client.create_database_if_not_exists(
330
id="AsyncDatabase",
331
offer_throughput=400
332
)
333
334
# Create container
335
container = await database.create_container_if_not_exists(
336
id="AsyncContainer",
337
partition_key=PartitionKey(path="/category"),
338
offer_throughput=400
339
)
340
341
# Create items concurrently
342
items_to_create = [
343
{"id": f"item{i}", "category": "electronics", "name": f"Product {i}"}
344
for i in range(10)
345
]
346
347
# Create items concurrently
348
create_tasks = [
349
container.create_item(item) for item in items_to_create
350
]
351
created_items = await asyncio.gather(*create_tasks, return_exceptions=True)
352
353
successful_creates = [item for item in created_items if not isinstance(item, Exception)]
354
print(f"Successfully created {len(successful_creates)} items")
355
356
# Query items
357
query = "SELECT * FROM c WHERE c.category = @category"
358
parameters = [{"name": "@category", "value": "electronics"}]
359
360
items = []
361
async for item in container.query_items(
362
query=query,
363
parameters=parameters,
364
enable_cross_partition_query=True
365
):
366
items.append(item)
367
368
print(f"Queried {len(items)} items")
369
370
# Run the async function
371
asyncio.run(basic_async_operations())
372
```
373
374
### Context Manager Pattern
375
376
```python
377
async def context_manager_example():
378
"""Demonstrate proper resource management with async context manager."""
379
380
async with CosmosClient(
381
url="https://myaccount.documents.azure.com:443/",
382
credential="myaccountkey=="
383
) as client:
384
385
database = client.get_database_client("MyDatabase")
386
container = database.get_container_client("MyContainer")
387
388
# Perform operations
389
await container.create_item({
390
"id": "async_item",
391
"category": "test",
392
"data": "async operation"
393
})
394
395
item = await container.read_item(
396
item="async_item",
397
partition_key="test"
398
)
399
print(f"Read item: {item['id']}")
400
401
# Client is automatically closed when exiting context manager
402
print("Client resources cleaned up")
403
404
asyncio.run(context_manager_example())
405
```
406
407
### High-Performance Bulk Operations
408
409
```python
410
import asyncio
411
from concurrent.futures import ThreadPoolExecutor
412
import time
413
414
async def bulk_operations_example():
415
"""Demonstrate high-performance bulk operations with async."""
416
417
async with CosmosClient(
418
url="https://myaccount.documents.azure.com:443/",
419
credential="myaccountkey=="
420
) as client:
421
422
database = client.get_database_client("BulkDatabase")
423
container = database.get_container_client("BulkContainer")
424
425
# Generate large number of items
426
num_items = 1000
427
items = [
428
{
429
"id": f"bulk_item_{i}",
430
"category": f"category_{i % 10}",
431
"data": f"Data for item {i}",
432
"timestamp": time.time()
433
}
434
for i in range(num_items)
435
]
436
437
print(f"Creating {num_items} items...")
438
start_time = time.time()
439
440
# Process in batches to avoid overwhelming the service
441
batch_size = 50
442
batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
443
444
for batch in batches:
445
# Create batch concurrently
446
tasks = [container.upsert_item(item) for item in batch]
447
results = await asyncio.gather(*tasks, return_exceptions=True)
448
449
# Handle any errors
450
errors = [r for r in results if isinstance(r, Exception)]
451
if errors:
452
print(f"Batch had {len(errors)} errors")
453
454
end_time = time.time()
455
print(f"Bulk operation completed in {end_time - start_time:.2f} seconds")
456
457
# Query performance test
458
print("Testing query performance...")
459
start_time = time.time()
460
461
query_tasks = []
462
for category_id in range(10):
463
query = "SELECT COUNT(1) as count FROM c WHERE c.category = @category"
464
parameters = [{"name": "@category", "value": f"category_{category_id}"}]
465
466
task = container.query_items(
467
query=query,
468
parameters=parameters,
469
enable_cross_partition_query=True
470
)
471
query_tasks.append(task)
472
473
# Execute queries concurrently
474
query_results = await asyncio.gather(*[
475
collect_async_iterable(query_task) for query_task in query_tasks
476
])
477
478
end_time = time.time()
479
print(f"Concurrent queries completed in {end_time - start_time:.2f} seconds")
480
481
for i, results in enumerate(query_results):
482
if results:
483
print(f"Category {i}: {results[0]['count']} items")
484
485
async def collect_async_iterable(async_iterable):
486
"""Helper to collect results from async iterable."""
487
results = []
488
async for item in async_iterable:
489
results.append(item)
490
return results
491
492
asyncio.run(bulk_operations_example())
493
```
494
495
### Change Feed Processing with Async
496
497
```python
498
async def change_feed_processor():
499
"""Process change feed asynchronously with high throughput."""
500
501
async with CosmosClient(
502
url="https://myaccount.documents.azure.com:443/",
503
credential="myaccountkey=="
504
) as client:
505
506
database = client.get_database_client("ChangeDatabase")
507
container = database.get_container_client("ChangeContainer")
508
509
# Get feed ranges for parallel processing
510
feed_ranges = await container.read_feed_ranges()
511
print(f"Processing {len(feed_ranges)} feed ranges")
512
513
async def process_feed_range(feed_range, range_id):
514
"""Process a single feed range."""
515
continuation = None
516
processed_count = 0
517
518
while True:
519
try:
520
changes = container.query_items_change_feed(
521
feed_range=feed_range,
522
continuation=continuation,
523
is_start_from_beginning=True,
524
max_item_count=100
525
)
526
527
batch_changes = []
528
async for change in changes:
529
if "_lsn" in change: # Valid change record
530
batch_changes.append(change)
531
532
if not batch_changes:
533
break
534
535
# Process changes (simulate work)
536
await asyncio.sleep(0.1) # Simulate processing time
537
processed_count += len(batch_changes)
538
539
print(f"Range {range_id}: Processed {len(batch_changes)} changes "
540
f"(total: {processed_count})")
541
542
# Get continuation for next batch
543
continuation = changes.get_continuation()
544
if not continuation:
545
break
546
547
except Exception as e:
548
print(f"Error processing range {range_id}: {e}")
549
break
550
551
return processed_count
552
553
# Process all feed ranges concurrently
554
tasks = [
555
process_feed_range(feed_range, i)
556
for i, feed_range in enumerate(feed_ranges)
557
]
558
559
results = await asyncio.gather(*tasks)
560
total_processed = sum(results)
561
print(f"Total changes processed: {total_processed}")
562
563
asyncio.run(change_feed_processor())
564
```
565
566
### Error Handling and Retry Logic
567
568
```python
569
import asyncio
570
import random
571
from azure.cosmos.exceptions import CosmosHttpResponseError
572
573
async def resilient_async_operations():
574
"""Demonstrate error handling and retry logic in async operations."""
575
576
async def retry_operation(operation, max_retries=3, delay=1.0):
577
"""Generic retry wrapper for async operations."""
578
for attempt in range(max_retries):
579
try:
580
return await operation()
581
except CosmosHttpResponseError as e:
582
if e.status_code == 429: # Rate limiting
583
wait_time = delay * (2 ** attempt) + random.uniform(0, 1)
584
print(f"Rate limited, waiting {wait_time:.2f}s (attempt {attempt + 1})")
585
await asyncio.sleep(wait_time)
586
else:
587
print(f"HTTP error {e.status_code}: {e.message}")
588
if attempt == max_retries - 1:
589
raise
590
except Exception as e:
591
print(f"Unexpected error: {e}")
592
if attempt == max_retries - 1:
593
raise
594
595
raise Exception(f"Operation failed after {max_retries} attempts")
596
597
async with CosmosClient(
598
url="https://myaccount.documents.azure.com:443/",
599
credential="myaccountkey=="
600
) as client:
601
602
database = client.get_database_client("ResilientDatabase")
603
container = database.get_container_client("ResilientContainer")
604
605
# Example: Resilient item creation
606
async def create_item_operation():
607
return await container.create_item({
608
"id": f"resilient_item_{random.randint(1, 1000)}",
609
"category": "test",
610
"timestamp": time.time()
611
})
612
613
# Use retry wrapper
614
try:
615
item = await retry_operation(create_item_operation)
616
print(f"Successfully created item: {item['id']}")
617
except Exception as e:
618
print(f"Failed to create item after retries: {e}")
619
620
# Parallel operations with individual error handling
621
async def safe_create_item(item_data):
622
"""Safely create an item with error handling."""
623
try:
624
return await retry_operation(
625
lambda: container.create_item(item_data)
626
)
627
except Exception as e:
628
print(f"Failed to create item {item_data['id']}: {e}")
629
return None
630
631
# Create multiple items with resilience
632
items_to_create = [
633
{"id": f"safe_item_{i}", "category": "batch", "data": f"Data {i}"}
634
for i in range(10)
635
]
636
637
tasks = [safe_create_item(item) for item in items_to_create]
638
results = await asyncio.gather(*tasks)
639
640
successful_items = [item for item in results if item is not None]
641
print(f"Successfully created {len(successful_items)} out of {len(items_to_create)} items")
642
643
asyncio.run(resilient_async_operations())
644
```