Python client runtime for REST API clients generated by AutoRest (Swagger), providing serialization, authentication, and request handling.
—
Configurable HTTP request/response pipeline with policy-based architecture supporting custom middleware, authentication injection, logging, and retry logic. The pipeline system provides a flexible way to process HTTP requests and responses through a chain of policies.
Main pipeline class that orchestrates request/response processing through policies.
class Pipeline:
def __init__(self, policies=None, sender=None):
"""
Initialize HTTP pipeline.
Parameters:
- policies: List of HTTPPolicy or SansIOHTTPPolicy objects
- sender: HTTPSender for executing requests (defaults to RequestsHTTPSender)
"""
def run(self, request, **kwargs):
"""
Execute request through pipeline.
Parameters:
- request: HTTP request object
- kwargs: Additional configuration options
Returns:
Response object from pipeline execution
"""
def __enter__(self):
"""Enter context manager."""
def __exit__(self, *exc_details):
"""Exit context manager."""

Policy interfaces for processing requests and responses.
class HTTPPolicy:
def __init__(self):
"""Initialize HTTP policy."""
self.next = None # Next policy in chain
def send(self, request, **kwargs):
"""
Process request and call next policy.
Parameters:
- request: Request object to process
- kwargs: Additional configuration
Returns:
Response object
"""
class SansIOHTTPPolicy:
"""
Sans I/O policy for request/response processing.
Can act before and after I/O without being tied to specific HTTP implementation.
"""
def on_request(self, request, **kwargs):
"""
Process request before sending.
Parameters:
- request: Request to process
- kwargs: Additional options
"""
def on_response(self, request, response, **kwargs):
"""
Process response after receiving.
Parameters:
- request: Original request
- response: Received response
- kwargs: Additional options
"""
def on_exception(self, request, **kwargs) -> bool:
"""
Handle exceptions during request processing.
Parameters:
- request: Request that caused exception
- kwargs: Additional context
Returns:
True if exception was handled, False to re-raise
"""

Abstract base for HTTP request execution.
class HTTPSender:
def send(self, request, **config):
"""
Send HTTP request.
Parameters:
- request: Request object to send
- config: Configuration overrides
Returns:
Response object
"""
def build_context(self):
"""
Build context object for pipeline.
Returns:
Context object (implementation specific)
"""
def __enter__(self):
"""Enter context manager."""
def __exit__(self, *exc_details):
"""Exit context manager."""

Pipeline-specific request and response containers.
class Request:
def __init__(self, http_request, context=None):
"""
Pipeline request wrapper.
Parameters:
- http_request: Underlying HTTP request object
- context: Pipeline context data
"""
http_request: any # Underlying HTTP request
context: any # Pipeline context
class Response:
def __init__(self, request, http_response, context=None):
"""
Pipeline response wrapper.
Parameters:
- request: Original Request object
- http_response: Underlying HTTP response
- context: Pipeline context
"""
request: Request # Original request
http_response: any # Underlying HTTP response
context: dict # Pipeline context dictionary

Common policies provided by msrest.
class UserAgentPolicy:
"""Policy for managing User-Agent header."""
def __init__(self, user_agent=None):
"""
Initialize User-Agent policy.
Parameters:
- user_agent: Custom user agent string
"""
user_agent: str
def add_user_agent(self, value: str):
"""Add value to user agent string."""
class HTTPLogger:
"""Policy for logging HTTP requests and responses."""
enable_http_logger: bool = True
def __init__(self, enable_http_logger=True):
"""
Initialize HTTP logger policy.
Parameters:
- enable_http_logger: Enable/disable logging
"""
class RawDeserializer:
"""Policy for deserializing raw HTTP responses."""
CONTEXT_NAME: str = "deserialized_data"
@staticmethod
def deserialize_from_text(data, content_type=None):
"""
Deserialize text data.
Parameters:
- data: Text data to deserialize
- content_type: Content type hint
Returns:
Deserialized data
"""
@staticmethod
def deserialize_from_http_generics(text, headers):
"""
Deserialize from HTTP response components.
Parameters:
- text: Response text
- headers: Response headers
Returns:
Deserialized data
"""

Async versions of pipeline components.
class AsyncPipeline:
"""Async version of Pipeline."""
def __init__(self, policies=None, sender=None):
"""Initialize async pipeline."""
async def run(self, request, **kwargs):
"""Execute request through async pipeline."""
class AsyncHTTPPolicy:
"""Async HTTP policy interface."""
async def send(self, request, **kwargs):
"""Process request asynchronously."""
class AsyncHTTPSender:
"""Async HTTP sender interface."""
async def send(self, request, **config):
"""Send request asynchronously."""

from msrest.pipeline import Pipeline, HTTPPolicy
from msrest import ServiceClient, Configuration
# Create custom policy
class LoggingPolicy(HTTPPolicy):
    """Policy that prints the target URL and the resulting status code."""

    def send(self, request, **kwargs):
        """Log the request URL, delegate to the next policy, log the status.

        Parameters:
        - request: Pipeline request; its ``http_request.url`` is printed
        - kwargs: Additional configuration forwarded unchanged
        Returns:
        The response produced by the next policy, unmodified
        """
        target_url = request.http_request.url
        print(f"Sending request to: {target_url}")

        result = self.next.send(request, **kwargs)

        status = result.http_response.status_code
        print(f"Received response: {status}")
        return result
# Create pipeline with policies
policies = [LoggingPolicy()]
pipeline = Pipeline(policies)
# Use with service client
config = Configuration(base_url='https://api.example.com')
config.pipeline = pipeline
client = ServiceClient(None, config)

from msrest.pipeline import SansIOHTTPPolicy
class CustomAuthPolicy(SansIOHTTPPolicy):
    """Sans-I/O policy that sends an API key as a bearer token."""

    def __init__(self, api_key):
        """Remember the API key used to build the ``Authorization`` header."""
        self.api_key = api_key

    def on_request(self, request, **kwargs):
        """Set the ``Authorization`` header on the outgoing HTTP request."""
        bearer_value = f'Bearer {self.api_key}'
        outgoing_headers = request.http_request.headers
        outgoing_headers['Authorization'] = bearer_value

    def on_response(self, request, response, **kwargs):
        """Print a notice when the service answers with HTTP 401."""
        status = response.http_response.status_code
        if status != 401:
            return
        print("Authentication failed - token may be expired")
# Use custom auth policy
auth_policy = CustomAuthPolicy('your-api-key')
policies = [auth_policy]
pipeline = Pipeline(policies)

import time
import random
from msrest.pipeline import HTTPPolicy
from msrest.exceptions import HttpOperationError
class RetryPolicy(HTTPPolicy):
    """Simple retry policy with exponential backoff.

    The request is re-sent when the next policy raises or returns a 5xx
    status, for up to ``max_retries`` additional attempts. The wait before
    retry ``n`` (0-based) is ``base_delay * 2 ** n`` seconds; the 5xx path
    additionally adds up to one second of random jitter.
    """

    def __init__(self, max_retries=3, base_delay=1):
        """Store the retry budget and the base backoff delay (seconds)."""
        super(RetryPolicy, self).__init__()
        self.max_retries = max_retries
        self.base_delay = base_delay

    def send(self, request, **kwargs):
        """Send request with retry logic.

        Parameters:
        - request: Request object forwarded to the next policy
        - kwargs: Additional configuration forwarded unchanged
        Returns:
        The first response whose status is below 500, or the last 5xx
        response once the retry budget is spent
        Raises:
        Whatever the next policy raised on the final attempt
        """
        # Clamp so that a negative max_retries still sends the request once
        # rather than skipping the loop with no response to return.
        final_attempt = max(0, self.max_retries)

        for attempt in range(final_attempt + 1):
            try:
                # Guard only the downstream call, so that errors raised while
                # inspecting the response are not mistaken for send failures.
                response = self.next.send(request, **kwargs)
            except Exception:
                if attempt == final_attempt:
                    # Retries exhausted: propagate with the traceback intact.
                    raise
                delay = self.base_delay * (2 ** attempt)
                print(f"Request failed, retrying in {delay}s (attempt {attempt + 1})")
                time.sleep(delay)
                continue

            if response.http_response.status_code < 500 or attempt == final_attempt:
                return response

            # Server error with retries left: back off with jitter, then retry.
            delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Server error, retrying in {delay:.1f}s (attempt {attempt + 1})")
            time.sleep(delay)
# Use retry policy
retry_policy = RetryPolicy(max_retries=3, base_delay=2)
policies = [retry_policy]
pipeline = Pipeline(policies)

from msrest.pipeline import SansIOHTTPPolicy
import json
class RequestTransformPolicy(SansIOHTTPPolicy):
    """Transform requests and responses.

    Outgoing JSON object bodies get a ``timestamp`` field, every request gets
    an ``X-Correlation-ID`` header, and the response ``Date`` header is copied
    into the response context as ``server_time``.
    """

    def on_request(self, request, **kwargs):
        """Transform outgoing requests.

        Parameters:
        - request: Pipeline request whose ``http_request`` is modified in place
        - kwargs: Additional options (unused)
        """
        import uuid

        http_request = request.http_request

        # Add timestamp to all requests that carry a JSON object body.
        body = getattr(http_request, 'data', None)
        if body:
            try:
                data = json.loads(body)
            except (ValueError, TypeError):
                # ValueError covers both JSONDecodeError and the
                # UnicodeDecodeError raised for non-UTF-8 byte payloads;
                # TypeError covers bodies that are not str/bytes.
                data = None  # Skip transformation for non-JSON data
            # Only JSON objects can take an extra key; arrays and scalars
            # are left untouched.
            if isinstance(data, dict):
                data['timestamp'] = time.time()
                http_request.data = json.dumps(data)

        # Add correlation ID
        correlation_id = str(uuid.uuid4())
        http_request.headers['X-Correlation-ID'] = correlation_id

        # Store in context for response processing
        if not request.context:
            request.context = {}
        request.context['correlation_id'] = correlation_id

    def on_response(self, request, response, **kwargs):
        """Transform incoming responses.

        Parameters:
        - request: Original request; its context may hold ``correlation_id``
        - response: Received response; its context receives ``server_time``
        - kwargs: Additional options (unused)
        """
        # The request context may be missing if on_request never ran.
        request_context = request.context or {}
        correlation_id = request_context.get('correlation_id')
        if correlation_id:
            print(f"Response for correlation ID {correlation_id}")

        # Add custom header to context
        if hasattr(response.http_response, 'headers'):
            if response.context is None:
                # Response context defaults to None; create it on demand.
                response.context = {}
            response.context['server_time'] = response.http_response.headers.get('Date')
# Use transformation policy
transform_policy = RequestTransformPolicy()
policies = [transform_policy]
pipeline = Pipeline(policies)

from msrest.pipeline import Pipeline
from msrest.pipeline.universal import UserAgentPolicy, HTTPLogger
# Create multiple policies
user_agent_policy = UserAgentPolicy()
user_agent_policy.add_user_agent('MyApp/1.0')
http_logger = HTTPLogger(enable_http_logger=True)
class MetricsPolicy(SansIOHTTPPolicy):
    """Collect request metrics.

    Attributes:
    - request_count: Number of requests seen by ``on_request``
    - response_times: Elapsed seconds for each response that had a start time
    """

    def __init__(self):
        """Start with no requests counted and no timings recorded."""
        self.request_count = 0
        self.response_times = []

    def on_request(self, request, **kwargs):
        """Count the request and stamp its start time into the context."""
        self.request_count += 1
        if request.context is None:
            # Request context defaults to None; create it before storing.
            request.context = {}
        request.context['start_time'] = time.time()

    def on_response(self, request, response, **kwargs):
        """Record and print the elapsed time if a start time was stored."""
        context = request.context
        if context is None or 'start_time' not in context:
            return
        duration = time.time() - context['start_time']
        self.response_times.append(duration)
        print(f"Request took {duration:.3f}s")
metrics_policy = MetricsPolicy()
# Combine policies (order matters)
policies = [
user_agent_policy, # Set user agent first
metrics_policy, # Collect metrics
retry_policy, # Retry on failures
http_logger # Log requests (usually last)
]
pipeline = Pipeline(policies)
# Use pipeline
config = Configuration(base_url='https://api.example.com')
config.pipeline = pipeline
with ServiceClient(None, config) as client:
# Make multiple requests
for i in range(5):
request = client.get(f'/data/{i}')
response = client.send(request)
# Check metrics
print(f"Total requests: {metrics_policy.request_count}")
print(f"Average response time: {sum(metrics_policy.response_times) / len(metrics_policy.response_times):.3f}s")

import asyncio
from msrest.pipeline import AsyncPipeline, AsyncHTTPPolicy
class AsyncLoggingPolicy(AsyncHTTPPolicy):
    """Async policy that prints the target URL and the resulting status."""

    async def send(self, request, **kwargs):
        """Log the URL, await the next policy, log the status.

        Returns the response produced by the next policy, unmodified.
        """
        target_url = request.http_request.url
        print(f"[ASYNC] Sending request to: {target_url}")

        result = await self.next.send(request, **kwargs)

        status = result.http_response.status_code
        print(f"[ASYNC] Received response: {status}")
        return result
# Create async pipeline
async_policies = [AsyncLoggingPolicy()]
async_pipeline = AsyncPipeline(async_policies)
# Use with async client (pseudo-code)
async def async_example():
    """Send a GET for ``/async-data`` through the async pipeline.

    Pseudo-code: ``create_async_client`` is not defined in this document.
    Returns whatever the client's ``send`` coroutine yields.
    """
    client = await create_async_client()
    # Route the client's traffic through the pipeline built above.
    client.config.pipeline = async_pipeline
    pending = client.get('/async-data')
    return await client.send(pending)
# Run async pipeline
result = asyncio.run(async_example())

from msrest.pipeline import SansIOHTTPPolicy
class ContextPolicy(SansIOHTTPPolicy):
    """Policy demonstrating context usage.

    ``on_request`` stores a request ID, a start time and caller-supplied
    ``user_data`` in the request context; ``on_response`` copies the request
    ID and the elapsed time into the response context.
    """

    def on_request(self, request, **kwargs):
        """Add data to request context.

        Parameters:
        - request: Pipeline request whose context is populated
        - kwargs: Additional options; ``user_data`` is stored if given
        """
        # Imported here because no module-level ``import uuid`` is visible
        # in this document; without it uuid.uuid4() raises NameError.
        import uuid

        if not request.context:
            request.context = {}

        # Add request metadata
        request.context.update({
            'request_id': str(uuid.uuid4()),
            'start_time': time.time(),
            'user_data': kwargs.get('user_data', {}),
        })

    def on_response(self, request, response, **kwargs):
        """Process context data in response.

        Parameters:
        - request: Original request; its context is read
        - response: Received response; its context is written
        - kwargs: Additional options (unused)
        """
        # The request context may be missing if on_request never ran.
        request_context = request.context or {}
        has_start = 'start_time' in request_context
        has_id = 'request_id' in request_context
        if not (has_start or has_id):
            return

        if response.context is None:
            # Response context defaults to None; create it on demand.
            response.context = {}

        # Calculate request duration
        if has_start:
            duration = time.time() - request_context['start_time']
            response.context['request_duration'] = duration

        # Copy request ID to response
        if has_id:
            response.context['request_id'] = request_context['request_id']
# Use context policy
context_policy = ContextPolicy()
pipeline = Pipeline([context_policy])
# Send request with context data
with ServiceClient(None, config) as client:
request = client.get('/data')
response = client.send(request, user_data={'session_id': '12345'})
# Access response context
print(f"Request duration: {response.context.get('request_duration', 'unknown')}")
print(f"Request ID: {response.context.get('request_id', 'unknown')}")

from msrest.pipeline import HTTPPolicy
from msrest.exceptions import ClientException
class ErrorHandlingPolicy(HTTPPolicy):
    """Policy that reports client, server and transport errors via print."""

    def send(self, request, **kwargs):
        """Forward the request and report any error outcome.

        Returns the downstream response unchanged. Any exception raised
        inside the guarded section is reported and then re-raised.
        """
        try:
            result = self.next.send(request, **kwargs)
            code = result.http_response.status_code
            if code >= 500:
                # 5xx: server-side failure
                self._handle_server_error(request, result)
            elif code >= 400:
                # 4xx: client-side failure
                self._handle_client_error(request, result)
            return result
        except Exception as error:
            # Report, then let the caller see the original exception.
            self._handle_network_error(request, error)
            raise

    def _handle_client_error(self, request, response):
        """Print a short message for the recognised 4xx status codes."""
        known_messages = {
            401: "Authentication required",
            403: "Access forbidden",
            404: "Resource not found",
            429: "Rate limit exceeded",
        }
        message = known_messages.get(response.http_response.status_code)
        if message is not None:
            print(message)

    def _handle_server_error(self, request, response):
        """Print the 5xx status code."""
        code = response.http_response.status_code
        print(f"Server error: {code}")

    def _handle_network_error(self, request, exception):
        """Print the exception's type name and message."""
        kind = type(exception).__name__
        print(f"Network error: {kind}: {exception}")
# Use error handling policy
error_policy = ErrorHandlingPolicy()
pipeline = Pipeline([error_policy])

class ClientRawResponse:
"""
Wrapper for response with additional data.
Attributes:
- output: Deserialized response object
- response: Raw HTTP response
- headers: Dict of deserialized headers
"""
def __init__(self, output, response):
"""Initialize raw response wrapper."""
def add_headers(self, header_dict: dict):
"""
Deserialize specific headers.
Parameters:
- header_dict: Dict mapping header names to types
"""

Install with Tessl CLI
npx tessl i tessl/pypi-msrest