# Pipeline System

Configurable HTTP request/response pipeline with a policy-based architecture. It supports custom middleware, authentication injection, logging, and retry logic by passing each request and response through a chain of policies.

## Capabilities

### Pipeline

Main pipeline class that orchestrates request/response processing through policies.

```python { .api }
class Pipeline:
    def __init__(self, policies=None, sender=None):
        """
        Initialize HTTP pipeline.

        Parameters:
        - policies: List of HTTPPolicy or SansIOHTTPPolicy objects
        - sender: HTTPSender for executing requests (defaults to RequestsHTTPSender)
        """

    def run(self, request, **kwargs):
        """
        Execute request through pipeline.

        Parameters:
        - request: HTTP request object
        - kwargs: Additional configuration options

        Returns:
        Response object from pipeline execution
        """

    def __enter__(self):
        """Enter context manager."""

    def __exit__(self, *exc_details):
        """Exit context manager."""
```
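
The chaining idea can be illustrated with a toy model. The sketch below does not import msrest: `ToyPolicy`, `ToySender`, and `build_chain` are hypothetical names, and the linking logic is an assumption based on the `next` attribute documented for `HTTPPolicy`, not a copy of the library's implementation. Each policy calls `self.next.send(...)`, so request-side work runs in list order and response-side work runs in reverse.

```python
class ToySender:
    """Stand-in for an HTTP sender: the last node in the chain."""

    def send(self, request, **kwargs):
        request["trace"].append("sender")
        return {"status": 200, "trace": request["trace"]}


class ToyPolicy:
    """Stand-in for a policy that acts before and after the next node."""

    def __init__(self, name):
        self.name = name
        self.next = None  # filled in by build_chain

    def send(self, request, **kwargs):
        request["trace"].append(f"{self.name}:request")
        response = self.next.send(request, **kwargs)
        response["trace"].append(f"{self.name}:response")
        return response


def build_chain(policies, sender):
    """Link each node to its successor; the last policy points at the sender."""
    nodes = list(policies) + [sender]
    for current, following in zip(nodes, nodes[1:]):
        current.next = following
    return nodes[0]


first = build_chain([ToyPolicy("auth"), ToyPolicy("retry")], ToySender())
result = first.send({"trace": []})
print(result["trace"])
```

In this toy, the first policy in the list sees the request first and the response last, which is why ordering matters when policies are combined.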

### HTTP Policies

Policy interfaces for processing requests and responses.

```python { .api }
class HTTPPolicy:
    def __init__(self):
        """Initialize HTTP policy."""
        self.next = None  # Next policy in chain

    def send(self, request, **kwargs):
        """
        Process request and call next policy.

        Parameters:
        - request: Request object to process
        - kwargs: Additional configuration

        Returns:
        Response object
        """

class SansIOHTTPPolicy:
    """
    Sans I/O policy for request/response processing.
    Can act before and after I/O without being tied to specific HTTP implementation.
    """

    def on_request(self, request, **kwargs):
        """
        Process request before sending.

        Parameters:
        - request: Request to process
        - kwargs: Additional options
        """

    def on_response(self, request, response, **kwargs):
        """
        Process response after receiving.

        Parameters:
        - request: Original request
        - response: Received response
        - kwargs: Additional options
        """

    def on_exception(self, request, **kwargs) -> bool:
        """
        Handle exceptions during request processing.

        Parameters:
        - request: Request that caused exception
        - kwargs: Additional context

        Returns:
        True if exception was handled, False to re-raise
        """
```
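
To show how hook-style policies can coexist with `send`-style policies, here is a minimal sketch of an adapter that calls the three hooks around the next node. It is self-contained and does not use msrest; `ToySansIOPolicy`, `ToyRunner`, and `EchoSender` are hypothetical, and returning `None` after a handled exception is an assumption made for this example only.

```python
class ToySansIOPolicy:
    """Hypothetical hook-only policy; it never performs I/O itself."""

    def on_request(self, request, **kwargs):
        request["headers"]["X-Demo"] = "1"

    def on_response(self, request, response, **kwargs):
        response["seen_by_policy"] = True

    def on_exception(self, request, **kwargs):
        return False  # not handled, so the runner re-raises


class ToyRunner:
    """Adapts a hook-style policy to the send() interface."""

    def __init__(self, policy, next_node):
        self.policy = policy
        self.next = next_node

    def send(self, request, **kwargs):
        self.policy.on_request(request, **kwargs)
        try:
            response = self.next.send(request, **kwargs)
        except Exception:
            if not self.policy.on_exception(request, **kwargs):
                raise
            return None  # assumption: a handled exception yields no response
        self.policy.on_response(request, response, **kwargs)
        return response


class EchoSender:
    """Stand-in sender that echoes the request headers back."""

    def send(self, request, **kwargs):
        return {"status": 200, "echo": dict(request["headers"])}


runner = ToyRunner(ToySansIOPolicy(), EchoSender())
response = runner.send({"headers": {}})
print(response)
```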

### HTTP Sender

Abstract base for HTTP request execution.

```python { .api }
class HTTPSender:
    def send(self, request, **config):
        """
        Send HTTP request.

        Parameters:
        - request: Request object to send
        - config: Configuration overrides

        Returns:
        Response object
        """

    def build_context(self):
        """
        Build context object for pipeline.

        Returns:
        Context object (implementation specific)
        """

    def __enter__(self):
        """Enter context manager."""

    def __exit__(self, *exc_details):
        """Exit context manager."""
```
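
A sender with this shape is convenient to fake in tests. The sketch below is a hypothetical in-memory sender that exposes the same four methods but deliberately does not subclass `HTTPSender`, so it runs without msrest installed. The name `RecordingSender` and the dictionary it returns are assumptions for illustration.

```python
class RecordingSender:
    """Hypothetical in-memory sender that records calls instead of doing I/O."""

    def __init__(self, canned_status=200):
        self.canned_status = canned_status
        self.sent = []        # (request, config) tuples, in call order
        self.is_open = False  # tracks context-manager state

    def __enter__(self):
        self.is_open = True
        return self

    def __exit__(self, *exc_details):
        self.is_open = False

    def build_context(self):
        # A plain dict stands in for the implementation-specific context.
        return {}

    def send(self, request, **config):
        self.sent.append((request, config))
        return {"status_code": self.canned_status}


with RecordingSender(canned_status=204) as sender:
    context = sender.build_context()
    reply = sender.send({"url": "https://api.example.com/ping"}, timeout=5)

print(reply, sender.sent)
```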
132
133
### Request and Response Wrappers
134
135
Pipeline-specific request and response containers.
136
137
```python { .api }
138
class Request:
139
def __init__(self, http_request, context=None):
140
"""
141
Pipeline request wrapper.
142
143
Parameters:
144
- http_request: Underlying HTTP request object
145
- context: Pipeline context data
146
"""
147
148
http_request: any # Underlying HTTP request
149
context: any # Pipeline context
150
151
class Response:
152
def __init__(self, request, http_response, context=None):
153
"""
154
Pipeline response wrapper.
155
156
Parameters:
157
- request: Original Request object
158
- http_response: Underlying HTTP response
159
- context: Pipeline context
160
"""
161
162
request: Request # Original request
163
http_response: any # Underlying HTTP response
164
context: dict # Pipeline context dictionary
165
```
166
167
### Built-in Policies
168
169
Common policies provided by msrest.
170
171
```python { .api }
172
class UserAgentPolicy:
173
"""Policy for managing User-Agent header."""
174
175
def __init__(self, user_agent=None):
176
"""
177
Initialize User-Agent policy.
178
179
Parameters:
180
- user_agent: Custom user agent string
181
"""
182
183
user_agent: str
184
185
def add_user_agent(self, value: str):
186
"""Add value to user agent string."""
187
188
class HTTPLogger:
189
"""Policy for logging HTTP requests and responses."""
190
191
enable_http_logger: bool = True
192
193
def __init__(self, enable_http_logger=True):
194
"""
195
Initialize HTTP logger policy.
196
197
Parameters:
198
- enable_http_logger: Enable/disable logging
199
"""
200
201
class RawDeserializer:
202
"""Policy for deserializing raw HTTP responses."""
203
204
CONTEXT_NAME: str = "deserialized_data"
205
206
@staticmethod
207
def deserialize_from_text(data, content_type=None):
208
"""
209
Deserialize text data.
210
211
Parameters:
212
- data: Text data to deserialize
213
- content_type: Content type hint
214
215
Returns:
216
Deserialized data
217
"""
218
219
@staticmethod
220
def deserialize_from_http_generics(text, headers):
221
"""
222
Deserialize from HTTP response components.
223
224
Parameters:
225
- text: Response text
226
- headers: Response headers
227
228
Returns:
229
Deserialized data
230
"""
231
```
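
As a rough illustration of what deserializing text with a content-type hint can mean, the sketch below parses JSON when the media type says so and otherwise returns the text unchanged. This is not msrest's implementation: `toy_deserialize_from_text` is a hypothetical helper, and the set of media types it recognizes is an assumption.

```python
import json


def toy_deserialize_from_text(data, content_type=None):
    """Parse JSON bodies; return any other text unchanged (toy behavior)."""
    if content_type is None:
        return data
    # Drop parameters such as "; charset=utf-8" before comparing.
    media_type = content_type.split(";")[0].strip().lower()
    if media_type == "application/json" or media_type.endswith("+json"):
        return json.loads(data)
    return data


parsed = toy_deserialize_from_text('{"id": 7}', "application/json; charset=utf-8")
plain = toy_deserialize_from_text("hello", "text/plain")
print(parsed, plain)
```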
232
233
### Async Pipeline (Python 3.5+)
234
235
Async versions of pipeline components.
236
237
```python { .api }
238
class AsyncPipeline:
239
"""Async version of Pipeline."""
240
241
def __init__(self, policies=None, sender=None):
242
"""Initialize async pipeline."""
243
244
async def run(self, request, **kwargs):
245
"""Execute request through async pipeline."""
246
247
class AsyncHTTPPolicy:
248
"""Async HTTP policy interface."""
249
250
async def send(self, request, **kwargs):
251
"""Process request asynchronously."""
252
253
class AsyncHTTPSender:
254
"""Async HTTP sender interface."""
255
256
async def send(self, request, **config):
257
"""Send request asynchronously."""
258
```
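
The async variants follow the same chaining pattern, except that each `send` is awaited. The following self-contained sketch uses only `asyncio` and hypothetical stand-in classes (`ToyAsyncPolicy`, `ToyAsyncSender`) to show two requests passing through the same policy concurrently. Note that `asyncio.run` requires Python 3.7 or later.

```python
import asyncio


class ToyAsyncSender:
    """Stand-in async sender; sleep(0) yields to the event loop like real I/O."""

    async def send(self, request, **config):
        await asyncio.sleep(0)
        return {"status": 200, "url": request["url"]}


class ToyAsyncPolicy:
    """Stand-in async policy that counts calls and delegates to the next node."""

    def __init__(self):
        self.next = None
        self.calls = 0

    async def send(self, request, **kwargs):
        self.calls += 1
        return await self.next.send(request, **kwargs)


async def main():
    policy = ToyAsyncPolicy()
    policy.next = ToyAsyncSender()
    # gather() returns results in the order the awaitables were passed in.
    responses = await asyncio.gather(
        policy.send({"url": "/a"}),
        policy.send({"url": "/b"}),
    )
    return policy.calls, responses


calls, responses = asyncio.run(main())
print(calls, responses)
```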
259
260
## Usage Examples
261
262
### Basic Pipeline Setup
263
264
```python
265
from msrest.pipeline import Pipeline, HTTPPolicy
266
from msrest import ServiceClient, Configuration
267
268
# Create custom policy
269
class LoggingPolicy(HTTPPolicy):
270
def send(self, request, **kwargs):
271
print(f"Sending request to: {request.http_request.url}")
272
response = self.next.send(request, **kwargs)
273
print(f"Received response: {response.http_response.status_code}")
274
return response
275
276
# Create pipeline with policies
277
policies = [LoggingPolicy()]
278
pipeline = Pipeline(policies)
279
280
# Use with service client
281
config = Configuration(base_url='https://api.example.com')
282
config.pipeline = pipeline
283
284
client = ServiceClient(None, config)
285
```
286
287
### Custom Authentication Policy
288
289
```python
290
from msrest.pipeline import SansIOHTTPPolicy
291
292
class CustomAuthPolicy(SansIOHTTPPolicy):
293
"""Custom authentication policy."""
294
295
def __init__(self, api_key):
296
self.api_key = api_key
297
298
def on_request(self, request, **kwargs):
299
"""Add authentication header to request."""
300
request.http_request.headers['Authorization'] = f'Bearer {self.api_key}'
301
302
def on_response(self, request, response, **kwargs):
303
"""Handle authentication errors."""
304
if response.http_response.status_code == 401:
305
print("Authentication failed - token may be expired")
306
307
# Use custom auth policy
308
auth_policy = CustomAuthPolicy('your-api-key')
309
policies = [auth_policy]
310
pipeline = Pipeline(policies)
311
```
312
313
### Retry Policy
314
315
```python
316
import time
317
import random
318
from msrest.pipeline import HTTPPolicy
319
from msrest.exceptions import HttpOperationError
320
321
class RetryPolicy(HTTPPolicy):
322
"""Simple retry policy with exponential backoff."""
323
324
def __init__(self, max_retries=3, base_delay=1):
325
super(RetryPolicy, self).__init__()
326
self.max_retries = max_retries
327
self.base_delay = base_delay
328
329
def send(self, request, **kwargs):
330
"""Send request with retry logic."""
331
last_exception = None
332
333
for attempt in range(self.max_retries + 1):
334
try:
335
response = self.next.send(request, **kwargs)
336
337
# Check if we should retry based on status code
338
if response.http_response.status_code >= 500:
339
if attempt < self.max_retries:
340
delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
341
print(f"Server error, retrying in {delay:.1f}s (attempt {attempt + 1})")
342
time.sleep(delay)
343
continue
344
345
return response
346
347
except Exception as e:
348
last_exception = e
349
if attempt < self.max_retries:
350
delay = self.base_delay * (2 ** attempt)
351
print(f"Request failed, retrying in {delay}s (attempt {attempt + 1})")
352
time.sleep(delay)
353
continue
354
355
# All retries exhausted
356
if last_exception:
357
raise last_exception
358
else:
359
return response
360
361
# Use retry policy
362
retry_policy = RetryPolicy(max_retries=3, base_delay=2)
363
policies = [retry_policy]
364
pipeline = Pipeline(policies)
365
```
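
To make the backoff schedule concrete, the helper below computes the wait before each retry using the same formula as the policy above, `base_delay * (2 ** attempt)`. The function name `backoff_delays` and its optional `jitter` callable are hypothetical additions for this illustration.

```python
def backoff_delays(max_retries, base_delay, jitter=None):
    """Return the wait before each retry: base_delay * 2**attempt (+ jitter)."""
    delays = []
    for attempt in range(max_retries):
        delay = base_delay * (2 ** attempt)
        if jitter is not None:
            delay += jitter()  # e.g. lambda: random.uniform(0, 1)
        delays.append(delay)
    return delays


schedule = backoff_delays(max_retries=3, base_delay=2)
print(schedule)  # [2, 4, 8]
```

With `max_retries=3` and `base_delay=2`, a request that keeps failing waits 14 seconds in total (plus any jitter) before the fourth and final attempt.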
366
367
### Request/Response Transformation
368
369
```python
370
from msrest.pipeline import SansIOHTTPPolicy
371
import json
372
373
class RequestTransformPolicy(SansIOHTTPPolicy):
374
"""Transform requests and responses."""
375
376
def on_request(self, request, **kwargs):
377
"""Transform outgoing requests."""
378
# Add timestamp to all requests
379
if hasattr(request.http_request, 'data') and request.http_request.data:
380
try:
381
data = json.loads(request.http_request.data)
382
data['timestamp'] = time.time()
383
request.http_request.data = json.dumps(data)
384
except (json.JSONDecodeError, TypeError):
385
pass # Skip transformation for non-JSON data
386
387
# Add correlation ID
388
import uuid
389
correlation_id = str(uuid.uuid4())
390
request.http_request.headers['X-Correlation-ID'] = correlation_id
391
392
# Store in context for response processing
393
if not request.context:
394
request.context = {}
395
request.context['correlation_id'] = correlation_id
396
397
def on_response(self, request, response, **kwargs):
398
"""Transform incoming responses."""
399
# Log correlation
400
correlation_id = request.context.get('correlation_id')
401
if correlation_id:
402
print(f"Response for correlation ID {correlation_id}")
403
404
# Add custom header to context
405
if hasattr(response.http_response, 'headers'):
406
response.context['server_time'] = response.http_response.headers.get('Date')
407
408
# Use transformation policy
409
transform_policy = RequestTransformPolicy()
410
policies = [transform_policy]
411
pipeline = Pipeline(policies)
412
```
413
414
### Pipeline with Multiple Policies
415
416
```python
417
from msrest.pipeline import Pipeline
418
from msrest.pipeline.universal import UserAgentPolicy, HTTPLogger
419
420
# Create multiple policies
421
user_agent_policy = UserAgentPolicy()
422
user_agent_policy.add_user_agent('MyApp/1.0')
423
424
http_logger = HTTPLogger(enable_http_logger=True)
425
426
class MetricsPolicy(SansIOHTTPPolicy):
427
"""Collect request metrics."""
428
429
def __init__(self):
430
self.request_count = 0
431
self.response_times = []
432
433
def on_request(self, request, **kwargs):
434
self.request_count += 1
435
request.context['start_time'] = time.time()
436
437
def on_response(self, request, response, **kwargs):
438
if 'start_time' in request.context:
439
duration = time.time() - request.context['start_time']
440
self.response_times.append(duration)
441
print(f"Request took {duration:.3f}s")
442
443
metrics_policy = MetricsPolicy()
444
445
# Combine policies (order matters)
446
policies = [
447
user_agent_policy, # Set user agent first
448
metrics_policy, # Collect metrics
449
retry_policy, # Retry on failures
450
http_logger # Log requests (usually last)
451
]
452
453
pipeline = Pipeline(policies)
454
455
# Use pipeline
456
config = Configuration(base_url='https://api.example.com')
457
config.pipeline = pipeline
458
459
with ServiceClient(None, config) as client:
460
# Make multiple requests
461
for i in range(5):
462
request = client.get(f'/data/{i}')
463
response = client.send(request)
464
465
# Check metrics
466
print(f"Total requests: {metrics_policy.request_count}")
467
print(f"Average response time: {sum(metrics_policy.response_times) / len(metrics_policy.response_times):.3f}s")
468
```
469
470
### Async Pipeline Usage
471
472
```python
473
import asyncio
474
from msrest.pipeline import AsyncPipeline, AsyncHTTPPolicy
475
476
class AsyncLoggingPolicy(AsyncHTTPPolicy):
477
"""Async logging policy."""
478
479
async def send(self, request, **kwargs):
480
print(f"[ASYNC] Sending request to: {request.http_request.url}")
481
response = await self.next.send(request, **kwargs)
482
print(f"[ASYNC] Received response: {response.http_response.status_code}")
483
return response
484
485
# Create async pipeline
486
async_policies = [AsyncLoggingPolicy()]
487
async_pipeline = AsyncPipeline(async_policies)
488
489
# Use with async client (pseudo-code)
490
async def async_example():
491
async_client = await create_async_client()
492
async_client.config.pipeline = async_pipeline
493
494
request = async_client.get('/async-data')
495
response = await async_client.send(request)
496
497
return response
498
499
# Run async pipeline
500
result = asyncio.run(async_example())
501
```
502
503
### Pipeline Context Usage
504
505
```python
506
from msrest.pipeline import SansIOHTTPPolicy
507
508
class ContextPolicy(SansIOHTTPPolicy):
509
"""Policy demonstrating context usage."""
510
511
def on_request(self, request, **kwargs):
512
"""Add data to request context."""
513
if not request.context:
514
request.context = {}
515
516
# Add request metadata
517
request.context.update({
518
'request_id': str(uuid.uuid4()),
519
'start_time': time.time(),
520
'user_data': kwargs.get('user_data', {})
521
})
522
523
def on_response(self, request, response, **kwargs):
524
"""Process context data in response."""
525
# Calculate request duration
526
if 'start_time' in request.context:
527
duration = time.time() - request.context['start_time']
528
response.context['request_duration'] = duration
529
530
# Copy request ID to response
531
if 'request_id' in request.context:
532
response.context['request_id'] = request.context['request_id']
533
534
# Use context policy
535
context_policy = ContextPolicy()
536
pipeline = Pipeline([context_policy])
537
538
# Send request with context data
539
with ServiceClient(None, config) as client:
540
request = client.get('/data')
541
response = client.send(request, user_data={'session_id': '12345'})
542
543
# Access response context
544
print(f"Request duration: {response.context.get('request_duration', 'unknown')}")
545
print(f"Request ID: {response.context.get('request_id', 'unknown')}")
546
```
547
548
### Error Handling in Policies
549
550
```python
551
from msrest.pipeline import HTTPPolicy
552
from msrest.exceptions import ClientException
553
554
class ErrorHandlingPolicy(HTTPPolicy):
555
"""Policy with comprehensive error handling."""
556
557
def send(self, request, **kwargs):
558
try:
559
response = self.next.send(request, **kwargs)
560
561
# Check for client errors
562
if 400 <= response.http_response.status_code < 500:
563
# Handle client errors
564
self._handle_client_error(request, response)
565
566
# Check for server errors
567
elif response.http_response.status_code >= 500:
568
# Handle server errors
569
self._handle_server_error(request, response)
570
571
return response
572
573
except Exception as e:
574
# Handle network/connection errors
575
self._handle_network_error(request, e)
576
raise
577
578
def _handle_client_error(self, request, response):
579
"""Handle 4xx client errors."""
580
status = response.http_response.status_code
581
if status == 401:
582
print("Authentication required")
583
elif status == 403:
584
print("Access forbidden")
585
elif status == 404:
586
print("Resource not found")
587
elif status == 429:
588
print("Rate limit exceeded")
589
590
def _handle_server_error(self, request, response):
591
"""Handle 5xx server errors."""
592
print(f"Server error: {response.http_response.status_code}")
593
594
def _handle_network_error(self, request, exception):
595
"""Handle network/connection errors."""
596
print(f"Network error: {type(exception).__name__}: {exception}")
597
598
# Use error handling policy
599
error_policy = ErrorHandlingPolicy()
600
pipeline = Pipeline([error_policy])
601
```
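
The status-code branching above can also be written as a small table-driven helper, which is easier to unit test than `print` calls inside a policy. The sketch below is one possible shape; `classify_status`, `describe_status`, and `CLIENT_ERROR_MESSAGES` are hypothetical names, and the messages simply mirror the ones printed in the example.

```python
# Messages mirror the ones printed by the example policy above.
CLIENT_ERROR_MESSAGES = {
    401: "Authentication required",
    403: "Access forbidden",
    404: "Resource not found",
    429: "Rate limit exceeded",
}


def classify_status(status_code):
    """Map a status code to the branch the policy would take."""
    if 400 <= status_code < 500:
        return "client_error"
    if status_code >= 500:
        return "server_error"
    return "ok"


def describe_status(status_code):
    """Return a human-readable message, or None when nothing needs reporting."""
    kind = classify_status(status_code)
    if kind == "client_error":
        return CLIENT_ERROR_MESSAGES.get(status_code)
    if kind == "server_error":
        return f"Server error: {status_code}"
    return None


for code in (200, 404, 418, 503):
    print(code, classify_status(code), describe_status(code))
```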
602
603
## Types
604
605
```python { .api }
606
class ClientRawResponse:
607
"""
608
Wrapper for response with additional data.
609
610
Attributes:
611
- output: Deserialized response object
612
- response: Raw HTTP response
613
- headers: Dict of deserialized headers
614
"""
615
616
def __init__(self, output, response):
617
"""Initialize raw response wrapper."""
618
619
def add_headers(self, header_dict: dict):
620
"""
621
Deserialize specific headers.
622
623
Parameters:
624
- header_dict: Dict mapping header names to types
625
"""
626
```