docs
0
# Transport and Networking
1
2
Azure Core provides HTTP transport abstractions that support multiple async frameworks and networking libraries. The transport layer handles the actual HTTP communication while providing a consistent interface for all Azure SDK clients.
3
4
## Base Transport Interfaces
5
6
Transport classes define the interface for sending HTTP requests and receiving responses.
7
8
```python { .api }
9
from azure.core.pipeline.transport import HttpTransport, AsyncHttpTransport, HttpRequest, HttpResponse, AsyncHttpResponse
10
from abc import ABC, abstractmethod
11
12
class HttpTransport(ABC):
13
"""Base synchronous HTTP transport interface."""
14
@abstractmethod
15
def send(self, request: HttpRequest, **kwargs) -> HttpResponse:
16
"""Send HTTP request and return response."""
17
...
18
19
@abstractmethod
20
def open(self) -> None:
21
"""Open transport connection."""
22
...
23
24
@abstractmethod
25
def close(self) -> None:
26
"""Close transport connection."""
27
...
28
29
def __enter__(self):
30
self.open()
31
return self
32
33
def __exit__(self, *args):
34
self.close()
35
36
class AsyncHttpTransport(ABC):
37
"""Base asynchronous HTTP transport interface."""
38
@abstractmethod
39
async def send(self, request: HttpRequest, **kwargs) -> AsyncHttpResponse:
40
"""Send HTTP request and return response."""
41
...
42
43
@abstractmethod
44
async def open(self) -> None:
45
"""Open transport connection."""
46
...
47
48
@abstractmethod
49
async def close(self) -> None:
50
"""Close transport connection."""
51
...
52
53
async def __aenter__(self):
54
await self.open()
55
return self
56
57
async def __aexit__(self, *args):
58
await self.close()
59
```
60
61
## HTTP Request and Response
62
63
Core classes for representing HTTP requests and responses.
64
65
```python { .api }
66
class HttpRequest:
67
"""HTTP request representation."""
68
def __init__(
69
self,
70
method: str,
71
url: str,
72
headers: Optional[Dict[str, str]] = None,
73
files: Optional[Dict] = None,
74
data: Optional[Union[bytes, str, Dict]] = None,
75
**kwargs
76
): ...
77
78
@property
79
def method(self) -> str: ...
80
81
@property
82
def url(self) -> str: ...
83
84
@property
85
def headers(self) -> Dict[str, str]: ...
86
87
@property
88
def body(self) -> Optional[bytes]: ...
89
90
def set_json_body(self, data: Any) -> None:
91
"""Set request body as JSON."""
92
...
93
94
def set_multipart_mixed(self, *requests: "HttpRequest") -> None:
95
"""Set multipart/mixed body."""
96
...
97
98
class HttpResponse:
99
"""HTTP response representation."""
100
def __init__(self, request: HttpRequest, internal_response, **kwargs): ...
101
102
@property
103
def request(self) -> HttpRequest: ...
104
105
@property
106
def status_code(self) -> int: ...
107
108
@property
109
def headers(self) -> Dict[str, str]: ...
110
111
@property
112
def reason(self) -> str: ...
113
114
@property
115
def content_type(self) -> Optional[str]: ...
116
117
@property
118
def text(self) -> str: ...
119
120
@property
121
def content(self) -> bytes: ...
122
123
def json(self) -> Any:
124
"""Parse response content as JSON."""
125
...
126
127
def iter_bytes(self, chunk_size: int = 1024) -> Iterator[bytes]:
128
"""Iterate response content in chunks."""
129
...
130
131
def iter_raw(self, chunk_size: int = 1024) -> Iterator[bytes]:
132
"""Iterate raw response content."""
133
...
134
135
class AsyncHttpResponse:
136
"""Asynchronous HTTP response representation."""
137
# Same properties as HttpResponse
138
139
async def json(self) -> Any:
140
"""Parse response content as JSON asynchronously."""
141
...
142
143
async def text(self) -> str:
144
"""Get response content as text asynchronously."""
145
...
146
147
async def read(self) -> bytes:
148
"""Read response content asynchronously."""
149
...
150
151
def iter_bytes(self, chunk_size: int = 1024) -> AsyncIterator[bytes]:
152
"""Async iterate response content in chunks."""
153
...
154
```
155
156
## Capabilities
157
158
### Requests Transport
159
160
HTTP transport implementation using the popular `requests` library.
161
162
```python { .api }
163
from azure.core.pipeline.transport import RequestsTransport, RequestsTransportResponse
164
165
class RequestsTransport(HttpTransport):
166
"""HTTP transport using the requests library."""
167
def __init__(
168
self,
169
session: Optional[requests.Session] = None,
170
session_owner: bool = True,
171
connection_timeout: float = 300,
172
read_timeout: float = 300,
173
**kwargs
174
): ...
175
176
def send(self, request: HttpRequest, **kwargs) -> RequestsTransportResponse: ...
177
178
class RequestsTransportResponse(HttpResponse):
179
"""Response implementation for requests transport."""
180
pass
181
```
182
183
### Asyncio Requests Transport
184
185
Async HTTP transport using asyncio with requests-like interface.
186
187
```python { .api }
188
from azure.core.pipeline.transport import AsyncioRequestsTransport, AsyncioRequestsTransportResponse
189
190
class AsyncioRequestsTransport(AsyncHttpTransport):
191
"""Async HTTP transport using asyncio."""
192
def __init__(
193
self,
194
session: Optional[aiohttp.ClientSession] = None,
195
session_owner: bool = True,
196
**kwargs
197
): ...
198
199
async def send(self, request: HttpRequest, **kwargs) -> AsyncioRequestsTransportResponse: ...
200
201
class AsyncioRequestsTransportResponse(AsyncHttpResponse):
202
"""Async response implementation for asyncio transport."""
203
pass
204
```
205
206
### AioHTTP Transport
207
208
HTTP transport implementation using the aiohttp library for async operations.
209
210
```python { .api }
211
from azure.core.pipeline.transport import AioHttpTransport, AioHttpTransportResponse
212
213
class AioHttpTransport(AsyncHttpTransport):
214
"""HTTP transport using aiohttp library."""
215
def __init__(
216
self,
217
session: Optional[aiohttp.ClientSession] = None,
218
session_owner: bool = True,
219
**kwargs
220
): ...
221
222
async def send(self, request: HttpRequest, **kwargs) -> AioHttpTransportResponse: ...
223
224
class AioHttpTransportResponse(AsyncHttpResponse):
225
"""Response implementation for aiohttp transport."""
226
pass
227
```
228
229
### Trio Transport
230
231
HTTP transport for the Trio async framework.
232
233
```python { .api }
234
from azure.core.pipeline.transport import TrioRequestsTransport, TrioRequestsTransportResponse
235
236
class TrioRequestsTransport(AsyncHttpTransport):
237
"""HTTP transport for Trio async framework."""
238
def __init__(self, **kwargs): ...
239
240
async def send(self, request: HttpRequest, **kwargs) -> TrioRequestsTransportResponse: ...
241
242
class TrioRequestsTransportResponse(AsyncHttpResponse):
243
"""Response implementation for Trio transport."""
244
pass
245
```
246
247
## Usage Examples
248
249
### Basic Transport Usage
250
251
```python
252
from azure.core.pipeline.transport import RequestsTransport, HttpRequest
253
254
# Create transport
255
transport = RequestsTransport(
256
connection_timeout=30,
257
read_timeout=60
258
)
259
260
# Create request
261
request = HttpRequest("GET", "https://api.example.com/data")
262
request.headers["Accept"] = "application/json"
263
264
# Send request
265
with transport:
266
response = transport.send(request)
267
print(f"Status: {response.status_code}")
268
print(f"Content: {response.text}")
269
```
270
271
### Async Transport Usage
272
273
```python
274
import asyncio
275
from azure.core.pipeline.transport import AioHttpTransport, HttpRequest
276
277
async def async_transport_example():
278
# Create async transport
279
transport = AioHttpTransport()
280
281
# Create request
282
request = HttpRequest("GET", "https://api.example.com/data")
283
284
# Send request
285
async with transport:
286
response = await transport.send(request)
287
content = await response.text()
288
print(f"Status: {response.status_code}")
289
print(f"Content: {content}")
290
291
# asyncio.run(async_transport_example())
292
```
293
294
### Custom Transport Configuration
295
296
```python
297
import requests
298
from azure.core.pipeline.transport import RequestsTransport
299
300
# Create custom session with specific configuration
301
session = requests.Session()
302
session.verify = "/path/to/ca-bundle.pem" # Custom CA bundle
303
session.cert = ("/path/to/client.cert", "/path/to/client.key") # Client cert
304
session.proxies = {
305
"http": "http://proxy.example.com:8080",
306
"https": "https://proxy.example.com:8080"
307
}
308
309
# Create transport with custom session
310
transport = RequestsTransport(
311
session=session,
312
session_owner=False, # Don't close session automatically
313
connection_timeout=60,
314
read_timeout=120
315
)
316
```
317
318
### Stream Response Handling
319
320
```python
321
from azure.core.pipeline.transport import HttpRequest
322
323
def download_large_file(transport, url, filename):
324
request = HttpRequest("GET", url)
325
326
with transport:
327
response = transport.send(request, stream=True)
328
329
if response.status_code == 200:
330
with open(filename, "wb") as f:
331
for chunk in response.iter_bytes(chunk_size=8192):
332
f.write(chunk)
333
else:
334
print(f"Download failed: {response.status_code}")
335
```
336
337
### JSON Request and Response
338
339
```python
340
import json
341
from azure.core.pipeline.transport import HttpRequest
342
343
def send_json_request(transport, url, data):
344
request = HttpRequest("POST", url)
345
request.headers["Content-Type"] = "application/json"
346
request.set_json_body(data)
347
348
with transport:
349
response = transport.send(request)
350
351
if response.status_code == 200:
352
return response.json()
353
else:
354
raise Exception(f"Request failed: {response.status_code}")
355
356
# Example usage
357
data = {"name": "example", "value": 42}
358
result = send_json_request(transport, "https://api.example.com/items", data)
359
```
360
361
### Multipart Request
362
363
```python
364
from azure.core.pipeline.transport import HttpRequest
365
366
def upload_file_multipart(transport, url, file_path):
367
with open(file_path, "rb") as f:
368
files = {"file": f}
369
370
request = HttpRequest("POST", url, files=files)
371
372
with transport:
373
response = transport.send(request)
374
return response.status_code == 200
375
```
376
377
### Async Stream Processing
378
379
```python
380
import asyncio
381
from azure.core.pipeline.transport import AioHttpTransport, HttpRequest
382
383
async def async_stream_download(url, filename):
384
transport = AioHttpTransport()
385
request = HttpRequest("GET", url)
386
387
async with transport:
388
response = await transport.send(request)
389
390
if response.status_code == 200:
391
with open(filename, "wb") as f:
392
async for chunk in response.iter_bytes(chunk_size=8192):
393
f.write(chunk)
394
else:
395
print(f"Download failed: {response.status_code}")
396
397
# asyncio.run(async_stream_download("https://example.com/file.zip", "file.zip"))
398
```
399
400
## Transport Selection and Configuration
401
402
### Automatic Transport Selection
403
404
Azure Core automatically selects appropriate transport based on available libraries:
405
406
```python
407
from azure.core.pipeline.transport import RequestsTransport
408
from azure.core import PipelineClient
409
410
# Azure Core will automatically choose the best available transport
411
client = PipelineClient(base_url="https://api.example.com")
412
413
# Or explicitly specify transport
414
transport = RequestsTransport()
415
client = PipelineClient(base_url="https://api.example.com", transport=transport)
416
```
417
418
### Connection Pool Configuration
419
420
```python
421
import requests
422
from azure.core.pipeline.transport import RequestsTransport
423
424
# Configure connection pooling
425
session = requests.Session()
426
adapter = requests.adapters.HTTPAdapter(
427
pool_connections=20, # Number of connection pools
428
pool_maxsize=20, # Maximum connections per pool
429
max_retries=3
430
)
431
session.mount("http://", adapter)
432
session.mount("https://", adapter)
433
434
transport = RequestsTransport(session=session, session_owner=False)
435
```
436
437
### SSL and Certificate Configuration
438
439
```python
440
import ssl
441
import aiohttp
442
from azure.core.pipeline.transport import AioHttpTransport
443
444
# Create SSL context
445
ssl_context = ssl.create_default_context()
446
ssl_context.check_hostname = False
447
ssl_context.verify_mode = ssl.CERT_NONE
448
449
# Configure aiohttp with custom SSL
450
connector = aiohttp.TCPConnector(ssl=ssl_context)
451
session = aiohttp.ClientSession(connector=connector)
452
453
transport = AioHttpTransport(session=session, session_owner=False)
454
```
455
456
### Advanced Connection Management
457
458
Configuration classes and methods for managing connection lifecycle, pooling, and advanced settings.
459
460
```python { .api }
461
from azure.core.configuration import ConnectionConfiguration
462
from azure.core.pipeline.transport._bigger_block_size_http_adapters import BiggerBlockSizeHTTPAdapter
463
from typing import Union, Optional, Any
464
465
class ConnectionConfiguration:
466
"""HTTP transport connection configuration settings."""
467
def __init__(
468
self,
469
*,
470
connection_timeout: float = 300,
471
read_timeout: float = 300,
472
connection_verify: Union[bool, str] = True,
473
connection_cert: Optional[str] = None,
474
connection_data_block_size: int = 4096,
475
**kwargs: Any
476
) -> None: ...
477
478
class BiggerBlockSizeHTTPAdapter:
479
"""Custom HTTP adapter that optimizes block size for better performance."""
480
def get_connection(self, url, proxies=None): ...
481
```
482
483
#### Usage Examples
484
485
```python
486
import requests
487
from requests.adapters import HTTPAdapter
488
from azure.core.pipeline.transport import RequestsTransport, AioHttpTransport
489
import aiohttp
490
import ssl
491
492
# Advanced connection pooling configuration
493
session = requests.Session()
494
adapter = HTTPAdapter(
495
pool_connections=20, # Number of connection pools
496
pool_maxsize=20, # Max connections per pool
497
max_retries=3
498
)
499
session.mount("http://", adapter)
500
session.mount("https://", adapter)
501
502
transport = RequestsTransport(session=session, session_owner=False)
503
504
# Advanced SSL configuration for aiohttp
505
ssl_context = ssl.create_default_context()
506
ssl_context.load_verify_locations("/path/to/ca-bundle.pem")
507
ssl_context.load_cert_chain("/path/to/client.pem", "/path/to/client.key")
508
509
connector = aiohttp.TCPConnector(
510
ssl=ssl_context,
511
limit=100, # Total connection pool size
512
limit_per_host=20, # Per-host connection limit
513
keepalive_timeout=30 # Keep-alive timeout
514
)
515
session = aiohttp.ClientSession(connector=connector)
516
transport = AioHttpTransport(session=session, session_owner=False)
517
518
# Session ownership patterns
519
# Transport owns session (default) - will close session automatically
520
transport_owned = RequestsTransport(session_owner=True)
521
522
# External session management - must close session manually
523
external_session = requests.Session()
524
transport_external = RequestsTransport(session=external_session, session_owner=False)
525
```
526
527
### Advanced Timeout Configuration
528
529
Extended timeout handling with per-request overrides.
530
531
```python
532
from azure.core.pipeline.transport import RequestsTransport, HttpRequest
533
534
transport = RequestsTransport()
535
request = HttpRequest("GET", "https://api.example.com/data")
536
537
# Per-request timeout override
538
response = transport.send(
539
request,
540
connection_timeout=30.0, # Connection establishment timeout
541
read_timeout=60.0 # Read timeout
542
)
543
544
# Environment settings control
545
transport_with_env = RequestsTransport(use_env_settings=True) # Default
546
transport_no_env = RequestsTransport(use_env_settings=False) # Disable env proxy
547
```
548
549
### Transport-Specific Response Methods
550
551
Advanced response handling methods for different transport implementations.
552
553
```python { .api }
554
from azure.core.pipeline.transport import AsyncHttpResponse, HttpResponse
555
from typing import Iterator, AsyncIterator
556
557
class AsyncHttpResponse:
558
async def load_body(self) -> None:
559
"""Load response body into memory for sync access."""
560
561
def parts(self) -> AsyncIterator["AsyncHttpResponse"]:
562
"""Parse multipart/mixed responses asynchronously."""
563
564
class HttpResponse:
565
def stream_download(self, pipeline, **kwargs) -> Iterator[bytes]:
566
"""Stream download with error handling and decompression control."""
567
568
def __getstate__(self):
569
"""Pickle support - loads body and handles unpicklable objects."""
570
```
571
572
#### Usage Examples
573
574
```python
575
from azure.core.pipeline.transport import AioHttpTransport, HttpRequest
576
577
async def handle_multipart_response():
578
transport = AioHttpTransport()
579
request = HttpRequest("GET", "https://api.example.com/multipart")
580
581
async with transport:
582
response = await transport.send(request)
583
584
# Handle multipart response
585
async for part in response.parts():
586
content = await part.read()
587
print(f"Part content: {len(content)} bytes")
588
589
# Stream download with progress tracking
590
def download_with_progress(transport, url, file_path):
591
request = HttpRequest("GET", url)
592
593
with transport:
594
response = transport.send(request)
595
596
with open(file_path, "wb") as f:
597
total_bytes = 0
598
for chunk in response.stream_download(pipeline=None):
599
f.write(chunk)
600
total_bytes += len(chunk)
601
print(f"Downloaded: {total_bytes} bytes")
602
```
603
604
## Best Practices
605
606
### Transport Management
607
608
- Use context managers (`with` statements) for proper resource cleanup
609
- Share transport instances across multiple requests for connection reuse
610
- Configure appropriate timeouts based on your service requirements
611
- Use async transports with async code for optimal performance
612
613
### Connection Configuration
614
615
- Set reasonable connection and read timeouts
616
- Configure connection pooling for high-throughput scenarios
617
- Use appropriate SSL/TLS settings for security requirements
618
- Configure proxy settings when running behind corporate firewalls
619
620
### Error Handling
621
622
- Handle transport-specific exceptions appropriately
623
- Implement retry logic for transient network errors
624
- Monitor connection pool exhaustion and adjust sizing accordingly
625
- Log transport errors with sufficient detail for debugging
626
627
### Performance Optimization
628
629
- Use streaming for large payloads to avoid memory issues
630
- Implement connection pooling and keep-alive for multiple requests
631
- Choose async transports for I/O bound operations
632
- Monitor and tune timeout settings based on service characteristics