Microsoft Azure Core Library providing foundational infrastructure for Azure SDK Python clients
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Azure Core provides HTTP transport abstractions that support multiple async frameworks and networking libraries. The transport layer handles the actual HTTP communication while providing a consistent interface for all Azure SDK clients.
Transport classes define the interface for sending HTTP requests and receiving responses.
from azure.core.pipeline.transport import HttpTransport, AsyncHttpTransport, HttpRequest, HttpResponse, AsyncHttpResponse
from abc import ABC, abstractmethod
class HttpTransport(ABC):
    """Base synchronous HTTP transport interface.

    Subclasses must implement ``send``, ``open`` and ``close``. The class is
    usable as a context manager: entering calls ``open()`` and leaving calls
    ``close()``.
    """

    @abstractmethod
    def send(self, request: HttpRequest, **kwargs) -> HttpResponse:
        """Send HTTP request and return response."""
        ...

    @abstractmethod
    def open(self) -> None:
        """Open transport connection."""
        ...

    @abstractmethod
    def close(self) -> None:
        """Close transport connection."""
        ...

    def __enter__(self):
        # Open on entry and hand the transport itself to the ``with`` target.
        self.open()
        return self

    def __exit__(self, *args):
        # Always close on exit; returns None, so exceptions raised inside the
        # ``with`` body are not suppressed.
        self.close()
class AsyncHttpTransport(ABC):
    """Base asynchronous HTTP transport interface.

    Subclasses must implement the ``send``, ``open`` and ``close``
    coroutines. The class is usable as an async context manager: entering
    awaits ``open()`` and leaving awaits ``close()``.
    """

    @abstractmethod
    async def send(self, request: HttpRequest, **kwargs) -> AsyncHttpResponse:
        """Send HTTP request and return response."""
        ...

    @abstractmethod
    async def open(self) -> None:
        """Open transport connection."""
        ...

    @abstractmethod
    async def close(self) -> None:
        """Close transport connection."""
        ...

    async def __aenter__(self):
        # Open on entry and hand the transport itself to the ``async with`` target.
        await self.open()
        return self

    async def __aexit__(self, *args):
        # Always close on exit; returns None, so exceptions raised inside the
        # ``async with`` body are not suppressed.
        await self.close()


# Core classes for representing HTTP requests and responses.
class HttpRequest:
    """HTTP request representation.

    This is an interface stub: every member body is elided with ``...``.
    """

    def __init__(
        self,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        files: Optional[Dict] = None,
        data: Optional[Union[bytes, str, Dict]] = None,
        **kwargs
    ):
        """Create a request for *method* and *url*.

        ``headers``, ``files`` and ``data`` are optional and default to None.
        """
        ...

    @property
    def method(self) -> str:
        """Request method."""
        ...

    @property
    def url(self) -> str:
        """Request URL."""
        ...

    @property
    def headers(self) -> Dict[str, str]:
        """Request headers."""
        ...

    @property
    def body(self) -> Optional[bytes]:
        """Request body, if any."""
        ...

    def set_json_body(self, data: Any) -> None:
        """Set request body as JSON."""
        ...

    def set_multipart_mixed(self, *requests: "HttpRequest") -> None:
        """Set multipart/mixed body."""
        ...
class HttpResponse:
    """HTTP response representation.

    This is an interface stub: every member body is elided with ``...``.
    """

    def __init__(self, request: HttpRequest, internal_response, **kwargs):
        """Wrap *internal_response* as the response to *request*."""
        ...

    @property
    def request(self) -> HttpRequest:
        """The request associated with this response."""
        ...

    @property
    def status_code(self) -> int:
        """Response status code."""
        ...

    @property
    def headers(self) -> Dict[str, str]:
        """Response headers."""
        ...

    @property
    def reason(self) -> str:
        """Response reason."""
        ...

    @property
    def content_type(self) -> Optional[str]:
        """Response content type, if any."""
        ...

    @property
    def text(self) -> str:
        """Response content as text."""
        ...

    @property
    def content(self) -> bytes:
        """Response content as bytes."""
        ...

    def json(self) -> Any:
        """Parse response content as JSON."""
        ...

    def iter_bytes(self, chunk_size: int = 1024) -> Iterator[bytes]:
        """Iterate response content in chunks."""
        ...

    def iter_raw(self, chunk_size: int = 1024) -> Iterator[bytes]:
        """Iterate raw response content."""
        ...
class AsyncHttpResponse:
    """Asynchronous HTTP response representation.

    This is an interface stub: every member body is elided with ``...``.
    """

    # Same properties as HttpResponse

    async def json(self) -> Any:
        """Parse response content as JSON asynchronously."""
        ...

    async def text(self) -> str:
        """Get response content as text asynchronously."""
        ...

    async def read(self) -> bytes:
        """Read response content asynchronously."""
        ...

    def iter_bytes(self, chunk_size: int = 1024) -> AsyncIterator[bytes]:
        """Async iterate response content in chunks."""
        ...


# HTTP transport implementation using the popular requests library.
from azure.core.pipeline.transport import RequestsTransport, RequestsTransportResponse
class RequestsTransport(HttpTransport):
    """HTTP transport using the requests library.

    This is an interface stub: member bodies are elided with ``...``.
    """

    def __init__(
        self,
        session: Optional[requests.Session] = None,
        session_owner: bool = True,
        connection_timeout: float = 300,
        read_timeout: float = 300,
        **kwargs
    ):
        """Configure the transport.

        ``session`` is an optional caller-supplied ``requests.Session``.
        ``session_owner`` defaults to True, and both timeouts default to 300.
        """
        ...

    def send(self, request: HttpRequest, **kwargs) -> RequestsTransportResponse:
        """Send *request* and return a ``RequestsTransportResponse``."""
        ...
class RequestsTransportResponse(HttpResponse):
    """Response implementation for requests transport."""

    pass


# Async HTTP transport using asyncio with requests-like interface.
from azure.core.pipeline.transport import AsyncioRequestsTransport, AsyncioRequestsTransportResponse
class AsyncioRequestsTransport(AsyncHttpTransport):
    """Async HTTP transport using asyncio.

    This is an interface stub: member bodies are elided with ``...``.
    """

    def __init__(
        self,
        # This transport wraps the requests library behind an asyncio
        # interface, so a caller-supplied session is a requests.Session,
        # not an aiohttp.ClientSession.
        session: Optional[requests.Session] = None,
        session_owner: bool = True,
        **kwargs
    ):
        """Configure the transport.

        ``session`` is an optional caller-supplied session and
        ``session_owner`` defaults to True.
        """
        ...

    async def send(self, request: HttpRequest, **kwargs) -> AsyncioRequestsTransportResponse:
        """Send *request* and return an ``AsyncioRequestsTransportResponse``."""
        ...
class AsyncioRequestsTransportResponse(AsyncHttpResponse):
    """Async response implementation for asyncio transport."""

    pass


# HTTP transport implementation using the aiohttp library for async operations.
from azure.core.pipeline.transport import AioHttpTransport, AioHttpTransportResponse
class AioHttpTransport(AsyncHttpTransport):
    """HTTP transport using aiohttp library.

    This is an interface stub: member bodies are elided with ``...``.
    """

    def __init__(
        self,
        session: Optional[aiohttp.ClientSession] = None,
        session_owner: bool = True,
        **kwargs
    ):
        """Configure the transport.

        ``session`` is an optional caller-supplied ``aiohttp.ClientSession``
        and ``session_owner`` defaults to True.
        """
        ...

    async def send(self, request: HttpRequest, **kwargs) -> AioHttpTransportResponse:
        """Send *request* and return an ``AioHttpTransportResponse``."""
        ...
class AioHttpTransportResponse(AsyncHttpResponse):
    """Response implementation for aiohttp transport."""

    pass


# HTTP transport for the Trio async framework.
from azure.core.pipeline.transport import TrioRequestsTransport, TrioRequestsTransportResponse
class TrioRequestsTransport(AsyncHttpTransport):
    """HTTP transport for Trio async framework.

    This is an interface stub: member bodies are elided with ``...``.
    """

    def __init__(self, **kwargs):
        """Configure the transport from keyword arguments only."""
        ...

    async def send(self, request: HttpRequest, **kwargs) -> TrioRequestsTransportResponse:
        """Send *request* and return a ``TrioRequestsTransportResponse``."""
        ...
class TrioRequestsTransportResponse(AsyncHttpResponse):
    """Response implementation for Trio transport."""

    pass


from azure.core.pipeline.transport import RequestsTransport, HttpRequest
# Basic synchronous usage: build a transport and a request, then send the
# request inside a ``with`` block so the transport is opened and closed.

# Create transport
transport = RequestsTransport(
    connection_timeout=30,
    read_timeout=60,
)

# Create request
request = HttpRequest("GET", "https://api.example.com/data")
request.headers["Accept"] = "application/json"

# Send request
with transport:
    response = transport.send(request)
    print(f"Status: {response.status_code}")
    print(f"Content: {response.text}")

import asyncio
from azure.core.pipeline.transport import AioHttpTransport, HttpRequest
async def async_transport_example():
    """Send one GET request through ``AioHttpTransport`` and print the result.

    The transport is used as an async context manager, so it is opened
    before the request and closed afterwards.
    """
    # Create async transport
    transport = AioHttpTransport()

    # Create request
    request = HttpRequest("GET", "https://api.example.com/data")

    # Send request
    async with transport:
        response = await transport.send(request)
        content = await response.text()
        print(f"Status: {response.status_code}")
        print(f"Content: {content}")


# asyncio.run(async_transport_example())

import requests
from azure.core.pipeline.transport import RequestsTransport
# Create custom session with specific configuration
session = requests.Session()
session.verify = "/path/to/ca-bundle.pem"  # Custom CA bundle
session.cert = ("/path/to/client.cert", "/path/to/client.key")  # Client cert
session.proxies = {
    "http": "http://proxy.example.com:8080",
    "https": "https://proxy.example.com:8080",
}

# Create transport with custom session
transport = RequestsTransport(
    session=session,
    session_owner=False,  # Don't close session automatically
    connection_timeout=60,
    read_timeout=120,
)

from azure.core.pipeline.transport import HttpRequest
def download_large_file(transport, url, filename):
    """Stream the body of a GET request for *url* into *filename*.

    The request is sent with ``stream=True`` and written in 8192-byte
    chunks. On any status other than 200 a failure message is printed and
    nothing is written. Returns None in both cases.
    """
    request = HttpRequest("GET", url)
    with transport:
        response = transport.send(request, stream=True)
        if response.status_code != 200:
            print(f"Download failed: {response.status_code}")
            return
        # The target file is only created once the request has succeeded.
        with open(filename, "wb") as f:
            for chunk in response.iter_bytes(chunk_size=8192):
                f.write(chunk)


import json
from azure.core.pipeline.transport import HttpRequest
def send_json_request(transport, url, data):
    """POST *data* as a JSON body to *url* and return the parsed JSON reply.

    Raises:
        Exception: if the response status code is not 200.
    """
    request = HttpRequest("POST", url)
    request.headers["Content-Type"] = "application/json"
    request.set_json_body(data)

    with transport:
        response = transport.send(request)
        if response.status_code != 200:
            raise Exception(f"Request failed: {response.status_code}")
        return response.json()
# Example usage
# NOTE(review): `transport` is not created in this snippet; presumably it is
# a transport built in one of the earlier examples. Confirm before running.
data = {"name": "example", "value": 42}
result = send_json_request(transport, "https://api.example.com/items", data)

from azure.core.pipeline.transport import HttpRequest
def upload_file_multipart(transport, url, file_path):
    """POST the file at *file_path* to *url* as a ``files`` upload.

    Returns:
        True if the response status code is 200, otherwise False.
    """
    with open(file_path, "rb") as f:
        files = {"file": f}
        request = HttpRequest("POST", url, files=files)
        # Send while the file handle is still open: the request holds the
        # open file object, not a copy of its contents.
        with transport:
            response = transport.send(request)
            return response.status_code == 200


import asyncio
from azure.core.pipeline.transport import AioHttpTransport, HttpRequest
async def async_stream_download(url, filename):
    """Stream the body of a GET request for *url* into *filename*.

    Uses ``AioHttpTransport`` as an async context manager and writes the
    body in 8192-byte chunks. On any status other than 200 a failure message
    is printed and nothing is written.
    """
    transport = AioHttpTransport()
    request = HttpRequest("GET", url)

    async with transport:
        # stream=True matches the synchronous download example; without it
        # the body is not requested as a stream before iteration.
        response = await transport.send(request, stream=True)
        if response.status_code != 200:
            print(f"Download failed: {response.status_code}")
            return
        with open(filename, "wb") as f:
            async for chunk in response.iter_bytes(chunk_size=8192):
                f.write(chunk)


# asyncio.run(async_stream_download("https://example.com/file.zip", "file.zip"))

# Azure Core automatically selects appropriate transport based on available libraries:
from azure.core.pipeline.transport import RequestsTransport
from azure.core import PipelineClient
# Azure Core will automatically choose the best available transport
client = PipelineClient(base_url="https://api.example.com")

# Or explicitly specify transport
transport = RequestsTransport()
client = PipelineClient(base_url="https://api.example.com", transport=transport)

import requests
from azure.core.pipeline.transport import RequestsTransport
# Configure connection pooling
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=20,  # Number of connection pools
    pool_maxsize=20,  # Maximum connections per pool
    max_retries=3,
)
# Use the same adapter for both URL schemes.
session.mount("http://", adapter)
session.mount("https://", adapter)

transport = RequestsTransport(session=session, session_owner=False)

import ssl
import aiohttp
from azure.core.pipeline.transport import AioHttpTransport
# Create SSL context
# WARNING: the two settings below turn off hostname checking and certificate
# verification, so the peer is not authenticated. Use only against trusted
# test endpoints, never in production.
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False  # must be disabled before CERT_NONE can be set
ssl_context.verify_mode = ssl.CERT_NONE

# Configure aiohttp with custom SSL
connector = aiohttp.TCPConnector(ssl=ssl_context)
session = aiohttp.ClientSession(connector=connector)

transport = AioHttpTransport(session=session, session_owner=False)

# Configuration classes and methods for managing connection lifecycle, pooling, and advanced settings.
from azure.core.configuration import ConnectionConfiguration
from azure.core.pipeline.transport._bigger_block_size_http_adapters import BiggerBlockSizeHTTPAdapter
from typing import Union, Optional, Any
class ConnectionConfiguration:
    """HTTP transport connection configuration settings.

    This is an interface stub: the constructor body is elided with ``...``.
    """

    def __init__(
        self,
        *,
        connection_timeout: float = 300,
        read_timeout: float = 300,
        connection_verify: Union[bool, str] = True,
        connection_cert: Optional[str] = None,
        connection_data_block_size: int = 4096,
        **kwargs: Any
    ) -> None:
        """Accept connection settings as keyword-only arguments.

        Defaults: both timeouts 300, ``connection_verify`` True,
        ``connection_cert`` None, ``connection_data_block_size`` 4096.
        """
        ...
class BiggerBlockSizeHTTPAdapter:
    """Custom HTTP adapter that optimizes block size for better performance.

    This is an interface stub: the method body is elided with ``...``.
    """

    def get_connection(self, url, proxies=None):
        """Return the connection to use for *url*, optionally via *proxies*."""
        ...


import requests
from requests.adapters import HTTPAdapter
from azure.core.pipeline.transport import RequestsTransport, AioHttpTransport
import aiohttp
import ssl
# Advanced connection pooling configuration
session = requests.Session()
adapter = HTTPAdapter(
    pool_connections=20,  # Number of connection pools
    pool_maxsize=20,  # Max connections per pool
    max_retries=3,
)
# Use the same adapter for both URL schemes.
session.mount("http://", adapter)
session.mount("https://", adapter)

transport = RequestsTransport(session=session, session_owner=False)

# Advanced SSL configuration for aiohttp
# Note: `session` and `transport` are rebound below to their aiohttp
# counterparts; the requests-based objects above are no longer referenced.
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations("/path/to/ca-bundle.pem")
ssl_context.load_cert_chain("/path/to/client.pem", "/path/to/client.key")

connector = aiohttp.TCPConnector(
    ssl=ssl_context,
    limit=100,  # Total connection pool size
    limit_per_host=20,  # Per-host connection limit
    keepalive_timeout=30,  # Keep-alive timeout
)

session = aiohttp.ClientSession(connector=connector)
transport = AioHttpTransport(session=session, session_owner=False)

# Session ownership patterns
# Transport owns session (default) - will close session automatically
transport_owned = RequestsTransport(session_owner=True)

# External session management - must close session manually
external_session = requests.Session()
transport_external = RequestsTransport(session=external_session, session_owner=False)

# Extended timeout handling with per-request overrides.
from azure.core.pipeline.transport import RequestsTransport, HttpRequest
transport = RequestsTransport()
request = HttpRequest("GET", "https://api.example.com/data")

# Per-request timeout override
response = transport.send(
    request,
    connection_timeout=30.0,  # Connection establishment timeout
    read_timeout=60.0,  # Read timeout
)

# Environment settings control
transport_with_env = RequestsTransport(use_env_settings=True)  # Default
transport_no_env = RequestsTransport(use_env_settings=False)  # Disable env proxy

# Advanced response handling methods for different transport implementations.
from azure.core.pipeline.transport import AsyncHttpResponse, HttpResponse
from typing import Iterator, AsyncIterator
class AsyncHttpResponse:
    """Additional asynchronous response methods (interface stub)."""

    async def load_body(self) -> None:
        """Load response body into memory for sync access."""

    def parts(self) -> AsyncIterator["AsyncHttpResponse"]:
        """Parse multipart/mixed responses asynchronously."""
class HttpResponse:
    """Additional synchronous response methods (interface stub)."""

    def stream_download(self, pipeline, **kwargs) -> Iterator[bytes]:
        """Stream download with error handling and decompression control."""

    def __getstate__(self):
        """Pickle support - loads body and handles unpicklable objects."""


from azure.core.pipeline.transport import AioHttpTransport, HttpRequest
async def handle_multipart_response():
    """Fetch a multipart response and print the size of each part.

    Parts are read inside the ``async with`` block, while the transport is
    still open.
    """
    transport = AioHttpTransport()
    request = HttpRequest("GET", "https://api.example.com/multipart")

    async with transport:
        response = await transport.send(request)

        # Handle multipart response
        async for part in response.parts():
            content = await part.read()
            print(f"Part content: {len(content)} bytes")
# Stream download with progress tracking
def download_with_progress(transport, url, file_path):
    """Stream a GET response for *url* into *file_path*, printing progress.

    After each chunk is written, the running byte total is printed.
    """
    request = HttpRequest("GET", url)

    with transport:
        # stream=True matches the other download example in this document.
        response = transport.send(request, stream=True)

        with open(file_path, "wb") as f:
            total_bytes = 0
            for chunk in response.stream_download(pipeline=None):
                f.write(chunk)
                total_bytes += len(chunk)
                print(f"Downloaded: {total_bytes} bytes")


# Use context managers (with statements) for proper resource cleanup
# Install with Tessl CLI
npx tessl i tessl/pypi-azure-core
docs