Microsoft Azure Core Library providing foundational infrastructure for Azure SDK Python clients
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Azure Core's HTTP pipeline provides a modular system for processing HTTP requests and responses through a chain of configurable policies. This architecture enables consistent behavior across all Azure SDK clients while allowing customization for specific service requirements.
The pipeline processes requests through a series of policies before sending to the transport layer, then processes responses through the same policies in reverse order.
from azure.core.pipeline import Pipeline, AsyncPipeline, PipelineRequest, PipelineResponse, PipelineContext
from typing import List
class Pipeline:
    """Synchronous HTTP pipeline.

    A request is processed by each policy in turn and then handed to the
    transport; the response travels back through the policies in reverse order.
    """

    def __init__(self, transport: HttpTransport, policies: Optional[List[HTTPPolicy]] = None): ...

    def run(self, request: HttpRequest, **kwargs) -> PipelineResponse:
        """Execute the pipeline with the given request.

        ``request`` is the bare ``HttpRequest``. The pipeline creates the
        ``PipelineContext`` (from its transport and ``kwargs``) and wraps the
        request in a ``PipelineRequest`` itself, so callers must not pre-wrap it.
        """
        ...
class AsyncPipeline:
    """Asynchronous HTTP pipeline."""

    def __init__(self, transport: AsyncHttpTransport, policies: Optional[List[AsyncHTTPPolicy]] = None): ...

    async def run(self, request: HttpRequest, **kwargs) -> PipelineResponse:
        """Execute the pipeline with the given request.

        ``request`` is the bare ``HttpRequest``. The pipeline creates the
        ``PipelineContext`` (from its transport and ``kwargs``) and wraps the
        request in a ``PipelineRequest`` itself, so callers must not pre-wrap it.
        """
        ...


class PipelineRequest:
    """Container for HTTP request and context."""

    def __init__(self, http_request: HttpRequest, context: PipelineContext): ...

    @property
    def http_request(self) -> HttpRequest: ...

    @property
    def context(self) -> PipelineContext: ...
class PipelineResponse:
"""Container for HTTP response and context."""
def __init__(self, http_request: HttpRequest, http_response: HttpResponse, context: PipelineContext): ...
@property
def http_request(self) -> HttpRequest: ...
@property
def http_response(self) -> HttpResponse: ...
@property
def context(self) -> PipelineContext: ...
class PipelineContext(dict):
"""Execution context for pipeline processing."""
def __init__(self, transport: HttpTransport, **kwargs): ...

All pipeline policies inherit from base policy classes that define the interface for request/response processing.
from azure.core.pipeline.policies import HTTPPolicy, SansIOHTTPPolicy, AsyncHTTPPolicy
from abc import ABC, abstractmethod
class HTTPPolicy(ABC):
"""Base class for synchronous HTTP policies."""
@abstractmethod
def send(self, request: PipelineRequest) -> PipelineResponse: ...
class SansIOHTTPPolicy(ABC):
"""Base class for Sans-IO HTTP policies."""
def on_request(self, request: PipelineRequest) -> None: ...
def on_response(self, request: PipelineRequest, response: PipelineResponse) -> None: ...
def on_exception(self, request: PipelineRequest) -> bool: ...
class AsyncHTTPPolicy(ABC):
"""Base class for asynchronous HTTP policies."""
@abstractmethod
async def send(self, request: PipelineRequest) -> PipelineResponse: ...

Automatic retry logic with configurable strategies and backoff algorithms.
from azure.core.pipeline.policies import RetryPolicy, AsyncRetryPolicy, RetryMode
from enum import Enum
class RetryMode(Enum):
"""Retry strategy modes."""
Exponential = "exponential"
Fixed = "fixed"
class RetryPolicy(HTTPPolicy):
"""Retry policy with exponential backoff."""
def __init__(
self,
retry_total: int = 10,
retry_connect: int = 3,
retry_read: int = 3,
retry_status: int = 3,
retry_backoff_factor: float = 0.8,
retry_backoff_max: int = 120,
retry_mode: RetryMode = RetryMode.Exponential,
**kwargs
): ...
class AsyncRetryPolicy(AsyncHTTPPolicy):
"""Async retry policy with exponential backoff."""
def __init__(self, **kwargs): ... # Same parameters as RetryPolicy
class RequestHistory:
"""Tracks retry attempt history."""
def __init__(self, http_request: HttpRequest, **kwargs): ...
@property
def method(self) -> str: ...
@property
def url(self) -> str: ...

Policies for handling various authentication mechanisms.
from azure.core.pipeline.policies import BearerTokenCredentialPolicy, AsyncBearerTokenCredentialPolicy
from azure.core.pipeline.policies import AzureKeyCredentialPolicy, AzureSasCredentialPolicy
# Derives from HTTPPolicy (not SansIOHTTPPolicy), mirroring the async variant,
# which derives from AsyncHTTPPolicy.
class BearerTokenCredentialPolicy(HTTPPolicy):
    """Bearer token authentication policy."""

    def __init__(self, credential: TokenCredential, *scopes: str, **kwargs): ...
class AsyncBearerTokenCredentialPolicy(AsyncHTTPPolicy):
"""Async bearer token authentication policy."""
def __init__(self, credential: AsyncTokenCredential, *scopes: str, **kwargs): ...
class AzureKeyCredentialPolicy(SansIOHTTPPolicy):
"""API key authentication policy."""
def __init__(self, credential: AzureKeyCredential, name: str, prefix: str = "", **kwargs): ...
class AzureSasCredentialPolicy(SansIOHTTPPolicy):
"""SAS authentication policy."""
def __init__(self, credential: AzureSasCredential, **kwargs): ...

Policies for handling HTTP redirects and response processing.
from azure.core.pipeline.policies import RedirectPolicy, AsyncRedirectPolicy, ContentDecodePolicy
class RedirectPolicy(HTTPPolicy):
"""HTTP redirect handling policy."""
def __init__(self, permit_redirects: bool = True, redirect_max: int = 30, **kwargs): ...
class AsyncRedirectPolicy(AsyncHTTPPolicy):
"""Async HTTP redirect handling policy."""
def __init__(self, **kwargs): ... # Same parameters as RedirectPolicy
class ContentDecodePolicy(SansIOHTTPPolicy):
"""Automatic response content decoding policy."""
def __init__(self, **kwargs): ...

Policies for managing HTTP headers and user agent strings.
from azure.core.pipeline.policies import HeadersPolicy, UserAgentPolicy
class HeadersPolicy(SansIOHTTPPolicy):
"""Custom headers policy."""
def __init__(self, base_headers: Optional[Dict[str, str]] = None, **kwargs): ...
class UserAgentPolicy(SansIOHTTPPolicy):
"""User agent header policy."""
def __init__(self, user_agent: Optional[str] = None, **kwargs): ...
def add_user_agent(self, value: str) -> None:
"""Add a custom user agent value."""
...

Policies for request/response logging and network tracing.
from azure.core.pipeline.policies import NetworkTraceLoggingPolicy, HttpLoggingPolicy, RequestIdPolicy
class NetworkTraceLoggingPolicy(SansIOHTTPPolicy):
"""Network request/response tracing policy."""
def __init__(self, logging_enable: bool = False, **kwargs): ...
class HttpLoggingPolicy(SansIOHTTPPolicy):
"""HTTP request/response logging policy."""
def __init__(self, logger: Optional[logging.Logger] = None, **kwargs): ...
class RequestIdPolicy(SansIOHTTPPolicy):
"""Request ID generation policy."""
def __init__(self, **kwargs): ...

Specialized policies for distributed tracing, proxy support, and custom hooks.
from azure.core.pipeline.policies import DistributedTracingPolicy, ProxyPolicy, CustomHookPolicy
from azure.core.pipeline.policies import SensitiveHeaderCleanupPolicy
class DistributedTracingPolicy(SansIOHTTPPolicy):
"""Distributed tracing policy with OpenTelemetry support."""
def __init__(self, **kwargs): ...
class ProxyPolicy(SansIOHTTPPolicy):
"""HTTP proxy configuration policy."""
def __init__(self, proxies: Optional[Dict[str, str]] = None, **kwargs): ...
class CustomHookPolicy(SansIOHTTPPolicy):
"""Custom request/response hook policy."""
def __init__(self, **kwargs): ...
class SensitiveHeaderCleanupPolicy(SansIOHTTPPolicy):
    """Strips sensitive headers (e.g. Authorization) from requests redirected to another domain."""

    def __init__(self, **kwargs): ...


from azure.core.pipeline import Pipeline
from azure.core.pipeline.policies import RetryPolicy, UserAgentPolicy, HeadersPolicy
from azure.core.pipeline.transport import RequestsTransport
# Create transport
transport = RequestsTransport()

# Configure policies
policies = [
    UserAgentPolicy(user_agent="MyApp/1.0"),
    HeadersPolicy(base_headers={"Accept": "application/json"}),
    RetryPolicy(retry_total=5),
]

# Create pipeline
pipeline = Pipeline(transport=transport, policies=policies)

# Create and send request
from azure.core.pipeline.transport import HttpRequest
from azure.core.pipeline import PipelineRequest, PipelineContext

request = HttpRequest("GET", "https://api.example.com/data")
# Pipeline.run() takes the HttpRequest itself: it builds the PipelineContext and
# the PipelineRequest wrapper internally, so the request is not wrapped by hand.
response = pipeline.run(request)
print(f"Status: {response.http_response.status_code}")

from azure.core.pipeline.policies import SansIOHTTPPolicy
class CustomHeaderPolicy(SansIOHTTPPolicy):
"""Adds custom headers to all requests."""
def __init__(self, custom_header_value, **kwargs):
super().__init__(**kwargs)
self.custom_value = custom_header_value
def on_request(self, request):
request.http_request.headers["X-Custom-Header"] = self.custom_value
def on_response(self, request, response):
# Process response if needed
pass
# Use custom policy
custom_policy = CustomHeaderPolicy("my-value")
policies = [custom_policy, RetryPolicy()]
pipeline = Pipeline(transport=transport, policies=policies)

import asyncio
from azure.core.pipeline import AsyncPipeline
from azure.core.pipeline.policies import AsyncRetryPolicy
from azure.core.pipeline.transport import AioHttpTransport
async def async_pipeline_example():
    """Send a GET request through an AsyncPipeline and print the response status."""
    # Imported locally so this example does not depend on an earlier snippet.
    from azure.core.pipeline.transport import HttpRequest

    # Create async transport and policies
    transport = AioHttpTransport()
    policies = [AsyncRetryPolicy(retry_total=3)]

    # Entering the pipeline as an async context manager opens the transport and
    # ensures it is closed again, even if the request raises.
    async with AsyncPipeline(transport=transport, policies=policies) as pipeline:
        # Create and send request. run() takes the HttpRequest itself; the
        # pipeline builds the PipelineContext / PipelineRequest internally.
        request = HttpRequest("GET", "https://api.example.com/data")
        response = await pipeline.run(request)
        print(f"Status: {response.http_response.status_code}")
# asyncio.run(async_pipeline_example())

from azure.core.pipeline.policies import (
RetryPolicy, RetryMode, BearerTokenCredentialPolicy,
DistributedTracingPolicy, HttpLoggingPolicy
)
# Configure advanced retry policy
retry_policy = RetryPolicy(
retry_total=10,
retry_backoff_factor=1.0,
retry_backoff_max=60,
retry_mode=RetryMode.Exponential,
retry_on_status_codes=[429, 500, 502, 503, 504]
)
# Configure authentication
# auth_policy = BearerTokenCredentialPolicy(credential, "https://api.example.com/.default")
# Configure tracing and logging
tracing_policy = DistributedTracingPolicy()
logging_policy = HttpLoggingPolicy()
# Combine all policies
policies = [
# auth_policy,
retry_policy,
tracing_policy,
logging_policy,
]

The order of policies in the pipeline is crucial for proper request/response processing:
- Use SansIOHTTPPolicy for stateless policies that only modify headers/metadata
- Use HTTPPolicy or AsyncHTTPPolicy for policies that need to control request flow
- Implement the on_exception method in Sans-IO policies for cleanup

Install with Tessl CLI
npx tessl i tessl/pypi-azure-core

docs