# Async Programming Patterns

Azure Core supports async/await with feature parity to its synchronous API. The async infrastructure includes async versions of the core classes, resource management through async context managers, and transport-level performance options such as connection pooling.

## Core Async Components

### AsyncPipelineClient

Main async client for Azure services with context manager support and flexible response handling.

```python { .api }
from typing import Any, AsyncContextManager, Awaitable, Optional

from azure.core import AsyncPipelineClient
from azure.core.configuration import Configuration
from azure.core.pipeline import AsyncPipeline
from azure.core.rest import AsyncHttpResponse, HttpRequest

class AsyncPipelineClient(AsyncContextManager["AsyncPipelineClient"]):
    def __init__(
        self,
        base_url: str,
        *,
        pipeline: Optional[AsyncPipeline] = None,
        config: Optional[Configuration] = None,
        **kwargs: Any,
    ): ...

    def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any
    ) -> Awaitable[AsyncHttpResponse]: ...

    async def __aenter__(self) -> "AsyncPipelineClient": ...
    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...
    async def close(self) -> None: ...
```

### AsyncPipeline

Asynchronous HTTP pipeline with context manager support and async policy execution.

```python { .api }
from typing import Any, AsyncContextManager, Iterable, Optional, Union

from azure.core.pipeline import AsyncPipeline, PipelineResponse
from azure.core.pipeline.policies import AsyncHTTPPolicy, SansIOHTTPPolicy
from azure.core.pipeline.transport import AsyncHttpTransport
from azure.core.rest import HttpRequest

class AsyncPipeline(AsyncContextManager["AsyncPipeline"]):
    def __init__(
        self,
        transport: AsyncHttpTransport,
        policies: Optional[Iterable[Union[AsyncHTTPPolicy, SansIOHTTPPolicy]]] = None,
    ): ...

    async def run(self, request: HttpRequest, **kwargs: Any) -> PipelineResponse: ...

    async def __aenter__(self) -> "AsyncPipeline": ...
    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...
```

## Basic Async Usage

### Client Creation and Usage

```python
import asyncio

from azure.core import AsyncPipelineClient
from azure.core.rest import HttpRequest

async def basic_async_client():
    # Create async client with context manager (recommended)
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/data")
        response = await client.send_request(request)

        # Handle response
        response.raise_for_status()
        data = response.json()
        return data

# Run async function
result = asyncio.run(basic_async_client())
```

### Manual Resource Management

```python
async def manual_resource_management():
    client = AsyncPipelineClient("https://api.example.com")

    try:
        # Manual opening not required, but available
        await client.__aenter__()

        request = HttpRequest("GET", "/api/data")
        response = await client.send_request(request)

        return response.json()
    finally:
        # Always close resources
        await client.close()
```

### Dual Usage Response Pattern

The object returned by `send_request` can be used in two ways: awaited directly, or entered as an async context manager that closes the response on exit:

```python
async def dual_usage_patterns():
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/data")

        # Pattern 1: Direct await
        response = await client.send_request(request)
        data = response.json()
        await response.close()  # Manual cleanup; close() is a coroutine on async responses

        # Pattern 2: Context manager (automatic cleanup)
        async with client.send_request(request) as response:
            response.raise_for_status()
            data = response.json()
        # Response automatically closed when exiting context

        return data
```
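To illustrate how one object can support both patterns, here is a minimal standard-library sketch. It is not Azure Core's implementation; `FakeResponse`, `DualUse`, and `fake_send` are hypothetical names invented for this example. The wrapper forwards `await` to the underlying coroutine and closes the response when used with `async with`.

```python
import asyncio


class FakeResponse:
    """Stand-in for an HTTP response; tracks whether it has been closed."""

    def __init__(self, body: str) -> None:
        self.body = body
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class DualUse:
    """Wraps a coroutine so it can be awaited or used with `async with`."""

    def __init__(self, coro) -> None:
        self._coro = coro
        self._response = None

    def __await__(self):
        # A plain `await obj` delegates to the wrapped coroutine.
        return self._coro.__await__()

    async def __aenter__(self) -> FakeResponse:
        self._response = await self._coro
        return self._response

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        # Leaving the block closes the response, even when an error occurred.
        await self._response.close()


async def fake_send() -> FakeResponse:
    await asyncio.sleep(0)  # stands in for network I/O
    return FakeResponse("ok")


async def demo():
    awaited = await DualUse(fake_send())
    async with DualUse(fake_send()) as managed:
        body = managed.body
    return awaited.closed, managed.closed, body


result = asyncio.run(demo())
```

In this sketch the directly awaited response stays open until the caller closes it, while the context-managed one is closed on exit, which is the trade-off between the two patterns.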

## Async Authentication

### AsyncTokenCredential

Protocol for async token-based authentication with context manager support.

```python { .api }
from typing import Any, AsyncContextManager, Optional

from azure.core.credentials import AccessToken
from azure.core.credentials_async import AsyncTokenCredential

class AsyncTokenCredential(AsyncContextManager["AsyncTokenCredential"]):
    async def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,
        tenant_id: Optional[str] = None,
        enable_cae: bool = False,
        **kwargs: Any,
    ) -> AccessToken: ...

    async def close(self) -> None: ...
    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...
```

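### Usage with Authentication

```python
from azure.core import AsyncPipelineClient
from azure.core.configuration import Configuration
from azure.core.pipeline.policies import AsyncBearerTokenCredentialPolicy
from azure.core.rest import HttpRequest
from azure.identity.aio import DefaultAzureCredential

async def authenticated_client():
    # Create async credential
    credential = DefaultAzureCredential()

    # Assumption: the bare AsyncPipelineClient does not turn a `credential`
    # keyword into an auth policy, so the credential is attached through the
    # configuration. The scope below is a placeholder.
    config = Configuration()
    config.authentication_policy = AsyncBearerTokenCredentialPolicy(
        credential, "https://api.example.com/.default"
    )

    try:
        # Create client with authentication
        async with AsyncPipelineClient("https://api.example.com", config=config) as client:
            request = HttpRequest("GET", "/api/protected-resource")

            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()
    finally:
        # Close credential resources
        await credential.close()
```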

## Async Pipeline Policies

### AsyncBearerTokenCredentialPolicy

Async authentication policy with automatic token refresh and challenge handling.

```python { .api }
from typing import Any

from azure.core.credentials_async import AsyncTokenCredential
from azure.core.pipeline import PipelineRequest, PipelineResponse
from azure.core.pipeline.policies import AsyncBearerTokenCredentialPolicy, AsyncHTTPPolicy

class AsyncBearerTokenCredentialPolicy(AsyncHTTPPolicy):
    def __init__(
        self,
        credential: AsyncTokenCredential,
        *scopes: str,
        **kwargs: Any
    ): ...

    async def on_request(self, request: PipelineRequest) -> None: ...
    async def on_challenge(self, request: PipelineRequest, response: PipelineResponse) -> bool: ...
    async def send(self, request: PipelineRequest) -> PipelineResponse: ...
```

### AsyncRetryPolicy

Async retry policy with configurable backoff and sleep patterns.

```python { .api }
from typing import Any, Dict

from azure.core.pipeline import PipelineRequest, PipelineResponse
from azure.core.pipeline.policies import AsyncHTTPPolicy, AsyncRetryPolicy
from azure.core.pipeline.transport import AsyncHttpTransport

class AsyncRetryPolicy(AsyncHTTPPolicy):
    def __init__(
        self,
        *,
        retry_total: int = 10,
        retry_connect: int = 3,
        retry_read: int = 3,
        retry_status: int = 3,
        retry_backoff_factor: float = 0.8,
        retry_backoff_max: int = 120,
        **kwargs: Any
    ): ...

    async def sleep(self, settings: Dict[str, Any], transport: AsyncHttpTransport) -> None: ...
    async def send(self, request: PipelineRequest) -> PipelineResponse: ...
```
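To get a feel for how `retry_backoff_factor` and `retry_backoff_max` interact, the sketch below computes a delay schedule under the assumption of plain exponential growth (the factor doubles with each consecutive retry and is capped at the maximum). `backoff_schedule` is a hypothetical helper written for this example, not part of Azure Core; the real policy may also honor `Retry-After` headers or use a fixed mode.

```python
from typing import List


def backoff_schedule(
    retries: int, factor: float = 0.8, backoff_max: float = 120.0
) -> List[float]:
    """Return the delay before each retry, assuming capped exponential backoff."""
    return [
        min(backoff_max, factor * (2 ** (attempt - 1)))
        for attempt in range(1, retries + 1)
    ]


# Defaults mirror the signature above: factor 0.8, cap 120 seconds, 10 retries.
delays = backoff_schedule(10)
```

Under these assumptions the first delays are short (0.8 s, 1.6 s, 3.2 s) and the cap is reached by the ninth retry, so lowering `retry_backoff_max` mostly affects the tail of a long retry sequence.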

## Async Transport

### AioHttpTransport

HTTP transport implementation built on aiohttp.

```python
import aiohttp

from azure.core import AsyncPipelineClient
from azure.core.pipeline.transport import AioHttpTransport
from azure.core.rest import HttpRequest

async def custom_transport_example():
    # Custom aiohttp session configuration
    timeout = aiohttp.ClientTimeout(total=30, connect=10)
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)

    session = aiohttp.ClientSession(
        timeout=timeout,
        connector=connector
    )

    # Create transport with custom session; session_owner=False means
    # the caller stays responsible for closing the session
    transport = AioHttpTransport(session=session, session_owner=False)

    try:
        async with AsyncPipelineClient(
            "https://api.example.com",
            transport=transport
        ) as client:
            request = HttpRequest("GET", "/api/data")
            response = await client.send_request(request)
            return response.json()
    finally:
        await session.close()
```

## Async Streaming

### Streaming Responses

```python
async def streaming_download():
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/large-file")

        # Enable streaming
        async with client.send_request(request, stream=True) as response:
            response.raise_for_status()

            # Stream download with chunks
            megabyte = 1024 * 1024
            total_size = 0
            next_report = megabyte
            async for chunk in response.iter_bytes(chunk_size=8192):
                total_size += len(chunk)
                process_chunk(chunk)  # Placeholder for application logic

                # Progress reporting; chunk sizes vary, so compare against
                # a threshold instead of testing for an exact multiple
                if total_size >= next_report:
                    print(f"Downloaded {total_size // megabyte} MB")
                    next_report = (total_size // megabyte + 1) * megabyte

            print(f"Download complete: {total_size} bytes")
```

### Multipart Responses

```python
async def handle_multipart_response():
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/multipart-data")

        async with client.send_request(request) as response:
            # Iterate over multipart sections
            async for part in response.parts():
                content_type = part.headers.get("content-type")
                content = await part.read()

                print(f"Part: {content_type}, Size: {len(content)}")
                process_part(content_type, content)
```

## Async Pagination

### ItemPaged Async Usage

```python
from azure.core.async_paging import AsyncItemPaged

async def paginated_data_processing(client):
    # Assume client.list_items() returns AsyncItemPaged
    async_pager: AsyncItemPaged = client.list_items()

    # Item-by-item iteration (recommended for most cases)
    processed_count = 0
    async for item in async_pager:
        await process_item(item)  # Placeholder for application logic
        processed_count += 1

        # Progress reporting
        if processed_count % 100 == 0:
            print(f"Processed {processed_count} items")

    return processed_count

async def paginated_batch_processing(client):
    async_pager = client.list_items()

    # Page-by-page iteration for batch processing
    page_count = 0
    async for page in async_pager.by_page():
        page_items = []
        async for item in page:
            page_items.append(item)

        # Process entire page as batch
        await process_batch(page_items)  # Placeholder for application logic
        page_count += 1
        print(f"Processed page {page_count} with {len(page_items)} items")
```
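The relationship between page-level and item-level iteration can be sketched with plain async generators. This is an illustration only and does not use `AsyncItemPaged`; `fetch_pages` and `iter_items` are hypothetical names, and the "service" is a range of integers.

```python
import asyncio
from typing import AsyncIterator, List


async def fetch_pages(page_size: int, total: int) -> AsyncIterator[List[int]]:
    """Hypothetical page source: yields lists of integers as pages."""
    for start in range(0, total, page_size):
        await asyncio.sleep(0)  # stands in for one network round trip per page
        yield list(range(start, min(start + page_size, total)))


async def iter_items(pages: AsyncIterator[List[int]]) -> AsyncIterator[int]:
    """Flatten a stream of pages into a stream of items."""
    async for page in pages:
        for item in page:
            yield item


async def demo():
    page_sizes = [len(page) async for page in fetch_pages(4, 10)]
    items = [item async for item in iter_items(fetch_pages(4, 10))]
    return page_sizes, items


page_sizes, items = asyncio.run(demo())
```

Item iteration hides page boundaries and fetches the next page lazily, while page iteration exposes each batch, which is useful when downstream work is cheaper per batch than per item.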

## Async Long-Running Operations

### AsyncLROPoller

```python
import asyncio

from azure.core import AsyncPipelineClient
from azure.core.polling import AsyncLROPoller
from azure.core.rest import HttpRequest

async def long_running_operation():
    async with AsyncPipelineClient("https://api.example.com") as client:
        # Start long-running operation
        request = HttpRequest("POST", "/api/start-operation", json={"data": "value"})

        # Begin polling operation; begin_long_running_operation is a
        # placeholder for a service client's begin_* method
        poller: AsyncLROPoller = await client.begin_long_running_operation(request)

        # Wait for completion with custom timeout. AsyncLROPoller.result()
        # takes no timeout argument, so asyncio.wait_for bounds the wait
        try:
            result = await asyncio.wait_for(poller.result(), timeout=300)  # 5 minutes
            print(f"Operation completed: {result}")
            return result
        except Exception as e:
            print(f"Operation failed or timed out: {e}")
            raise

async def polling_with_progress(client, request):
    # begin_operation is a placeholder for a service client's begin_* method
    poller = await client.begin_operation(request)

    # The poller only advances while wait() or result() is being awaited,
    # so run wait() as a task and report progress alongside it
    wait_task = asyncio.ensure_future(poller.wait())
    while not wait_task.done():
        print("Operation in progress...")
        await asyncio.wait([wait_task], timeout=5)  # Check every 5 seconds

    await wait_task  # Propagate any polling error
    result = await poller.result()
    return result
```

## Async Error Handling

### Exception Handling Patterns

```python
import asyncio

from azure.core import AsyncPipelineClient
from azure.core.exceptions import HttpResponseError, ServiceRequestError
from azure.core.rest import HttpRequest

async def robust_error_handling():
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/data")

        try:
            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()

        except HttpResponseError as e:
            if e.status_code == 429:  # Rate limited
                retry_after = int(e.response.headers.get("Retry-After", "60"))
                print(f"Rate limited, waiting {retry_after} seconds")
                await asyncio.sleep(retry_after)
                # Implement retry logic; retry_request is a placeholder
                return await retry_request(client, request)
            else:
                print(f"HTTP error {e.status_code}: {e.message}")
                raise
        except ServiceRequestError as e:
            print(f"Service request error: {e}")
            raise
        except asyncio.TimeoutError:
            print("Request timed out")
            raise
```
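The retry step that the example above leaves open could look roughly like the following. This is a standard-library sketch under stated assumptions: `RateLimited`, `call_with_retry`, and `flaky` are hypothetical names, and the exception stands in for an HTTP 429 response carrying a `Retry-After` value.

```python
import asyncio


class RateLimited(Exception):
    """Stand-in for a 429 response that carries a Retry-After delay."""

    def __init__(self, retry_after: float) -> None:
        super().__init__(f"rate limited, retry after {retry_after}s")
        self.retry_after = retry_after


async def call_with_retry(operation, max_attempts: int = 3):
    """Await operation(), sleeping for the server-suggested delay between tries."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except RateLimited as error:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller
            await asyncio.sleep(error.retry_after)


calls = []


async def flaky() -> str:
    """Fails twice with RateLimited, then succeeds."""
    calls.append(len(calls) + 1)
    if len(calls) < 3:
        raise RateLimited(retry_after=0.01)
    return "ok"


outcome = asyncio.run(call_with_retry(flaky))
```

Bounding the number of attempts keeps a persistently throttled request from looping forever; a pipeline-level retry policy is usually the better home for this logic when one is configured.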

## Async Tracing

### Distributed Tracing with Async

```python
from azure.core import AsyncPipelineClient
from azure.core.rest import HttpRequest
from azure.core.settings import settings
from azure.core.tracing import SpanKind
from azure.core.tracing.decorator_async import distributed_trace_async

@distributed_trace_async(name_of_span="async_data_fetch", kind=SpanKind.CLIENT)
async def fetch_data_with_tracing(client: AsyncPipelineClient, resource_id: str):
    """Automatically traced async function"""
    request = HttpRequest("GET", f"/api/resources/{resource_id}")

    async with client.send_request(request) as response:
        response.raise_for_status()
        return response.json()

async def manual_async_tracing():
    # AbstractSpan is a protocol, not a concrete class. The configured tracing
    # plugin supplies the implementation, or None when tracing is disabled
    span_impl_type = settings.tracing_implementation()
    if span_impl_type is None:
        return await process_async_data()  # Placeholder for application logic

    # Spans are synchronous context managers, so a plain `with` is used
    with span_impl_type(name="async_operation", kind=SpanKind.CLIENT) as span:
        span.add_attribute("operation.type", "data_processing")

        try:
            result = await process_async_data()
            span.add_attribute("operation.success", True)
            span.add_attribute("result.count", len(result))
            return result
        except Exception as e:
            span.add_attribute("error.type", type(e).__name__)
            span.add_attribute("error.message", str(e))
            raise
```

## Advanced Async Patterns

### Concurrent Operations

```python
import asyncio
from typing import List

from azure.core import AsyncPipelineClient
from azure.core.rest import HttpRequest

async def concurrent_requests():
    async with AsyncPipelineClient("https://api.example.com") as client:
        # Create multiple requests
        requests = [
            HttpRequest("GET", f"/api/resource/{i}")
            for i in range(1, 11)
        ]

        # Execute requests concurrently
        async def fetch_single(request):
            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()

        # Use asyncio.gather for concurrent execution
        results = await asyncio.gather(
            *[fetch_single(req) for req in requests],
            return_exceptions=True
        )

        # Process results
        successful_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Request {i+1} failed: {result}")
            else:
                successful_results.append(result)

        return successful_results

async def semaphore_controlled_requests(requests: List[HttpRequest]):
    # Limit concurrent requests to prevent overwhelming the service
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests

    async def controlled_fetch(client, request):
        async with semaphore:
            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()

    async with AsyncPipelineClient("https://api.example.com") as client:
        tasks = [controlled_fetch(client, req) for req in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
```
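The effect of the semaphore can be checked without any network access. The following standard-library sketch replaces the HTTP call with a short sleep and records the peak number of jobs running at once; `run_bounded` and `job` are hypothetical names for this illustration.

```python
import asyncio


async def run_bounded(job_count: int, limit: int):
    """Run job_count fake jobs with at most `limit` in flight at a time."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job(index: int) -> int:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for the HTTP call
            in_flight -= 1
            return index * 2

    # gather preserves input order in its result list
    results = await asyncio.gather(*(job(i) for i in range(job_count)))
    return results, peak


results, peak = asyncio.run(run_bounded(job_count=20, limit=5))
```

All twenty jobs are scheduled immediately, but only five hold the semaphore at any moment, so the observed peak never exceeds the limit while results still come back in submission order.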

### Context Preservation

```python
import asyncio
from contextvars import ContextVar

# Context variable for request tracking
request_id: ContextVar[str] = ContextVar('request_id')

async def context_aware_processing():
    request_id.set("req-12345")

    async with AsyncPipelineClient("https://api.example.com") as client:
        # Context is preserved across await boundaries
        await process_step_1(client)
        # process_step_2 and process_step_3 are placeholders that would
        # follow the same shape as process_step_1
        await process_step_2(client)
        await process_step_3(client)

async def process_step_1(client):
    # Context variable is available here
    current_request_id = request_id.get()
    print(f"Processing step 1 for request: {current_request_id}")

    request = HttpRequest("GET", "/api/step1")
    async with client.send_request(request) as response:
        return response.json()
```
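One detail worth seeing in isolation is what happens when work is split into concurrent tasks: each asyncio task runs in a copy of the context that was current when the task was created, so a value set inside one task is invisible to its siblings and to the parent. The sketch below uses only the standard library; `handle` and `demo` are hypothetical names.

```python
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id", default="unset")


async def handle(new_id: str) -> str:
    request_id.set(new_id)  # only affects this task's copy of the context
    await asyncio.sleep(0)  # yield so the two tasks interleave
    return request_id.get()


async def demo():
    request_id.set("parent")
    # gather wraps each coroutine in its own task, and thus its own context copy
    seen = await asyncio.gather(handle("req-1"), handle("req-2"))
    return seen, request_id.get()


seen, parent_value = asyncio.run(demo())
```

Each task reads back its own identifier and the parent value is untouched, which is why context variables are a safer choice than module-level globals for per-request state.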

## Performance Optimization

### Connection Pooling

```python
import aiohttp

from azure.core import AsyncPipelineClient
from azure.core.pipeline.transport import AioHttpTransport

async def optimized_client_setup():
    # Configure connection pooling for better performance
    connector = aiohttp.TCPConnector(
        limit=100,           # Total connection pool size
        limit_per_host=30,   # Max connections per host
        ttl_dns_cache=300,   # DNS cache TTL in seconds
        use_dns_cache=True,
        keepalive_timeout=30
    )

    timeout = aiohttp.ClientTimeout(
        total=60,      # Total timeout
        connect=10,    # Connection timeout
        sock_read=30   # Socket read timeout
    )

    session = aiohttp.ClientSession(
        connector=connector,
        timeout=timeout
    )

    transport = AioHttpTransport(session=session, session_owner=False)

    try:
        async with AsyncPipelineClient(
            "https://api.example.com",
            transport=transport
        ) as client:
            # Client now uses optimized connection pooling;
            # perform_operations is a placeholder for application logic
            return await perform_operations(client)
    finally:
        await session.close()
```

## Key Features

**Complete Async/Await Support**: Feature parity with synchronous operations using async/await patterns.

**Dual Usage Pattern**: Response handling that supports both direct awaiting and context manager usage.

**Context Manager Integration**: Automatic resource management with async context managers throughout the stack.

**Concurrent Operations**: Clients can be shared across concurrent requests, for example with `asyncio.gather` and a semaphore.

**Async Authentication**: Async credential support with automatic token refresh and context management.

**Streaming Support**: Async streaming for large responses with chunked processing.

**Error Recovery**: Async-compatible retry policies and exception handling.

**Performance Optimized**: Connection pooling and keep-alive support through the aiohttp transport.

The async programming patterns in Azure Core provide a typed foundation for building asynchronous Azure applications while staying consistent with the synchronous operation patterns.