CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-msrest

AutoRest swagger generator Python client runtime for REST API clients with serialization, authentication, and request handling.

Pending
Overview
Eval results
Files

pipeline.mddocs/

Pipeline System

Configurable HTTP request/response pipeline with policy-based architecture supporting custom middleware, authentication injection, logging, and retry logic. The pipeline system provides a flexible way to process HTTP requests and responses through a chain of policies.

Capabilities

Pipeline

Main pipeline class that orchestrates request/response processing through policies.

class Pipeline:
    def __init__(self, policies=None, sender=None):
        """
        Initialize HTTP pipeline.
        
        Parameters:
        - policies: List of HTTPPolicy or SansIOHTTPPolicy objects
        - sender: HTTPSender for executing requests (defaults to RequestsHTTPSender)
        """
    
    def run(self, request, **kwargs):
        """
        Execute request through pipeline.
        
        Parameters:
        - request: HTTP request object
        - kwargs: Additional configuration options
        
        Returns:
        Response object from pipeline execution
        """
    
    def __enter__(self):
        """Enter context manager."""
    
    def __exit__(self, *exc_details):
        """Exit context manager."""

HTTP Policies

Policy interfaces for processing requests and responses.

class HTTPPolicy:
    def __init__(self):
        """Initialize HTTP policy."""
        self.next = None  # Next policy in chain
    
    def send(self, request, **kwargs):
        """
        Process request and call next policy.
        
        Parameters:
        - request: Request object to process
        - kwargs: Additional configuration
        
        Returns:
        Response object
        """

class SansIOHTTPPolicy:
    """
    Sans I/O policy for request/response processing.
    Can act before and after I/O without being tied to specific HTTP implementation.
    """
    
    def on_request(self, request, **kwargs):
        """
        Process request before sending.
        
        Parameters:
        - request: Request to process
        - kwargs: Additional options
        """
    
    def on_response(self, request, response, **kwargs):
        """
        Process response after receiving.
        
        Parameters:
        - request: Original request
        - response: Received response
        - kwargs: Additional options
        """
    
    def on_exception(self, request, **kwargs) -> bool:
        """
        Handle exceptions during request processing.
        
        Parameters:
        - request: Request that caused exception
        - kwargs: Additional context
        
        Returns:
        True if exception was handled, False to re-raise
        """

HTTP Sender

Abstract base for HTTP request execution.

class HTTPSender:
    def send(self, request, **config):
        """
        Send HTTP request.
        
        Parameters:
        - request: Request object to send
        - config: Configuration overrides
        
        Returns:
        Response object
        """
    
    def build_context(self):
        """
        Build context object for pipeline.
        
        Returns:
        Context object (implementation specific)
        """
    
    def __enter__(self):
        """Enter context manager."""
    
    def __exit__(self, *exc_details):
        """Exit context manager."""

Request and Response Wrappers

Pipeline-specific request and response containers.

class Request:
    def __init__(self, http_request, context=None):
        """
        Pipeline request wrapper.
        
        Parameters:
        - http_request: Underlying HTTP request object
        - context: Pipeline context data
        """
    
    http_request: any  # Underlying HTTP request
    context: any  # Pipeline context

class Response:
    def __init__(self, request, http_response, context=None):
        """
        Pipeline response wrapper.
        
        Parameters:
        - request: Original Request object
        - http_response: Underlying HTTP response
        - context: Pipeline context
        """
    
    request: Request  # Original request
    http_response: any  # Underlying HTTP response
    context: dict  # Pipeline context dictionary

Built-in Policies

Common policies provided by msrest.

class UserAgentPolicy:
    """Policy for managing User-Agent header."""
    
    def __init__(self, user_agent=None):
        """
        Initialize User-Agent policy.
        
        Parameters:
        - user_agent: Custom user agent string
        """
    
    user_agent: str
    
    def add_user_agent(self, value: str):
        """Add value to user agent string."""

class HTTPLogger:
    """Policy for logging HTTP requests and responses."""
    
    enable_http_logger: bool = True
    
    def __init__(self, enable_http_logger=True):
        """
        Initialize HTTP logger policy.
        
        Parameters:
        - enable_http_logger: Enable/disable logging
        """

class RawDeserializer:
    """Policy for deserializing raw HTTP responses."""
    
    CONTEXT_NAME: str = "deserialized_data"
    
    @staticmethod
    def deserialize_from_text(data, content_type=None):
        """
        Deserialize text data.
        
        Parameters:
        - data: Text data to deserialize
        - content_type: Content type hint
        
        Returns:
        Deserialized data
        """
    
    @staticmethod
    def deserialize_from_http_generics(text, headers):
        """
        Deserialize from HTTP response components.
        
        Parameters:
        - text: Response text
        - headers: Response headers
        
        Returns:
        Deserialized data
        """

Async Pipeline (Python 3.5+)

Async versions of pipeline components.

class AsyncPipeline:
    """Async version of Pipeline."""
    
    def __init__(self, policies=None, sender=None):
        """Initialize async pipeline."""
    
    async def run(self, request, **kwargs):
        """Execute request through async pipeline."""

class AsyncHTTPPolicy:
    """Async HTTP policy interface."""
    
    async def send(self, request, **kwargs):
        """Process request asynchronously."""

class AsyncHTTPSender:
    """Async HTTP sender interface."""
    
    async def send(self, request, **config):
        """Send request asynchronously."""

Usage Examples

Basic Pipeline Setup

from msrest.pipeline import Pipeline, HTTPPolicy
from msrest import ServiceClient, Configuration

# Create custom policy
class LoggingPolicy(HTTPPolicy):
    def send(self, request, **kwargs):
        print(f"Sending request to: {request.http_request.url}")
        response = self.next.send(request, **kwargs)
        print(f"Received response: {response.http_response.status_code}")
        return response

# Create pipeline with policies
policies = [LoggingPolicy()]
pipeline = Pipeline(policies)

# Use with service client
config = Configuration(base_url='https://api.example.com')
config.pipeline = pipeline

client = ServiceClient(None, config)

Custom Authentication Policy

from msrest.pipeline import SansIOHTTPPolicy

class CustomAuthPolicy(SansIOHTTPPolicy):
    """Custom authentication policy."""
    
    def __init__(self, api_key):
        self.api_key = api_key
    
    def on_request(self, request, **kwargs):
        """Add authentication header to request."""
        request.http_request.headers['Authorization'] = f'Bearer {self.api_key}'
    
    def on_response(self, request, response, **kwargs):
        """Handle authentication errors."""
        if response.http_response.status_code == 401:
            print("Authentication failed - token may be expired")

# Use custom auth policy
auth_policy = CustomAuthPolicy('your-api-key')
policies = [auth_policy]
pipeline = Pipeline(policies)

Retry Policy

import time
import random
from msrest.pipeline import HTTPPolicy
from msrest.exceptions import HttpOperationError

class RetryPolicy(HTTPPolicy):
    """Simple retry policy with exponential backoff."""
    
    def __init__(self, max_retries=3, base_delay=1):
        super(RetryPolicy, self).__init__()
        self.max_retries = max_retries
        self.base_delay = base_delay
    
    def send(self, request, **kwargs):
        """Send request with retry logic."""
        last_exception = None
        
        for attempt in range(self.max_retries + 1):
            try:
                response = self.next.send(request, **kwargs)
                
                # Check if we should retry based on status code
                if response.http_response.status_code >= 500:
                    if attempt < self.max_retries:
                        delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
                        print(f"Server error, retrying in {delay:.1f}s (attempt {attempt + 1})")
                        time.sleep(delay)
                        continue
                
                return response
                
            except Exception as e:
                last_exception = e
                if attempt < self.max_retries:
                    delay = self.base_delay * (2 ** attempt)
                    print(f"Request failed, retrying in {delay}s (attempt {attempt + 1})")
                    time.sleep(delay)
                    continue
                
        # All retries exhausted
        if last_exception:
            raise last_exception
        else:
            return response

# Use retry policy
retry_policy = RetryPolicy(max_retries=3, base_delay=2)
policies = [retry_policy]
pipeline = Pipeline(policies)

Request/Response Transformation

from msrest.pipeline import SansIOHTTPPolicy
import json

class RequestTransformPolicy(SansIOHTTPPolicy):
    """Transform requests and responses."""
    
    def on_request(self, request, **kwargs):
        """Transform outgoing requests."""
        # Add timestamp to all requests
        if hasattr(request.http_request, 'data') and request.http_request.data:
            try:
                data = json.loads(request.http_request.data)
                data['timestamp'] = time.time()
                request.http_request.data = json.dumps(data)
            except (json.JSONDecodeError, TypeError):
                pass  # Skip transformation for non-JSON data
        
        # Add correlation ID
        import uuid
        correlation_id = str(uuid.uuid4())
        request.http_request.headers['X-Correlation-ID'] = correlation_id
        
        # Store in context for response processing
        if not request.context:
            request.context = {}
        request.context['correlation_id'] = correlation_id
    
    def on_response(self, request, response, **kwargs):
        """Transform incoming responses."""
        # Log correlation
        correlation_id = request.context.get('correlation_id')
        if correlation_id:
            print(f"Response for correlation ID {correlation_id}")
        
        # Add custom header to context
        if hasattr(response.http_response, 'headers'):
            response.context['server_time'] = response.http_response.headers.get('Date')

# Use transformation policy
transform_policy = RequestTransformPolicy()
policies = [transform_policy]
pipeline = Pipeline(policies)

Pipeline with Multiple Policies

from msrest.pipeline import Pipeline
from msrest.pipeline.universal import UserAgentPolicy, HTTPLogger

# Create multiple policies
user_agent_policy = UserAgentPolicy()
user_agent_policy.add_user_agent('MyApp/1.0')

http_logger = HTTPLogger(enable_http_logger=True)

class MetricsPolicy(SansIOHTTPPolicy):
    """Collect request metrics."""
    
    def __init__(self):
        self.request_count = 0
        self.response_times = []
    
    def on_request(self, request, **kwargs):
        self.request_count += 1
        request.context['start_time'] = time.time()
    
    def on_response(self, request, response, **kwargs):
        if 'start_time' in request.context:
            duration = time.time() - request.context['start_time']
            self.response_times.append(duration)
            print(f"Request took {duration:.3f}s")

metrics_policy = MetricsPolicy()

# Combine policies (order matters)
policies = [
    user_agent_policy,    # Set user agent first
    metrics_policy,       # Collect metrics
    retry_policy,         # Retry on failures
    http_logger          # Log requests (usually last)
]

pipeline = Pipeline(policies)

# Use pipeline
config = Configuration(base_url='https://api.example.com')
config.pipeline = pipeline

with ServiceClient(None, config) as client:
    # Make multiple requests
    for i in range(5):
        request = client.get(f'/data/{i}')
        response = client.send(request)
    
    # Check metrics
    print(f"Total requests: {metrics_policy.request_count}")
    print(f"Average response time: {sum(metrics_policy.response_times) / len(metrics_policy.response_times):.3f}s")

Async Pipeline Usage

import asyncio
from msrest.pipeline import AsyncPipeline, AsyncHTTPPolicy

class AsyncLoggingPolicy(AsyncHTTPPolicy):
    """Async logging policy."""
    
    async def send(self, request, **kwargs):
        print(f"[ASYNC] Sending request to: {request.http_request.url}")
        response = await self.next.send(request, **kwargs)
        print(f"[ASYNC] Received response: {response.http_response.status_code}")
        return response

# Create async pipeline
async_policies = [AsyncLoggingPolicy()]
async_pipeline = AsyncPipeline(async_policies)

# Use with async client (pseudo-code)
async def async_example():
    async_client = await create_async_client()
    async_client.config.pipeline = async_pipeline
    
    request = async_client.get('/async-data')
    response = await async_client.send(request)
    
    return response

# Run async pipeline
result = asyncio.run(async_example())

Pipeline Context Usage

from msrest.pipeline import SansIOHTTPPolicy

class ContextPolicy(SansIOHTTPPolicy):
    """Policy demonstrating context usage."""
    
    def on_request(self, request, **kwargs):
        """Add data to request context."""
        if not request.context:
            request.context = {}
        
        # Add request metadata
        request.context.update({
            'request_id': str(uuid.uuid4()),
            'start_time': time.time(),
            'user_data': kwargs.get('user_data', {})
        })
    
    def on_response(self, request, response, **kwargs):
        """Process context data in response."""
        # Calculate request duration
        if 'start_time' in request.context:
            duration = time.time() - request.context['start_time']
            response.context['request_duration'] = duration
        
        # Copy request ID to response
        if 'request_id' in request.context:
            response.context['request_id'] = request.context['request_id']

# Use context policy
context_policy = ContextPolicy()
pipeline = Pipeline([context_policy])

# Send request with context data
with ServiceClient(None, config) as client:
    request = client.get('/data')
    response = client.send(request, user_data={'session_id': '12345'})
    
    # Access response context
    print(f"Request duration: {response.context.get('request_duration', 'unknown')}")
    print(f"Request ID: {response.context.get('request_id', 'unknown')}")

Error Handling in Policies

from msrest.pipeline import HTTPPolicy
from msrest.exceptions import ClientException

class ErrorHandlingPolicy(HTTPPolicy):
    """Policy with comprehensive error handling."""
    
    def send(self, request, **kwargs):
        try:
            response = self.next.send(request, **kwargs)
            
            # Check for client errors
            if 400 <= response.http_response.status_code < 500:
                # Handle client errors
                self._handle_client_error(request, response)
            
            # Check for server errors  
            elif response.http_response.status_code >= 500:
                # Handle server errors
                self._handle_server_error(request, response)
            
            return response
            
        except Exception as e:
            # Handle network/connection errors
            self._handle_network_error(request, e)
            raise
    
    def _handle_client_error(self, request, response):
        """Handle 4xx client errors."""
        status = response.http_response.status_code
        if status == 401:
            print("Authentication required")
        elif status == 403:
            print("Access forbidden")
        elif status == 404:
            print("Resource not found")
        elif status == 429:
            print("Rate limit exceeded")
    
    def _handle_server_error(self, request, response):
        """Handle 5xx server errors."""
        print(f"Server error: {response.http_response.status_code}")
    
    def _handle_network_error(self, request, exception):
        """Handle network/connection errors."""
        print(f"Network error: {type(exception).__name__}: {exception}")

# Use error handling policy
error_policy = ErrorHandlingPolicy()
pipeline = Pipeline([error_policy])

Types

class ClientRawResponse:
    """
    Wrapper for response with additional data.
    
    Attributes:
    - output: Deserialized response object
    - response: Raw HTTP response
    - headers: Dict of deserialized headers
    """
    
    def __init__(self, output, response):
        """Initialize raw response wrapper."""
    
    def add_headers(self, header_dict: dict):
        """
        Deserialize specific headers.
        
        Parameters:
        - header_dict: Dict mapping header names to types
        """

Install with Tessl CLI

npx tessl i tessl/pypi-msrest

docs

authentication.md

configuration.md

exceptions.md

index.md

paging.md

pipeline.md

polling.md

serialization.md

service-client.md

tile.json