# Transport and Networking

Azure Core provides HTTP transport abstractions that support multiple async frameworks and networking libraries. The transport layer handles the actual HTTP communication while providing a consistent interface for all Azure SDK clients.

## Base Transport Interfaces

Transport classes define the interface for sending HTTP requests and receiving responses.

```python { .api }
from azure.core.pipeline.transport import HttpTransport, AsyncHttpTransport, HttpRequest, HttpResponse, AsyncHttpResponse
from abc import ABC, abstractmethod

class HttpTransport(ABC):
    """Base synchronous HTTP transport interface."""
    @abstractmethod
    def send(self, request: HttpRequest, **kwargs) -> HttpResponse:
        """Send HTTP request and return response."""
        ...

    @abstractmethod
    def open(self) -> None:
        """Open transport connection."""
        ...

    @abstractmethod
    def close(self) -> None:
        """Close transport connection."""
        ...

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *args):
        self.close()

class AsyncHttpTransport(ABC):
    """Base asynchronous HTTP transport interface."""
    @abstractmethod
    async def send(self, request: HttpRequest, **kwargs) -> AsyncHttpResponse:
        """Send HTTP request and return response."""
        ...

    @abstractmethod
    async def open(self) -> None:
        """Open transport connection."""
        ...

    @abstractmethod
    async def close(self) -> None:
        """Close transport connection."""
        ...

    async def __aenter__(self):
        await self.open()
        return self

    async def __aexit__(self, *args):
        await self.close()
```
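The context-manager contract in these interfaces can be illustrated without any network access. The following is a minimal sketch, not Azure Core code: `SketchTransport` and `RecordingTransport` are hypothetical stand-ins that mirror the `open`/`send`/`close` shape above, and requests and responses are plain strings.

```python
from abc import ABC, abstractmethod


class SketchTransport(ABC):
    """Hypothetical stand-in that mirrors the HttpTransport shape shown above."""

    @abstractmethod
    def send(self, request: str, **kwargs) -> str: ...

    @abstractmethod
    def open(self) -> None: ...

    @abstractmethod
    def close(self) -> None: ...

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *args):
        self.close()


class RecordingTransport(SketchTransport):
    """Records lifecycle calls instead of performing network I/O."""

    def __init__(self) -> None:
        self.events = []

    def open(self) -> None:
        self.events.append("open")

    def send(self, request: str, **kwargs) -> str:
        self.events.append(f"send:{request}")
        return "200 OK"

    def close(self) -> None:
        self.events.append("close")


transport = RecordingTransport()
with transport:
    status = transport.send("GET /data")

# The with block opens the transport before the request and closes it afterwards.
print(transport.events)
```

Because `__exit__` always calls `close()`, the transport is released even if `send` raises inside the `with` block.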

## HTTP Request and Response

Core classes for representing HTTP requests and responses.

```python { .api }
from typing import Any, AsyncIterator, Dict, Iterator, Optional, Union

class HttpRequest:
    """HTTP request representation."""
    def __init__(
        self,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        files: Optional[Dict] = None,
        data: Optional[Union[bytes, str, Dict]] = None,
        **kwargs
    ): ...

    @property
    def method(self) -> str: ...

    @property
    def url(self) -> str: ...

    @property
    def headers(self) -> Dict[str, str]: ...

    @property
    def body(self) -> Optional[bytes]: ...

    def set_json_body(self, data: Any) -> None:
        """Set request body as JSON."""
        ...

    def set_multipart_mixed(self, *requests: "HttpRequest") -> None:
        """Set multipart/mixed body."""
        ...

class HttpResponse:
    """HTTP response representation."""
    def __init__(self, request: HttpRequest, internal_response, **kwargs): ...

    @property
    def request(self) -> HttpRequest: ...

    @property
    def status_code(self) -> int: ...

    @property
    def headers(self) -> Dict[str, str]: ...

    @property
    def reason(self) -> str: ...

    @property
    def content_type(self) -> Optional[str]: ...

    @property
    def text(self) -> str: ...

    @property
    def content(self) -> bytes: ...

    def json(self) -> Any:
        """Parse response content as JSON."""
        ...

    def iter_bytes(self, chunk_size: int = 1024) -> Iterator[bytes]:
        """Iterate response content in chunks."""
        ...

    def iter_raw(self, chunk_size: int = 1024) -> Iterator[bytes]:
        """Iterate raw response content."""
        ...

class AsyncHttpResponse:
    """Asynchronous HTTP response representation."""
    # Same properties as HttpResponse

    async def json(self) -> Any:
        """Parse response content as JSON asynchronously."""
        ...

    async def text(self) -> str:
        """Get response content as text asynchronously."""
        ...

    async def read(self) -> bytes:
        """Read response content asynchronously."""
        ...

    def iter_bytes(self, chunk_size: int = 1024) -> AsyncIterator[bytes]:
        """Async iterate response content in chunks."""
        ...
```
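To make the `chunk_size` parameter of `iter_bytes` concrete, here is a small sketch of chunked iteration over an in-memory payload. `iter_chunks` is a hypothetical helper, not part of Azure Core. A real transport reads from the network, so actual chunk boundaries may differ from this idealized split.

```python
from typing import Iterator


def iter_chunks(payload: bytes, chunk_size: int = 1024) -> Iterator[bytes]:
    """Hypothetical helper: yield successive slices of at most chunk_size bytes."""
    for start in range(0, len(payload), chunk_size):
        yield payload[start:start + chunk_size]


payload = b"x" * 2500
chunk_lengths = [len(chunk) for chunk in iter_chunks(payload, chunk_size=1024)]

# Two full chunks followed by the 452-byte remainder.
print(chunk_lengths)
```

Consuming a response this way keeps at most one chunk in memory at a time, which is the point of the streaming examples later in this document.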

## Capabilities

### Requests Transport

HTTP transport implementation using the popular `requests` library.

```python { .api }
from azure.core.pipeline.transport import RequestsTransport, RequestsTransportResponse

class RequestsTransport(HttpTransport):
    """HTTP transport using the requests library."""
    def __init__(
        self,
        session: Optional[requests.Session] = None,
        session_owner: bool = True,
        connection_timeout: float = 300,
        read_timeout: float = 300,
        **kwargs
    ): ...

    def send(self, request: HttpRequest, **kwargs) -> RequestsTransportResponse: ...

class RequestsTransportResponse(HttpResponse):
    """Response implementation for requests transport."""
    pass
```

### Asyncio Requests Transport

Async HTTP transport that wraps the `requests` library and runs it on the asyncio event loop.

```python { .api }
from azure.core.pipeline.transport import AsyncioRequestsTransport, AsyncioRequestsTransportResponse

class AsyncioRequestsTransport(AsyncHttpTransport):
    """Async HTTP transport using requests on the asyncio event loop."""
    def __init__(
        self,
        session: Optional[requests.Session] = None,
        session_owner: bool = True,
        **kwargs
    ): ...

    async def send(self, request: HttpRequest, **kwargs) -> AsyncioRequestsTransportResponse: ...

class AsyncioRequestsTransportResponse(AsyncHttpResponse):
    """Async response implementation for asyncio transport."""
    pass
```

### AioHTTP Transport

HTTP transport implementation using the `aiohttp` library for async operations.

```python { .api }
from azure.core.pipeline.transport import AioHttpTransport, AioHttpTransportResponse

class AioHttpTransport(AsyncHttpTransport):
    """HTTP transport using aiohttp library."""
    def __init__(
        self,
        session: Optional[aiohttp.ClientSession] = None,
        session_owner: bool = True,
        **kwargs
    ): ...

    async def send(self, request: HttpRequest, **kwargs) -> AioHttpTransportResponse: ...

class AioHttpTransportResponse(AsyncHttpResponse):
    """Response implementation for aiohttp transport."""
    pass
```

### Trio Transport

HTTP transport for the Trio async framework.

```python { .api }
from azure.core.pipeline.transport import TrioRequestsTransport, TrioRequestsTransportResponse

class TrioRequestsTransport(AsyncHttpTransport):
    """HTTP transport for Trio async framework."""
    def __init__(self, **kwargs): ...

    async def send(self, request: HttpRequest, **kwargs) -> TrioRequestsTransportResponse: ...

class TrioRequestsTransportResponse(AsyncHttpResponse):
    """Response implementation for Trio transport."""
    pass
```

## Usage Examples

### Basic Transport Usage

```python
from azure.core.pipeline.transport import RequestsTransport, HttpRequest

# Create transport
transport = RequestsTransport(
    connection_timeout=30,
    read_timeout=60
)

# Create request
request = HttpRequest("GET", "https://api.example.com/data")
request.headers["Accept"] = "application/json"

# Send request
with transport:
    response = transport.send(request)
    print(f"Status: {response.status_code}")
    print(f"Content: {response.text}")
```

### Async Transport Usage

```python
import asyncio
from azure.core.pipeline.transport import AioHttpTransport, HttpRequest

async def async_transport_example():
    # Create async transport
    transport = AioHttpTransport()

    # Create request
    request = HttpRequest("GET", "https://api.example.com/data")

    # Send request
    async with transport:
        response = await transport.send(request)
        content = await response.text()
        print(f"Status: {response.status_code}")
        print(f"Content: {content}")

# asyncio.run(async_transport_example())
```

### Custom Transport Configuration

```python
import requests
from azure.core.pipeline.transport import RequestsTransport

# Create custom session with specific configuration
session = requests.Session()
session.verify = "/path/to/ca-bundle.pem"  # Custom CA bundle
session.cert = ("/path/to/client.cert", "/path/to/client.key")  # Client cert
session.proxies = {
    "http": "http://proxy.example.com:8080",
    "https": "https://proxy.example.com:8080"
}

# Create transport with custom session
transport = RequestsTransport(
    session=session,
    session_owner=False,  # Don't close session automatically
    connection_timeout=60,
    read_timeout=120
)
```

### Stream Response Handling

```python
from azure.core.pipeline.transport import HttpRequest

def download_large_file(transport, url, filename):
    request = HttpRequest("GET", url)

    with transport:
        response = transport.send(request, stream=True)

        if response.status_code == 200:
            with open(filename, "wb") as f:
                for chunk in response.iter_bytes(chunk_size=8192):
                    f.write(chunk)
        else:
            print(f"Download failed: {response.status_code}")
```

### JSON Request and Response

```python
from azure.core.pipeline.transport import HttpRequest, RequestsTransport

def send_json_request(transport, url, data):
    request = HttpRequest("POST", url)
    request.headers["Content-Type"] = "application/json"
    request.set_json_body(data)  # Serializes data as the JSON request body

    with transport:
        response = transport.send(request)

        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Request failed: {response.status_code}")

# Example usage
transport = RequestsTransport()
data = {"name": "example", "value": 42}
result = send_json_request(transport, "https://api.example.com/items", data)
```

### Multipart Request

```python
from azure.core.pipeline.transport import HttpRequest

def upload_file_multipart(transport, url, file_path):
    # Keep the file open until the request has been sent
    with open(file_path, "rb") as f:
        files = {"file": f}

        request = HttpRequest("POST", url, files=files)

        with transport:
            response = transport.send(request)
            return response.status_code == 200
```

### Async Stream Processing

```python
import asyncio
from azure.core.pipeline.transport import AioHttpTransport, HttpRequest

async def async_stream_download(url, filename):
    transport = AioHttpTransport()
    request = HttpRequest("GET", url)

    async with transport:
        # stream=True defers reading the body so it can be consumed in chunks
        response = await transport.send(request, stream=True)

        if response.status_code == 200:
            with open(filename, "wb") as f:
                async for chunk in response.iter_bytes(chunk_size=8192):
                    f.write(chunk)
        else:
            print(f"Download failed: {response.status_code}")

# asyncio.run(async_stream_download("https://example.com/file.zip", "file.zip"))
```

## Transport Selection and Configuration

### Automatic Transport Selection

Azure Core automatically selects an appropriate transport based on the libraries that are available:

```python
from azure.core.pipeline.transport import RequestsTransport
from azure.core import PipelineClient

# Azure Core will automatically choose the best available transport
client = PipelineClient(base_url="https://api.example.com")

# Or explicitly specify transport
transport = RequestsTransport()
client = PipelineClient(base_url="https://api.example.com", transport=transport)
```
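The idea of choosing a transport according to which libraries are installed can be sketched as below. This is an illustration only and does not reproduce Azure Core's internal selection logic. `pick_transport_name` is a hypothetical helper, and the mapping from library to transport name is an assumption based on the transports listed in this document.

```python
import importlib.util
from typing import Optional


def pick_transport_name(prefer_async: bool = False) -> Optional[str]:
    """Hypothetical helper: name a transport whose backing library is importable."""
    if prefer_async:
        candidates = [("aiohttp", "AioHttpTransport")]
    else:
        candidates = [("requests", "RequestsTransport")]

    for module_name, transport_name in candidates:
        # find_spec returns None when a top-level module is not installed
        if importlib.util.find_spec(module_name) is not None:
            return transport_name
    return None


sync_choice = pick_transport_name()
async_choice = pick_transport_name(prefer_async=True)
print(sync_choice, async_choice)
```

Passing `transport=` explicitly, as in the example above, avoids any dependence on what happens to be installed in the environment.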

### Connection Pool Configuration

```python
import requests
from azure.core.pipeline.transport import RequestsTransport

# Configure connection pooling
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=20,  # Number of connection pools
    pool_maxsize=20,  # Maximum connections per pool
    max_retries=3
)
session.mount("http://", adapter)
session.mount("https://", adapter)

transport = RequestsTransport(session=session, session_owner=False)
```

### SSL and Certificate Configuration

```python
import ssl
import aiohttp
from azure.core.pipeline.transport import AioHttpTransport

# Create SSL context
# WARNING: the next two settings disable hostname checking and certificate
# verification. Use them only against trusted test endpoints.
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

# Configure aiohttp with custom SSL
# (aiohttp recommends creating the connector and session inside a coroutine)
connector = aiohttp.TCPConnector(ssl=ssl_context)
session = aiohttp.ClientSession(connector=connector)

transport = AioHttpTransport(session=session, session_owner=False)
```

### Advanced Connection Management

Configuration classes and methods for managing connection lifecycle, pooling, and advanced settings.

```python { .api }
from azure.core.configuration import ConnectionConfiguration
from azure.core.pipeline.transport._bigger_block_size_http_adapters import BiggerBlockSizeHTTPAdapter
from typing import Union, Optional, Any

class ConnectionConfiguration:
    """HTTP transport connection configuration settings."""
    def __init__(
        self,
        *,
        connection_timeout: float = 300,
        read_timeout: float = 300,
        connection_verify: Union[bool, str] = True,
        connection_cert: Optional[str] = None,
        connection_data_block_size: int = 4096,
        **kwargs: Any
    ) -> None: ...

class BiggerBlockSizeHTTPAdapter:
    """Custom HTTP adapter that optimizes block size for better performance."""
    def get_connection(self, url, proxies=None): ...
```

#### Usage Examples

```python
import requests
from requests.adapters import HTTPAdapter
from azure.core.pipeline.transport import RequestsTransport, AioHttpTransport
import aiohttp
import ssl

# Advanced connection pooling configuration
session = requests.Session()
adapter = HTTPAdapter(
    pool_connections=20,  # Number of connection pools
    pool_maxsize=20,  # Max connections per pool
    max_retries=3
)
session.mount("http://", adapter)
session.mount("https://", adapter)

transport = RequestsTransport(session=session, session_owner=False)

# Advanced SSL configuration for aiohttp
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations("/path/to/ca-bundle.pem")
ssl_context.load_cert_chain("/path/to/client.pem", "/path/to/client.key")

connector = aiohttp.TCPConnector(
    ssl=ssl_context,
    limit=100,  # Total connection pool size
    limit_per_host=20,  # Per-host connection limit
    keepalive_timeout=30  # Keep-alive timeout
)
# Distinct names so the requests-based session and transport above are not overwritten
async_session = aiohttp.ClientSession(connector=connector)
async_transport = AioHttpTransport(session=async_session, session_owner=False)

# Session ownership patterns
# Transport owns session (default) - will close session automatically
transport_owned = RequestsTransport(session_owner=True)

# External session management - must close session manually
external_session = requests.Session()
transport_external = RequestsTransport(session=external_session, session_owner=False)
```
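The two session ownership patterns above can be modelled with stand-in classes. This is a sketch of the behaviour described in the comments (an owning transport closes its session, a non-owning one leaves it open), not the library's implementation. `FakeSession` and `OwnershipSketch` are hypothetical names.

```python
class FakeSession:
    """Hypothetical stand-in for a requests or aiohttp session."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class OwnershipSketch:
    """Hypothetical transport that closes its session only when it owns it."""

    def __init__(self, session=None, session_owner=True) -> None:
        self.session = session
        self._session_owner = session_owner

    def open(self) -> None:
        # Create a session lazily when none was supplied and we own its lifetime
        if self.session is None and self._session_owner:
            self.session = FakeSession()

    def close(self) -> None:
        if self._session_owner and self.session is not None:
            self.session.close()
            self.session = None


# Externally managed session: the caller remains responsible for closing it.
shared = FakeSession()
borrowing = OwnershipSketch(session=shared, session_owner=False)
borrowing.open()
borrowing.close()
print(shared.closed)

# Transport-owned session: closed together with the transport.
owning = OwnershipSketch()
owning.open()
owned_session = owning.session
owning.close()
print(owned_session.closed)
```

Setting `session_owner=False` is the natural choice when one session is shared by several transports or clients, since no single transport should end its lifetime.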

### Advanced Timeout Configuration

Extended timeout handling with per-request overrides.

```python
from azure.core.pipeline.transport import RequestsTransport, HttpRequest

transport = RequestsTransport()
request = HttpRequest("GET", "https://api.example.com/data")

# Per-request timeout override
response = transport.send(
    request,
    connection_timeout=30.0,  # Connection establishment timeout
    read_timeout=60.0  # Read timeout
)

# Environment settings control
transport_with_env = RequestsTransport(use_env_settings=True)  # Default
transport_no_env = RequestsTransport(use_env_settings=False)  # Disable env proxy
```
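The precedence implied by a per-request override can be sketched as a simple lookup: a value passed with the request wins, otherwise the transport-level default applies. `resolve_timeouts` is a hypothetical helper written for illustration. The 300-second defaults are the ones shown in the constructor signatures earlier in this document.

```python
from typing import Dict, Tuple

# Defaults taken from the constructor signatures shown earlier (300 seconds each).
TRANSPORT_DEFAULTS: Dict[str, float] = {"connection_timeout": 300, "read_timeout": 300}


def resolve_timeouts(defaults: Dict[str, float], **request_kwargs: float) -> Tuple[float, float]:
    """Hypothetical helper: per-request values win over transport-level defaults."""
    connect = request_kwargs.get("connection_timeout", defaults["connection_timeout"])
    read = request_kwargs.get("read_timeout", defaults["read_timeout"])
    return (connect, read)


print(resolve_timeouts(TRANSPORT_DEFAULTS))
print(resolve_timeouts(TRANSPORT_DEFAULTS, connection_timeout=30.0, read_timeout=60.0))
```

The result is returned as a `(connect, read)` pair because that is the tuple form the `requests` library accepts for its `timeout` argument.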

### Transport-Specific Response Methods

Advanced response handling methods for different transport implementations.

```python { .api }
from azure.core.pipeline.transport import AsyncHttpResponse, HttpResponse
from typing import Iterator, AsyncIterator

class AsyncHttpResponse:
    async def load_body(self) -> None:
        """Load response body into memory for sync access."""

    def parts(self) -> AsyncIterator["AsyncHttpResponse"]:
        """Parse multipart/mixed responses asynchronously."""

class HttpResponse:
    def stream_download(self, pipeline, **kwargs) -> Iterator[bytes]:
        """Stream download with error handling and decompression control."""

    def __getstate__(self):
        """Pickle support - loads body and handles unpicklable objects."""
```

#### Usage Examples

```python
from azure.core.pipeline.transport import AioHttpTransport, HttpRequest

async def handle_multipart_response():
    transport = AioHttpTransport()
    request = HttpRequest("GET", "https://api.example.com/multipart")

    async with transport:
        response = await transport.send(request)

        # Handle multipart response
        async for part in response.parts():
            content = await part.read()
            print(f"Part content: {len(content)} bytes")

# Stream download with progress tracking
def download_with_progress(transport, url, file_path):
    request = HttpRequest("GET", url)

    with transport:
        # stream=True defers reading the body so it can be written chunk by chunk
        response = transport.send(request, stream=True)

        with open(file_path, "wb") as f:
            total_bytes = 0
            for chunk in response.stream_download(pipeline=None):
                f.write(chunk)
                total_bytes += len(chunk)
                print(f"Downloaded: {total_bytes} bytes")
```

## Best Practices

### Transport Management

- Use context managers (`with` statements) for proper resource cleanup
- Share transport instances across multiple requests for connection reuse
- Configure appropriate timeouts based on your service requirements
- Use async transports with async code for optimal performance
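The first two points can be combined: open one transport in a context manager and reuse it for several concurrent requests. The sketch below uses a hypothetical `SketchAsyncTransport` stand-in that performs no network I/O, so only the lifecycle pattern is shown. With a real async transport the `send` call would take an `HttpRequest` rather than a URL string.

```python
import asyncio


class SketchAsyncTransport:
    """Hypothetical stand-in for an async transport; performs no network I/O."""

    def __init__(self) -> None:
        self.open_count = 0
        self.sent = []

    async def __aenter__(self):
        self.open_count += 1
        return self

    async def __aexit__(self, *args) -> None:
        return None

    async def send(self, url: str) -> int:
        await asyncio.sleep(0)  # Yield control, as real I/O would
        self.sent.append(url)
        return 200


async def fetch_all(urls):
    # One transport instance is opened once and shared by all requests
    async with SketchAsyncTransport() as transport:
        statuses = await asyncio.gather(*(transport.send(url) for url in urls))
    return transport, statuses


urls = ["https://api.example.com/a", "https://api.example.com/b", "https://api.example.com/c"]
shared_transport, statuses = asyncio.run(fetch_all(urls))
print(shared_transport.open_count, statuses)
```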

### Connection Configuration

- Set reasonable connection and read timeouts
- Configure connection pooling for high-throughput scenarios
- Use appropriate SSL/TLS settings for security requirements
- Configure proxy settings when running behind corporate firewalls

### Error Handling

- Handle transport-specific exceptions appropriately
- Implement retry logic for transient network errors
- Monitor connection pool exhaustion and adjust sizing accordingly
- Log transport errors with sufficient detail for debugging
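Retry logic for transient errors is often written as a loop with exponential backoff. The following is a generic sketch under stated assumptions: `call_with_retries` is a hypothetical helper, the built-in `ConnectionError` and `TimeoutError` stand in for whatever exceptions your transport raises, and the delays are illustrative. When requests go through an Azure Core pipeline, its retry policy normally covers this, so a helper like this is mainly relevant when calling a transport directly.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    operation: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical helper: retry operation on transient errors with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except retryable:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller
            sleep(base_delay * 2 ** (attempt - 1))  # 0.5s, 1.0s, 2.0s, ...
    raise AssertionError("unreachable")


# Demonstration with an operation that fails twice before succeeding.
attempts = {"count": 0}
delays = []


def flaky_operation() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


# Delays are recorded instead of slept so the demonstration runs instantly.
result = call_with_retries(flaky_operation, sleep=delays.append)
print(result, attempts["count"], delays)
```

Only exception types listed in `retryable` are retried. Anything else propagates immediately, which keeps programming errors from being masked by the retry loop.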

### Performance Optimization

- Use streaming for large payloads to avoid memory issues
- Implement connection pooling and keep-alive for multiple requests
- Choose async transports for I/O bound operations
- Monitor and tune timeout settings based on service characteristics