0
# HTTP Adapters
1
2
Specialized HTTPAdapter implementations that modify connection behavior for SSL protocol selection, source address binding, certificate verification, fingerprint validation, and socket configuration.
3
4
## Capabilities
5
6
### SSL Protocol Adapter
7
8
Select specific SSL/TLS protocol versions for connections.
9
10
```python { .api }
11
class SSLAdapter:
12
"""
13
HTTPAdapter for selecting SSL/TLS protocol version.
14
15
Parameters:
16
- ssl_version: SSL protocol version (ssl.PROTOCOL_* constants, optional)
17
"""
18
def __init__(self, ssl_version=None, **kwargs): ...
19
```
20
21
#### Usage Examples
22
23
```python
24
import requests
25
import ssl
26
from requests_toolbelt import SSLAdapter
27
28
session = requests.Session()
29
30
# Force TLS v1.2
31
session.mount('https://', SSLAdapter(ssl.PROTOCOL_TLSv1_2))
32
33
response = session.get('https://api.example.com/data')
34
35
# Use with specific endpoint
36
session.mount('https://secure-api.com', SSLAdapter(ssl.PROTOCOL_TLSv1_2))
37
```
38
39
### Source Address Adapter
40
41
Bind connections to a specific source IP address.
42
43
```python { .api }
44
class SourceAddressAdapter:
45
"""
46
HTTPAdapter for binding to specific source address.
47
48
Parameters:
49
- source_address: tuple of (ip_address, port) or just ip_address string
50
"""
51
def __init__(self, source_address): ...
52
```
53
54
#### Usage Examples
55
56
```python
57
import requests
58
from requests_toolbelt import SourceAddressAdapter
59
60
session = requests.Session()
61
62
# Bind to specific IP (port chosen automatically)
63
session.mount('http://', SourceAddressAdapter('192.168.1.100'))
64
session.mount('https://', SourceAddressAdapter('192.168.1.100'))
65
66
# Bind using an explicit (ip, port) tuple; port 0 lets the OS choose the source port
67
session.mount('http://', SourceAddressAdapter(('192.168.1.100', 0)))
68
69
response = session.get('https://httpbin.org/ip')
70
```
71
72
### X.509 Client Certificate Adapter
73
74
Use client certificates for authentication with automatic certificate validation.
75
76
```python { .api }
77
class X509Adapter:
78
"""
79
HTTPAdapter for X.509 client certificate authentication.
80
81
Parameters:
82
- cert_bytes: bytes, certificate data (keyword argument)
83
- pk_bytes: bytes, private key data (keyword argument)
84
- password: bytes or str, private key password (optional)
85
- encoding: encoding format, default PEM encoding
86
"""
87
def __init__(self, **kwargs): ...
88
89
def check_cert_dates(cert):
90
"""
91
Verify that the supplied client certificate is currently within its validity period.
92
93
Parameters:
94
- cert: cryptography.x509.Certificate object
95
96
Raises:
97
ValueError: If the current time is outside the certificate's validity period (before not_valid_before or after not_valid_after)
98
"""
99
100
def create_ssl_context(cert_byes, pk_bytes, password=None, encoding=Encoding.PEM):
101
"""
102
Create an SSL Context with the supplied cert/password.
103
104
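Note: The first parameter is spelled 'cert_byes' (sic) in the library itself; the misspelling of 'cert_bytes' is preserved here so the name matches the real signature when passed by keyword.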
105
106
Parameters:
107
- cert_byes: bytes, certificate data encoded using specified encoding
108
- pk_bytes: bytes, private key data
109
- password: bytes, private key passphrase (optional)
110
- encoding: cryptography.hazmat.primitives.serialization.Encoding (default: Encoding.PEM)
111
112
Returns:
113
PyOpenSSLContext: SSL context object
114
115
Raises:
116
ValueError: If encoding is invalid or cert/key cannot be parsed
117
"""
118
```
119
120
#### Usage Examples
121
122
```python
123
import requests
124
from requests_toolbelt.adapters.x509 import X509Adapter
125
126
# Load certificate and key
127
with open('client.crt', 'rb') as f:
128
cert_data = f.read()
129
130
with open('client.key', 'rb') as f:
131
key_data = f.read()
132
133
session = requests.Session()
134
session.mount('https://', X509Adapter(cert_bytes=cert_data, pk_bytes=key_data))
135
136
response = session.get('https://secure-api.example.com/data')
137
138
# With password-protected key
139
adapter = X509Adapter(cert_bytes=cert_data, pk_bytes=key_data, password='keypassword')
140
session.mount('https://', adapter)
141
```
142
143
### SSL Fingerprint Verification
144
145
Verify SSL certificates by fingerprint to prevent man-in-the-middle attacks.
146
147
```python { .api }
148
class FingerprintAdapter:
149
"""
150
HTTPAdapter for SSL certificate fingerprint verification.
151
152
Parameters:
153
- fingerprint: str, expected certificate fingerprint (hex)
154
"""
155
def __init__(self, fingerprint): ...
156
```
157
158
#### Usage Examples
159
160
```python
161
import requests
162
from requests_toolbelt.adapters.fingerprint import FingerprintAdapter
163
164
# SHA-256 fingerprint verification
165
fingerprint = 'AA:BB:CC:DD:EE:FF:00:11:22:33:44:55:66:77:88:99:AA:BB:CC:DD:EE:FF:00:11:22:33:44:55:66:77:88:99'
166
167
session = requests.Session()
168
session.mount('https://api.example.com', FingerprintAdapter(fingerprint))
169
170
response = session.get('https://api.example.com/data')
171
```
172
173
### Host Header SSL Adapter
174
175
Verify the server certificate against the hostname given in the request's Host header, so that HTTPS requests can target an IP address directly.
176
177
```python { .api }
178
class HostHeaderSSLAdapter(HTTPAdapter):
179
"""
180
An HTTPS adapter that sets the hostname for certificate verification based on the Host header.
181
182
This allows requesting an IP address directly via HTTPS without getting
183
a "hostname doesn't match" exception by reading the Host header from the request.
184
185
Parameters:
186
- **kwargs: HTTPAdapter parameters (no special host parameter needed)
187
"""
188
def __init__(self, **kwargs): ...
189
190
def send(self, request, **kwargs):
191
"""Send request with Host header-based hostname verification."""
192
```
193
194
#### Usage Examples
195
196
```python
197
import requests
198
from requests_toolbelt.adapters.host_header_ssl import HostHeaderSSLAdapter
199
200
session = requests.Session()
201
session.mount('https://', HostHeaderSSLAdapter())
202
203
# Request IP address directly with Host header for certificate verification
204
response = session.get(
205
"https://93.184.216.34",
206
headers={"Host": "example.org"}
207
)
208
209
# Works with any IP/hostname mismatch scenario
210
response = session.get(
211
"https://127.0.0.1:8443",
212
headers={"Host": "localhost.example.com"}
213
)
214
```
215
216
### Socket Options Adapter
217
218
Configure socket-level options for fine-grained connection control.
219
220
```python { .api }
221
class SocketOptionsAdapter:
222
"""
223
HTTPAdapter for configuring socket options.
224
225
Parameters:
226
- socket_options: list of (level, optname, value) tuples (pass as a keyword argument)
227
"""
228
def __init__(self, socket_options): ...
229
230
class TCPKeepAliveAdapter:
231
"""
232
HTTPAdapter that enables TCP keep-alive.
233
234
Parameters:
235
- idle: int, seconds before sending keep-alive probes (default: 60)
236
- interval: int, interval between keep-alive probes (default: 20)
237
- count: int, number of failed probes before declaring dead (default: 5)
238
"""
239
def __init__(self, **kwargs): ...
240
```
241
242
#### Usage Examples
243
244
```python
245
import requests
246
import socket
247
from requests_toolbelt.adapters.socket_options import SocketOptionsAdapter, TCPKeepAliveAdapter
248
249
# Custom socket options
250
socket_options = [
251
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
252
(socket.SOL_TCP, socket.TCP_KEEPIDLE, 600),
253
]
254
255
session = requests.Session()
256
session.mount('http://', SocketOptionsAdapter(socket_options=socket_options))
257
session.mount('https://', SocketOptionsAdapter(socket_options=socket_options))
258
259
# TCP keep-alive with defaults
260
session = requests.Session()
261
adapter = TCPKeepAliveAdapter()
262
session.mount('http://', adapter)
263
session.mount('https://', adapter)
264
265
# Custom keep-alive settings
266
adapter = TCPKeepAliveAdapter(idle=300, interval=30, count=3)
267
session.mount('https://long-running-api.com', adapter)
268
269
response = session.get('https://long-running-api.com/stream')
270
```
271
272
### Adapter Usage Patterns
273
274
```python
275
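import ssl
import requests
from requests_toolbelt import SSLAdapter, SourceAddressAdapter
from requests_toolbelt.adapters.x509 import X509Adapter
from requests_toolbelt.adapters.fingerprint import FingerprintAdapter

# Multiple adapters for different endpoints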
276
session = requests.Session()
277
278
# Different SSL versions for different sites
279
session.mount('https://old-api.com', SSLAdapter(ssl.PROTOCOL_TLSv1))
280
session.mount('https://new-api.com', SSLAdapter(ssl.PROTOCOL_TLSv1_2))
281
282
# Source address binding for specific routes
283
session.mount('https://internal-api.company.com', SourceAddressAdapter('10.0.1.100'))
284
285
# Client certificates for authentication
286
with open('client.crt', 'rb') as f:
287
cert_data = f.read()
288
with open('client.key', 'rb') as f:
289
key_data = f.read()
290
291
session.mount('https://secure-api.company.com', X509Adapter(cert_bytes=cert_data, pk_bytes=key_data))
292
293
# Fingerprint verification for critical endpoints
294
fingerprint = 'AA:BB:CC:DD:EE:FF:00:11:22:33:44:55:66:77:88:99:AA:BB:CC:DD:EE:FF:00:11:22:33:44:55:66:77:88:99'
295
session.mount('https://critical-api.company.com', FingerprintAdapter(fingerprint))
296
```