# Async Programming Patterns

Azure Core provides comprehensive async/await support with full feature parity with synchronous operations. The async infrastructure includes async versions of the core client, pipeline, policy, and transport classes, context-manager-based resource management, and transport-level optimizations such as connection pooling.

## Core Async Components

### AsyncPipelineClient

Main async client for Azure services, with context manager support and flexible response handling.

```python { .api }
from azure.core import AsyncPipelineClient
from azure.core.configuration import Configuration
from azure.core.pipeline import AsyncPipeline
from azure.core.rest import AsyncHttpResponse, HttpRequest
from typing import Any, AsyncContextManager, Awaitable, Optional

class AsyncPipelineClient(AsyncContextManager["AsyncPipelineClient"]):
    def __init__(
        self,
        base_url: str,
        *,
        pipeline: Optional[AsyncPipeline] = None,
        config: Optional[Configuration] = None,
        **kwargs: Any,
    ): ...

    def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any,
    ) -> Awaitable[AsyncHttpResponse]: ...

    async def __aenter__(self) -> "AsyncPipelineClient": ...
    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...
    async def close(self) -> None: ...
```

### AsyncPipeline

Asynchronous HTTP pipeline with context manager support and async policy execution.

```python { .api }
from azure.core.pipeline import AsyncPipeline, PipelineResponse
from azure.core.pipeline.policies import AsyncHTTPPolicy, SansIOHTTPPolicy
from azure.core.pipeline.transport import AsyncHttpTransport
from azure.core.rest import HttpRequest
from typing import Any, AsyncContextManager, Iterable, Optional, Union

class AsyncPipeline(AsyncContextManager["AsyncPipeline"]):
    def __init__(
        self,
        transport: AsyncHttpTransport,
        policies: Optional[Iterable[Union[AsyncHTTPPolicy, SansIOHTTPPolicy]]] = None,
    ): ...

    async def run(self, request: HttpRequest, **kwargs: Any) -> PipelineResponse: ...

    async def __aenter__(self) -> "AsyncPipeline": ...
    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...
```
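
If you need to drive a pipeline directly rather than going through `AsyncPipelineClient`, the pieces above compose as follows. This is a minimal sketch; the header, user agent string, retry count, and URL are illustrative.

```python
import asyncio
from azure.core.pipeline import AsyncPipeline
from azure.core.pipeline.policies import AsyncRetryPolicy, HeadersPolicy, UserAgentPolicy
from azure.core.pipeline.transport import AioHttpTransport
from azure.core.rest import HttpRequest

async def run_pipeline_directly():
    # Policies run in order before the transport sends the request
    policies = [
        HeadersPolicy({"CustomHeader": "value"}),
        UserAgentPolicy("my-app/1.0"),
        AsyncRetryPolicy(retry_total=5),
    ]
    request = HttpRequest("GET", "https://api.example.com/api/data")

    async with AioHttpTransport() as transport:
        async with AsyncPipeline(transport, policies=policies) as pipeline:
            pipeline_response = await pipeline.run(request)
            print(pipeline_response.http_response.status_code)

asyncio.run(run_pipeline_directly())
```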

## Basic Async Usage

### Client Creation and Usage

```python
from azure.core import AsyncPipelineClient
from azure.core.rest import HttpRequest
import asyncio

async def basic_async_client():
    # Create the async client with a context manager (recommended)
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/data")
        response = await client.send_request(request)

        # Handle the response
        response.raise_for_status()
        data = response.json()
        return data

# Run the async function
result = asyncio.run(basic_async_client())
```

### Manual Resource Management

```python
async def manual_resource_management():
    client = AsyncPipelineClient("https://api.example.com")

    try:
        # No explicit "open" step is required; the client is usable immediately
        request = HttpRequest("GET", "/api/data")
        response = await client.send_request(request)

        return response.json()
    finally:
        # Always close resources when not using "async with"
        await client.close()
```
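
When several async resources have to be torn down together, the standard library's `contextlib.AsyncExitStack` keeps the cleanup in one place. A minimal sketch using only the client shown above (the URL and path are illustrative):

```python
from contextlib import AsyncExitStack
from azure.core import AsyncPipelineClient
from azure.core.rest import HttpRequest

async def managed_resources():
    # AsyncExitStack closes every registered resource, even if an error occurs
    async with AsyncExitStack() as stack:
        client = await stack.enter_async_context(
            AsyncPipelineClient("https://api.example.com")
        )
        request = HttpRequest("GET", "/api/data")
        response = await client.send_request(request)
        response.raise_for_status()
        return response.json()
```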

### Dual Usage Response Pattern

Azure Core's dual-usage pattern lets the result of `send_request` be awaited directly or used as an async context manager:

```python
async def dual_usage_patterns():
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/data")

        # Pattern 1: Direct await
        response = await client.send_request(request)
        data = response.json()
        await response.close()  # Manual cleanup

        # Pattern 2: Async context manager (automatic cleanup)
        async with client.send_request(request) as response:
            response.raise_for_status()
            data = response.json()
            # Response automatically closed when exiting the context

        return data
```

## Async Authentication

### AsyncTokenCredential

Protocol for async token-based authentication, with context manager support.

```python { .api }
from azure.core.credentials import AccessToken
from azure.core.credentials_async import AsyncTokenCredential
from typing import Any, AsyncContextManager, Optional

class AsyncTokenCredential(AsyncContextManager["AsyncTokenCredential"]):
    async def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,
        tenant_id: Optional[str] = None,
        enable_cae: bool = False,
        **kwargs: Any,
    ) -> AccessToken: ...

    async def close(self) -> None: ...
    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...
```
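
Any object that satisfies this protocol can be passed wherever an async credential is expected. A minimal sketch of a custom credential; `StaticTokenCredential` and the one-hour expiry are illustrative, not part of Azure Core:

```python
import time
from azure.core.credentials import AccessToken

class StaticTokenCredential:
    """Hypothetical credential that always returns the same pre-fetched token."""

    def __init__(self, token: str):
        self._token = token

    async def get_token(self, *scopes: str, **kwargs) -> AccessToken:
        # AccessToken is (token, expires_on); pretend the token is valid for one hour
        return AccessToken(self._token, int(time.time()) + 3600)

    async def close(self) -> None:
        pass

    async def __aenter__(self) -> "StaticTokenCredential":
        return self

    async def __aexit__(self, *args) -> None:
        await self.close()
```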

### Usage with Authentication

```python
from azure.identity.aio import DefaultAzureCredential
from azure.core import AsyncPipelineClient
from azure.core.pipeline.policies import (
    AsyncBearerTokenCredentialPolicy,
    AsyncRetryPolicy,
    HeadersPolicy,
)
from azure.core.rest import HttpRequest

async def authenticated_client():
    # Create the async credential
    credential = DefaultAzureCredential()

    try:
        # Wire the credential into the pipeline through an async auth policy
        policies = [
            HeadersPolicy(),
            AsyncRetryPolicy(),
            AsyncBearerTokenCredentialPolicy(credential, "https://example.com/.default"),
        ]

        async with AsyncPipelineClient(
            "https://api.example.com",
            policies=policies,
        ) as client:
            request = HttpRequest("GET", "/api/protected-resource")

            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()
    finally:
        # Close credential resources
        await credential.close()
```

## Async Pipeline Policies

### AsyncBearerTokenCredentialPolicy

Async authentication policy with automatic token refresh and challenge handling.

```python { .api }
from azure.core.credentials_async import AsyncTokenCredential
from azure.core.pipeline import PipelineRequest, PipelineResponse
from azure.core.pipeline.policies import AsyncBearerTokenCredentialPolicy, AsyncHTTPPolicy
from typing import Any

class AsyncBearerTokenCredentialPolicy(AsyncHTTPPolicy):
    def __init__(
        self,
        credential: AsyncTokenCredential,
        *scopes: str,
        **kwargs: Any,
    ): ...

    async def on_request(self, request: PipelineRequest) -> None: ...
    async def on_challenge(self, request: PipelineRequest, response: PipelineResponse) -> bool: ...
    async def send(self, request: PipelineRequest) -> PipelineResponse: ...
```

### AsyncRetryPolicy

Async retry policy with configurable backoff and sleep behavior.

```python { .api }
from azure.core.pipeline import PipelineRequest, PipelineResponse
from azure.core.pipeline.policies import AsyncHTTPPolicy, AsyncRetryPolicy
from azure.core.pipeline.transport import AsyncHttpTransport
from typing import Any, Dict

class AsyncRetryPolicy(AsyncHTTPPolicy):
    def __init__(
        self,
        *,
        retry_total: int = 10,
        retry_connect: int = 3,
        retry_read: int = 3,
        retry_status: int = 3,
        retry_backoff_factor: float = 0.8,
        retry_backoff_max: int = 120,
        **kwargs: Any,
    ): ...

    async def sleep(self, settings: Dict[str, Any], transport: AsyncHttpTransport) -> None: ...
    async def send(self, request: PipelineRequest) -> PipelineResponse: ...
```
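
A tuned retry policy can be constructed directly and supplied as part of an explicit policy list. The retry counts and backoff values below are illustrative:

```python
from azure.core import AsyncPipelineClient
from azure.core.pipeline.policies import AsyncRetryPolicy, HeadersPolicy

# Roughly: sleep ~ retry_backoff_factor * (2 ** retries), capped at retry_backoff_max
retry_policy = AsyncRetryPolicy(
    retry_total=5,
    retry_backoff_factor=1.0,
    retry_backoff_max=60,
)

client = AsyncPipelineClient(
    "https://api.example.com",
    policies=[HeadersPolicy(), retry_policy],
)
```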

## Async Transport

### AioHttpTransport

HTTP transport implementation using aiohttp with full async support.

```python
from azure.core.pipeline.transport import AioHttpTransport
import aiohttp

async def custom_transport_example():
    # Custom aiohttp session configuration
    timeout = aiohttp.ClientTimeout(total=30, connect=10)
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)

    session = aiohttp.ClientSession(
        timeout=timeout,
        connector=connector,
    )

    # Create transport with custom session
    transport = AioHttpTransport(session=session, session_owner=False)

    try:
        async with AsyncPipelineClient(
            "https://api.example.com",
            transport=transport,
        ) as client:
            request = HttpRequest("GET", "/api/data")
            response = await client.send_request(request)
            return response.json()
    finally:
        await session.close()
```

## Async Streaming

### Streaming Responses

```python
async def streaming_download():
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/large-file")

        # Enable streaming
        async with client.send_request(request, stream=True) as response:
            response.raise_for_status()

            # Stream the download in chunks
            total_size = 0
            reported_mb = 0
            async for chunk in response.iter_bytes():
                total_size += len(chunk)
                process_chunk(chunk)

                # Progress reporting (once per completed MB)
                if total_size // (1024 * 1024) > reported_mb:
                    reported_mb = total_size // (1024 * 1024)
                    print(f"Downloaded {reported_mb} MB")

            print(f"Download complete: {total_size} bytes")
```
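
Request bodies can be streamed as well. A minimal upload sketch, assuming the service accepts a chunked `PUT` at the path shown; the plain file reads block the event loop briefly per chunk, so consider an async file library for large uploads:

```python
async def streaming_upload(file_path: str):
    def read_chunks():
        # Plain (blocking) file reads, acceptable for a short sketch
        with open(file_path, "rb") as f:
            while chunk := f.read(64 * 1024):
                yield chunk

    async with AsyncPipelineClient("https://api.example.com") as client:
        # HttpRequest accepts an iterable (or async iterable) of bytes as content
        request = HttpRequest("PUT", "/api/upload", content=read_chunks())
        async with client.send_request(request) as response:
            response.raise_for_status()
```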

### Multipart Responses

```python
async def handle_multipart_response():
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/multipart-data")

        async with client.send_request(request) as response:
            # Iterate over multipart sections
            async for part in response.parts():
                content_type = part.headers.get("content-type")
                content = await part.read()

                print(f"Part: {content_type}, Size: {len(content)}")
                process_part(content_type, content)
```

## Async Pagination

### AsyncItemPaged Usage

```python
from azure.core.async_paging import AsyncItemPaged

async def paginated_data_processing():
    # Assume client.list_items() returns AsyncItemPaged
    async_pager = client.list_items()

    # Item-by-item iteration (recommended for most cases)
    processed_count = 0
    async for item in async_pager:
        await process_item(item)
        processed_count += 1

        # Progress reporting
        if processed_count % 100 == 0:
            print(f"Processed {processed_count} items")

    return processed_count

async def paginated_batch_processing():
    async_pager = client.list_items()

    # Page-by-page iteration for batch processing
    page_count = 0
    async for page in async_pager.by_page():
        page_items = [item async for item in page]

        # Process the entire page as a batch
        await process_batch(page_items)
        page_count += 1
        print(f"Processed page {page_count} with {len(page_items)} items")
```
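
Pagination can also be checkpointed and resumed. A sketch, assuming `client.list_items()` as above and that the page iterator's `continuation_token` is persisted somewhere between runs:

```python
async def resumable_pagination(saved_token=None):
    # Resume from a previously saved continuation token (None starts from the beginning)
    pages = client.list_items().by_page(continuation_token=saved_token)

    async for page in pages:
        async for item in page:
            await process_item(item)

        # pages.continuation_token can be saved here to resume later
        print(f"Checkpoint token: {pages.continuation_token}")
```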

## Async Long-Running Operations

### AsyncLROPoller

```python
import asyncio
from azure.core.polling import AsyncLROPoller

async def long_running_operation():
    async with AsyncPipelineClient("https://api.example.com") as client:
        # Start the long-running operation
        request = HttpRequest("POST", "/api/start-operation", json={"data": "value"})

        # Assume the client exposes a begin_* method returning an AsyncLROPoller
        poller: AsyncLROPoller = await client.begin_long_running_operation(request)

        # Wait for completion with a custom timeout
        try:
            result = await asyncio.wait_for(poller.result(), timeout=300)  # 5 minutes
            print(f"Operation completed: {result}")
            return result
        except Exception as e:
            print(f"Operation failed or timed out: {e}")
            raise

async def polling_with_progress():
    poller = await client.begin_operation(request)

    # Async pollers only make progress while awaited, so drive the result in a task
    result_task = asyncio.create_task(poller.result())

    # Manual progress updates while polling runs
    while not result_task.done():
        print(f"Operation status: {poller.status()}")
        await asyncio.sleep(5)  # Check every 5 seconds

    return await result_task
```

## Async Error Handling

### Exception Handling Patterns

```python
from azure.core.exceptions import HttpResponseError, ServiceRequestError
import asyncio

async def robust_error_handling():
    async with AsyncPipelineClient("https://api.example.com") as client:
        try:
            request = HttpRequest("GET", "/api/data")

            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()

        except HttpResponseError as e:
            if e.status_code == 429:  # Rate limited
                retry_after = int(e.response.headers.get("Retry-After", "60"))
                print(f"Rate limited, waiting {retry_after} seconds")
                await asyncio.sleep(retry_after)
                # Retry using the helper sketched below
                return await retry_request(client, request)
            else:
                print(f"HTTP error {e.status_code}: {e.message}")
                raise
        except ServiceRequestError as e:
            print(f"Service request error: {e}")
            raise
        except asyncio.TimeoutError:
            print("Request timed out")
            raise
```
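
The `retry_request` helper above is not part of Azure Core; a minimal sketch with exponential backoff (the attempt count and delays are illustrative):

```python
async def retry_request(client, request, attempts: int = 3, base_delay: float = 1.0):
    last_error = None
    for attempt in range(attempts):
        try:
            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()
        except HttpResponseError as e:
            last_error = e
            # Exponential backoff: 1s, 2s, 4s, ...
            await asyncio.sleep(base_delay * (2 ** attempt))
    raise last_error
```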

## Async Tracing

### Distributed Tracing with Async

```python
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.tracing import SpanKind

@distributed_trace_async(name_of_span="async_data_fetch", kind=SpanKind.CLIENT)
async def fetch_data_with_tracing(client: AsyncPipelineClient, resource_id: str):
    """Automatically traced async function"""
    request = HttpRequest("GET", f"/api/resources/{resource_id}")

    async with client.send_request(request) as response:
        response.raise_for_status()
        return response.json()

async def manual_async_tracing():
    from azure.core.settings import settings
    from azure.core.tracing import SpanKind

    # Resolve the configured tracing implementation (e.g. the OpenTelemetry plugin)
    span_impl_type = settings.tracing_implementation()
    if span_impl_type is None:
        return await process_async_data()  # Tracing is not configured

    # Spans are regular (sync) context managers, even inside async code
    with span_impl_type(name="async_operation") as span:
        span.kind = SpanKind.CLIENT
        span.add_attribute("operation.type", "data_processing")

        try:
            result = await process_async_data()
            span.add_attribute("operation.success", True)
            span.add_attribute("result.count", len(result))
            return result
        except Exception as e:
            span.add_attribute("error.type", type(e).__name__)
            span.add_attribute("error.message", str(e))
            raise
```

## Advanced Async Patterns

### Concurrent Operations

```python
import asyncio

async def concurrent_requests():
    async with AsyncPipelineClient("https://api.example.com") as client:
        # Create multiple requests
        requests = [
            HttpRequest("GET", f"/api/resource/{i}")
            for i in range(1, 11)
        ]

        # Execute requests concurrently
        async def fetch_single(request):
            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()

        # Use asyncio.gather for concurrent execution
        results = await asyncio.gather(
            *[fetch_single(req) for req in requests],
            return_exceptions=True,
        )

        # Process results
        successful_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Request {i + 1} failed: {result}")
            else:
                successful_results.append(result)

        return successful_results

async def semaphore_controlled_requests():
    # Limit concurrency to avoid overwhelming the service
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests

    async def controlled_fetch(client, request):
        async with semaphore:
            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()

    async with AsyncPipelineClient("https://api.example.com") as client:
        requests = [
            HttpRequest("GET", f"/api/resource/{i}")
            for i in range(1, 11)
        ]
        tasks = [controlled_fetch(client, req) for req in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
```

### Context Preservation

```python
from contextvars import ContextVar

# Context variable for request tracking
request_id: ContextVar[str] = ContextVar("request_id")

async def context_aware_processing():
    request_id.set("req-12345")

    async with AsyncPipelineClient("https://api.example.com") as client:
        # The context is preserved across await boundaries
        await process_step_1(client)
        await process_step_2(client)
        await process_step_3(client)

async def process_step_1(client):
    # The context variable is available here
    current_request_id = request_id.get()
    print(f"Processing step 1 for request: {current_request_id}")

    request = HttpRequest("GET", "/api/step1")
    async with client.send_request(request) as response:
        return response.json()
```
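
Context variables also flow into tasks: each task created by `asyncio.gather` or `asyncio.create_task` receives a copy of the context current at creation time, so values set before the fan-out are visible inside every task. A sketch using the `request_id` variable defined above:

```python
import asyncio

async def fan_out_with_context(client):
    request_id.set("req-67890")

    async def traced_fetch(path):
        # Each task sees the request_id that was set before the tasks were created
        print(f"[{request_id.get()}] fetching {path}")
        request = HttpRequest("GET", path)
        async with client.send_request(request) as response:
            response.raise_for_status()
            return response.json()

    return await asyncio.gather(
        traced_fetch("/api/step1"),
        traced_fetch("/api/step2"),
    )
```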

## Performance Optimization

### Connection Pooling

```python
import aiohttp
from azure.core.pipeline.transport import AioHttpTransport

async def optimized_client_setup():
    # Configure connection pooling for better performance
    connector = aiohttp.TCPConnector(
        limit=100,            # Total connection pool size
        limit_per_host=30,    # Max connections per host
        ttl_dns_cache=300,    # DNS cache TTL
        use_dns_cache=True,
        keepalive_timeout=30,
    )

    timeout = aiohttp.ClientTimeout(
        total=60,       # Total timeout
        connect=10,     # Connection timeout
        sock_read=30,   # Socket read timeout
    )

    session = aiohttp.ClientSession(
        connector=connector,
        timeout=timeout,
    )

    transport = AioHttpTransport(session=session, session_owner=False)

    try:
        async with AsyncPipelineClient(
            "https://api.example.com",
            transport=transport,
        ) as client:
            # Client now uses optimized connection pooling
            return await perform_operations(client)
    finally:
        await session.close()
```

## Key Features

**Complete Async/Await Support**: Full feature parity with synchronous operations using async/await patterns.

**Dual Usage Pattern**: Response handling that supports both direct awaiting and async context manager usage.

**Context Manager Integration**: Automatic resource management with async context managers throughout the stack.

**Concurrent Operations**: Built-in support for concurrent requests with proper resource management.

**Async Authentication**: Full async credential support with automatic token refresh and context management.

**Streaming Support**: Efficient async streaming for large responses with chunked processing.

**Error Recovery**: Robust error handling with async-compatible retry policies and exception management.

**Performance Optimized**: Connection pooling, keep-alive support, and efficient resource utilization.

The async programming patterns in Azure Core provide a complete, type-safe, and efficient foundation for building high-performance asynchronous Azure applications while staying consistent with the synchronous operation patterns.