tessl/pypi-azure-core

Microsoft Azure Core Library providing foundational infrastructure for Azure SDK Python clients

Transport and Networking

Azure Core provides HTTP transport abstractions over several networking libraries (requests, aiohttp) and async frameworks (asyncio, Trio). The transport layer performs the actual HTTP communication and exposes one consistent interface to all Azure SDK clients.

Base Transport Interfaces

Transport classes define the interface for sending HTTP requests and receiving responses.

from azure.core.pipeline.transport import HttpTransport, AsyncHttpTransport, HttpRequest, HttpResponse, AsyncHttpResponse
from abc import ABC, abstractmethod

class HttpTransport(ABC):
    """Base synchronous HTTP transport interface."""
    @abstractmethod
    def send(self, request: HttpRequest, **kwargs) -> HttpResponse:
        """Send HTTP request and return response."""
        ...
    
    @abstractmethod
    def open(self) -> None:
        """Open transport connection."""
        ...
    
    @abstractmethod
    def close(self) -> None:
        """Close transport connection."""
        ...
    
    def __enter__(self):
        self.open()
        return self
    
    def __exit__(self, *args):
        self.close()

class AsyncHttpTransport(ABC):
    """Base asynchronous HTTP transport interface."""
    @abstractmethod
    async def send(self, request: HttpRequest, **kwargs) -> AsyncHttpResponse:
        """Send HTTP request and return response."""
        ...
    
    @abstractmethod
    async def open(self) -> None:
        """Open transport connection."""
        ...
    
    @abstractmethod
    async def close(self) -> None:
        """Close transport connection."""
        ...
    
    async def __aenter__(self):
        await self.open()
        return self
    
    async def __aexit__(self, *args):
        await self.close()
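
The open/send/close contract above can be exercised without a network. The following is a minimal sketch using only the standard library; `FakeRequest`, `FakeResponse`, `SketchTransport`, and `InMemoryTransport` are hypothetical stand-ins written for illustration and are not azure-core classes.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class FakeRequest:
    """Hypothetical stand-in for HttpRequest (not the azure-core class)."""
    method: str
    url: str
    headers: Dict[str, str] = field(default_factory=dict)


@dataclass
class FakeResponse:
    """Hypothetical stand-in for HttpResponse."""
    request: FakeRequest
    status_code: int
    body: bytes = b""


class SketchTransport(ABC):
    """Mirrors the open/send/close shape of the interface shown above."""

    @abstractmethod
    def send(self, request: FakeRequest, **kwargs) -> FakeResponse: ...

    @abstractmethod
    def open(self) -> None: ...

    @abstractmethod
    def close(self) -> None: ...

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *args):
        self.close()


class InMemoryTransport(SketchTransport):
    """Serves canned bodies keyed by URL; handy for offline tests."""

    def __init__(self, routes: Dict[str, bytes]):
        self._routes = routes
        self.is_open = False

    def open(self) -> None:
        self.is_open = True

    def close(self) -> None:
        self.is_open = False

    def send(self, request: FakeRequest, **kwargs) -> FakeResponse:
        if not self.is_open:
            raise RuntimeError("transport is not open")
        body = self._routes.get(request.url)
        if body is None:
            return FakeResponse(request, 404)
        return FakeResponse(request, 200, body)


transport = InMemoryTransport({"https://api.example.com/data": b'{"ok": true}'})
with transport:
    found = transport.send(FakeRequest("GET", "https://api.example.com/data"))
    missing = transport.send(FakeRequest("GET", "https://api.example.com/none"))

print(found.status_code, missing.status_code, transport.is_open)
```

The context manager guarantees that `close()` runs even if `send()` raises, which is the main reason the base classes define `__enter__` and `__exit__`.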

HTTP Request and Response

Core classes for representing HTTP requests and responses.

class HttpRequest:
    """HTTP request representation."""
    def __init__(
        self,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        files: Optional[Dict] = None,
        data: Optional[Union[bytes, str, Dict]] = None,
        **kwargs
    ): ...
    
    @property
    def method(self) -> str: ...
    
    @property
    def url(self) -> str: ...
    
    @property
    def headers(self) -> Dict[str, str]: ...
    
    @property
    def body(self) -> Optional[bytes]: ...
    
    def set_json_body(self, data: Any) -> None:
        """Set request body as JSON."""
        ...
    
    def set_multipart_mixed(self, *requests: "HttpRequest") -> None:
        """Set multipart/mixed body."""
        ...

class HttpResponse:
    """HTTP response representation."""
    def __init__(self, request: HttpRequest, internal_response, **kwargs): ...
    
    @property
    def request(self) -> HttpRequest: ...
    
    @property
    def status_code(self) -> int: ...
    
    @property
    def headers(self) -> Dict[str, str]: ...
    
    @property
    def reason(self) -> str: ...
    
    @property
    def content_type(self) -> Optional[str]: ...
    
    def text(self, encoding: Optional[str] = None) -> str:
        """Decode the response body as text (a method, not a property)."""
        ...
    
    @property
    def content(self) -> bytes: ...
    
    def json(self) -> Any:
        """Parse response content as JSON."""
        ...
    
    def iter_bytes(self, chunk_size: int = 1024) -> Iterator[bytes]:
        """Iterate response content in chunks."""
        ...
    
    def iter_raw(self, chunk_size: int = 1024) -> Iterator[bytes]:
        """Iterate raw response content."""
        ...

class AsyncHttpResponse:
    """Asynchronous HTTP response representation."""
    # Same properties as HttpResponse
    
    def json(self) -> Any:
        """Parse response content as JSON (the body must already be read)."""
        ...
    
    def text(self, encoding: Optional[str] = None) -> str:
        """Get response content as text (the body must already be read)."""
        ...
    
    async def read(self) -> bytes:
        """Read response content asynchronously."""
        ...
    
    def iter_bytes(self, chunk_size: int = 1024) -> AsyncIterator[bytes]:
        """Async iterate response content in chunks."""
        ...

Capabilities

Requests Transport

HTTP transport implementation using the popular requests library.

from azure.core.pipeline.transport import RequestsTransport, RequestsTransportResponse

class RequestsTransport(HttpTransport):
    """HTTP transport using the requests library."""
    def __init__(
        self,
        session: Optional[requests.Session] = None,
        session_owner: bool = True,
        connection_timeout: float = 300,
        read_timeout: float = 300,
        **kwargs
    ): ...
    
    def send(self, request: HttpRequest, **kwargs) -> RequestsTransportResponse: ...

class RequestsTransportResponse(HttpResponse):
    """Response implementation for requests transport."""
    pass

Asyncio Requests Transport

Async HTTP transport that runs the requests library on the asyncio event loop.

from azure.core.pipeline.transport import AsyncioRequestsTransport, AsyncioRequestsTransportResponse

class AsyncioRequestsTransport(AsyncHttpTransport):
    """Async HTTP transport built on requests and driven by asyncio."""
    def __init__(
        self,
        session: Optional[requests.Session] = None,
        session_owner: bool = True,
        **kwargs
    ): ...
    
    async def send(self, request: HttpRequest, **kwargs) -> AsyncioRequestsTransportResponse: ...

class AsyncioRequestsTransportResponse(AsyncHttpResponse):
    """Async response implementation for asyncio transport."""
    pass

AioHTTP Transport

HTTP transport implementation using the aiohttp library for async operations.

from azure.core.pipeline.transport import AioHttpTransport, AioHttpTransportResponse

class AioHttpTransport(AsyncHttpTransport):
    """HTTP transport using aiohttp library."""
    def __init__(
        self,
        session: Optional[aiohttp.ClientSession] = None, 
        session_owner: bool = True,
        **kwargs
    ): ...
    
    async def send(self, request: HttpRequest, **kwargs) -> AioHttpTransportResponse: ...

class AioHttpTransportResponse(AsyncHttpResponse):
    """Response implementation for aiohttp transport."""
    pass

Trio Transport

HTTP transport for the Trio async framework.

from azure.core.pipeline.transport import TrioRequestsTransport, TrioRequestsTransportResponse

class TrioRequestsTransport(AsyncHttpTransport):
    """HTTP transport for Trio async framework."""
    def __init__(self, **kwargs): ...
    
    async def send(self, request: HttpRequest, **kwargs) -> TrioRequestsTransportResponse: ...

class TrioRequestsTransportResponse(AsyncHttpResponse):
    """Response implementation for Trio transport."""
    pass

Usage Examples

Basic Transport Usage

from azure.core.pipeline.transport import RequestsTransport, HttpRequest

# Create transport
transport = RequestsTransport(
    connection_timeout=30,
    read_timeout=60
)

# Create request
request = HttpRequest("GET", "https://api.example.com/data")
request.headers["Accept"] = "application/json"

# Send request
with transport:
    response = transport.send(request)
    print(f"Status: {response.status_code}")
    print(f"Content: {response.text()}")  # text() is a method

Async Transport Usage

import asyncio
from azure.core.pipeline.transport import AioHttpTransport, HttpRequest

async def async_transport_example():
    # Create async transport
    transport = AioHttpTransport()
    
    # Create request
    request = HttpRequest("GET", "https://api.example.com/data")
    
    # Send request
    async with transport:
        response = await transport.send(request) 
        # With the default stream=False the body is already loaded, so text() is synchronous
        content = response.text()
        print(f"Status: {response.status_code}")
        print(f"Content: {content}")

# asyncio.run(async_transport_example())

Custom Transport Configuration

import requests
from azure.core.pipeline.transport import RequestsTransport

# Create custom session with specific configuration
session = requests.Session()
session.verify = "/path/to/ca-bundle.pem"  # Custom CA bundle
session.cert = ("/path/to/client.cert", "/path/to/client.key")  # Client cert
session.proxies = {
    "http": "http://proxy.example.com:8080",
    "https": "https://proxy.example.com:8080"
}

# Create transport with custom session
transport = RequestsTransport(
    session=session,
    session_owner=False,  # Don't close session automatically
    connection_timeout=60,
    read_timeout=120
)

Stream Response Handling

from azure.core.pipeline.transport import HttpRequest

def download_large_file(transport, url, filename):
    request = HttpRequest("GET", url)
    
    with transport:
        response = transport.send(request, stream=True)
        
        if response.status_code == 200:
            with open(filename, "wb") as f:
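                # Responses to a pipeline.transport HttpRequest stream via stream_download();
                # chunk size follows the transport's connection_data_block_size setting
                for chunk in response.stream_download(pipeline=None):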
                    f.write(chunk)
        else:
            print(f"Download failed: {response.status_code}")

JSON Request and Response

import json
from azure.core.pipeline.transport import HttpRequest

def send_json_request(transport, url, data):
    request = HttpRequest("POST", url)
    request.headers["Content-Type"] = "application/json"
    request.set_json_body(data)
    
    with transport:
        response = transport.send(request)
        
        if response.status_code == 200:
            return json.loads(response.text())
        else:
            raise Exception(f"Request failed: {response.status_code}")

# Example usage
data = {"name": "example", "value": 42}
result = send_json_request(transport, "https://api.example.com/items", data)
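
As a rough illustration of what sending and receiving JSON involves at the byte level, the sketch below serializes a payload, derives matching headers, and decodes it again. The helper names `encode_json_body` and `decode_json_body` are hypothetical and are not part of azure-core; only the standard library is used.

```python
import json
from typing import Any, Dict, Tuple


def encode_json_body(data: Any) -> Tuple[bytes, Dict[str, str]]:
    """Serialize data to UTF-8 JSON bytes and build matching headers."""
    body = json.dumps(data).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "Content-Length": str(len(body)),  # length in bytes, not characters
    }
    return body, headers


def decode_json_body(body: bytes, encoding: str = "utf-8") -> Any:
    """Decode response bytes and parse them as JSON."""
    return json.loads(body.decode(encoding))


payload = {"name": "example", "value": 42}
body, headers = encode_json_body(payload)
round_tripped = decode_json_body(body)
print(headers["Content-Type"], round_tripped == payload)
```

Computing `Content-Length` from the encoded bytes rather than the string matters once the payload contains non-ASCII characters.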

Multipart Request

from azure.core.pipeline.transport import HttpRequest

def upload_file_multipart(transport, url, file_path):
    with open(file_path, "rb") as f:
        files = {"file": f}
        
        request = HttpRequest("POST", url, files=files)
        
        with transport:
            response = transport.send(request)
            return response.status_code == 200

Async Stream Processing

import asyncio
from azure.core.pipeline.transport import AioHttpTransport, HttpRequest

async def async_stream_download(url, filename):
    transport = AioHttpTransport()
    request = HttpRequest("GET", url)
    
    async with transport:
        # stream=True defers reading the body so it can be consumed in chunks
        response = await transport.send(request, stream=True)
        
        if response.status_code == 200:
            with open(filename, "wb") as f:
                async for chunk in response.stream_download(pipeline=None):
                    f.write(chunk)
        else:
            print(f"Download failed: {response.status_code}")

# asyncio.run(async_stream_download("https://example.com/file.zip", "file.zip"))

Transport Selection and Configuration

Automatic Transport Selection

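When no transport is passed, the client constructs a default one. The synchronous PipelineClient falls back to RequestsTransport (the asynchronous AsyncPipelineClient defaults to AioHttpTransport, which needs aiohttp installed):

from azure.core.pipeline.transport import RequestsTransport
from azure.core import PipelineClient

# No transport argument: the client creates a default RequestsTransport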
client = PipelineClient(base_url="https://api.example.com")

# Or explicitly specify transport
transport = RequestsTransport()
client = PipelineClient(base_url="https://api.example.com", transport=transport)

Connection Pool Configuration

import requests
from azure.core.pipeline.transport import RequestsTransport

# Configure connection pooling
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=20,  # Number of connection pools
    pool_maxsize=20,      # Maximum connections per pool
    max_retries=3
)
session.mount("http://", adapter)
session.mount("https://", adapter)

transport = RequestsTransport(session=session, session_owner=False)

SSL and Certificate Configuration

import ssl
import aiohttp
from azure.core.pipeline.transport import AioHttpTransport

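# Create SSL context
# WARNING: the next two lines disable hostname and certificate verification.
# Use this only against local test endpoints, never in production.
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE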

# Configure aiohttp with custom SSL
connector = aiohttp.TCPConnector(ssl=ssl_context)
session = aiohttp.ClientSession(connector=connector)

transport = AioHttpTransport(session=session, session_owner=False)
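
For contrast with the example above, here is a small sketch of an SSL context that keeps verification enabled. It uses only the standard library `ssl` module; `build_verifying_context` is a hypothetical helper name, and the TLS 1.2 floor is an assumption chosen for illustration rather than an azure-core requirement.

```python
import ssl
from typing import Optional


def build_verifying_context(ca_bundle: Optional[str] = None) -> ssl.SSLContext:
    """Return a context that verifies certificates and hostnames.

    ca_bundle is an optional path to a PEM file; when omitted, the
    system's default trust store is used.
    """
    context = ssl.create_default_context(cafile=ca_bundle)
    # Assumed policy for this sketch: refuse anything older than TLS 1.2.
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


context = build_verifying_context()
print(context.verify_mode == ssl.CERT_REQUIRED, context.check_hostname)
```

A context built this way can be handed to `aiohttp.TCPConnector(ssl=...)` in the same position as the `ssl_context` shown earlier.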

Advanced Connection Management

Configuration classes and methods for managing connection lifecycle, pooling, and advanced settings.

from azure.core.configuration import ConnectionConfiguration
from azure.core.pipeline.transport._bigger_block_size_http_adapters import BiggerBlockSizeHTTPAdapter
from typing import Union, Optional, Any

class ConnectionConfiguration:
    """HTTP transport connection configuration settings."""
    def __init__(
        self,
        *,
        connection_timeout: float = 300,
        read_timeout: float = 300,
        connection_verify: Union[bool, str] = True,
        connection_cert: Optional[str] = None,
        connection_data_block_size: int = 4096,
        **kwargs: Any
    ) -> None: ...

class BiggerBlockSizeHTTPAdapter:
    """Custom HTTP adapter that optimizes block size for better performance."""
    def get_connection(self, url, proxies=None): ...

Usage Examples

import requests
from requests.adapters import HTTPAdapter
from azure.core.pipeline.transport import RequestsTransport, AioHttpTransport
import aiohttp
import ssl

# Advanced connection pooling configuration
session = requests.Session()
adapter = HTTPAdapter(
    pool_connections=20,      # Number of connection pools
    pool_maxsize=20,         # Max connections per pool
    max_retries=3
)
session.mount("http://", adapter)
session.mount("https://", adapter)

transport = RequestsTransport(session=session, session_owner=False)

# Advanced SSL configuration for aiohttp
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations("/path/to/ca-bundle.pem")
ssl_context.load_cert_chain("/path/to/client.pem", "/path/to/client.key")

connector = aiohttp.TCPConnector(
    ssl=ssl_context,
    limit=100,              # Total connection pool size
    limit_per_host=20,      # Per-host connection limit
    keepalive_timeout=30    # Keep-alive timeout
)
session = aiohttp.ClientSession(connector=connector)
transport = AioHttpTransport(session=session, session_owner=False)

# Session ownership patterns
# Transport owns session (default) - will close session automatically
transport_owned = RequestsTransport(session_owner=True)

# External session management - must close session manually
external_session = requests.Session()
transport_external = RequestsTransport(session=external_session, session_owner=False)
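
The ownership rule in the comments above can be sketched in a few lines. This is an illustration of the idea, not azure-core's implementation; `FakeSession` and `OwnershipSketchTransport` are hypothetical stand-ins.

```python
from typing import Optional


class FakeSession:
    """Hypothetical stand-in for a requests.Session-like object."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class OwnershipSketchTransport:
    """Closes its session on close() only when it owns that session."""

    def __init__(self, session: Optional[FakeSession] = None,
                 session_owner: bool = True) -> None:
        self.session = session
        self._session_owner = session_owner

    def open(self) -> None:
        # Create a session lazily only when the transport is responsible for it.
        if self.session is None and self._session_owner:
            self.session = FakeSession()

    def close(self) -> None:
        # Externally managed sessions are left untouched.
        if self._session_owner and self.session is not None:
            self.session.close()
            self.session = None

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *args):
        self.close()


# External session: the caller stays responsible for closing it.
external = FakeSession()
with OwnershipSketchTransport(session=external, session_owner=False):
    pass

# Owned session: created on open() and closed on exit.
owned = OwnershipSketchTransport()
with owned:
    created = owned.session

print(external.closed, created.closed)
```

Passing `session_owner=False` is what allows one session to be shared by several transports without the first `close()` tearing it down for the others.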

Advanced Timeout Configuration

Extended timeout handling with per-request overrides.

from azure.core.pipeline.transport import RequestsTransport, HttpRequest

transport = RequestsTransport()
request = HttpRequest("GET", "https://api.example.com/data")

# Per-request timeout override
response = transport.send(
    request,
    connection_timeout=30.0,  # Connection establishment timeout
    read_timeout=60.0         # Read timeout
)

# Environment settings control
transport_with_env = RequestsTransport(use_env_settings=True)   # Default
transport_no_env = RequestsTransport(use_env_settings=False)    # Disable env proxy
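
The precedence between transport-level defaults and per-request overrides can be sketched as follows. `resolve_timeouts` is a hypothetical helper written for this illustration, not an azure-core function; it returns a `(connect, read)` pair, which is the tuple form the requests library accepts for its `timeout` argument.

```python
from typing import Any, Dict, Tuple


def resolve_timeouts(defaults: Dict[str, float], **kwargs: Any) -> Tuple[float, float]:
    """Per-call keyword arguments win over transport-level defaults."""
    connect = kwargs.pop("connection_timeout", defaults["connection_timeout"])
    read = kwargs.pop("read_timeout", defaults["read_timeout"])
    return float(connect), float(read)


defaults = {"connection_timeout": 300.0, "read_timeout": 300.0}

no_override = resolve_timeouts(defaults)
both_overridden = resolve_timeouts(defaults, connection_timeout=30.0, read_timeout=60.0)
read_only = resolve_timeouts(defaults, read_timeout=10)

print(no_override, both_overridden, read_only)
```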

Transport-Specific Response Methods

Advanced response handling methods for different transport implementations.

from azure.core.pipeline.transport import AsyncHttpResponse, HttpResponse
from typing import Iterator, AsyncIterator

class AsyncHttpResponse:
    async def load_body(self) -> None:
        """Load response body into memory for sync access."""
        
    def parts(self) -> AsyncIterator["AsyncHttpResponse"]:
        """Parse multipart/mixed responses asynchronously."""

class HttpResponse:
    def stream_download(self, pipeline, **kwargs) -> Iterator[bytes]:
        """Stream download with error handling and decompression control."""
        
    def __getstate__(self):
        """Pickle support - loads body and handles unpicklable objects."""

Usage Examples

from azure.core.pipeline.transport import AioHttpTransport, HttpRequest

async def handle_multipart_response():
    transport = AioHttpTransport()
    request = HttpRequest("GET", "https://api.example.com/multipart")
    
    async with transport:
        response = await transport.send(request)
        
        # Handle multipart response
        async for part in response.parts():
            content = await part.read()
            print(f"Part content: {len(content)} bytes")

# Stream download with progress tracking
def download_with_progress(transport, url, file_path):
    request = HttpRequest("GET", url)
    
    with transport:
        # stream=True so chunks arrive as they are downloaded
        response = transport.send(request, stream=True)
        
        with open(file_path, "wb") as f:
            total_bytes = 0
            for chunk in response.stream_download(pipeline=None):
                f.write(chunk)
                total_bytes += len(chunk)
                print(f"Downloaded: {total_bytes} bytes")

Best Practices

Transport Management

  • Use context managers (with statements) for proper resource cleanup
  • Share transport instances across multiple requests for connection reuse
  • Configure appropriate timeouts based on your service requirements
  • Use async transports with async code for optimal performance

Connection Configuration

  • Set reasonable connection and read timeouts
  • Configure connection pooling for high-throughput scenarios
  • Use appropriate SSL/TLS settings for security requirements
  • Configure proxy settings when running behind corporate firewalls

Error Handling

  • Handle transport-specific exceptions appropriately
  • Implement retry logic for transient network errors
  • Monitor connection pool exhaustion and adjust sizing accordingly
  • Log transport errors with sufficient detail for debugging
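
As one way to approach the retry advice above, here is a minimal sketch of exponential backoff with jitter around an arbitrary callable. It is a generic standard-library illustration; `retry_with_backoff` and `flaky_send` are hypothetical names, and inside an azure-core pipeline this role is normally played by `azure.core.pipeline.policies.RetryPolicy` rather than hand-written code.

```python
import random
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, retrying the listed exceptions with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return operation()
        except retry_on:
            if attempt >= max_attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles each attempt; jitter spreads out concurrent retries.
            delay = base_delay * (2 ** (attempt - 1))
            sleep(delay + random.uniform(0, delay / 2))
            attempt += 1


calls = {"count": 0}
recorded_delays: List[float] = []


def flaky_send() -> str:
    """Fails twice with a simulated transient error, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


# Delays are recorded instead of slept so the demo finishes instantly.
result = retry_with_backoff(flaky_send, sleep=recorded_delays.append)
print(result, calls["count"], len(recorded_delays))
```

Only exceptions listed in `retry_on` are retried; anything else propagates immediately, which keeps programming errors from being masked as network flakiness.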

Performance Optimization

  • Use streaming for large payloads to avoid memory issues
  • Implement connection pooling and keep-alive for multiple requests
  • Choose async transports for I/O bound operations
  • Monitor and tune timeout settings based on service characteristics
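
The streaming advice above comes down to never holding the whole payload in memory at once. A minimal sketch, assuming the chunks come from any iterator of bytes (for example a response's streaming iterator); `write_chunks` and `fake_chunks` are hypothetical helpers for illustration.

```python
import io
from typing import BinaryIO, Callable, Iterable, Iterator, List, Optional


def write_chunks(
    chunks: Iterable[bytes],
    sink: BinaryIO,
    on_progress: Optional[Callable[[int], None]] = None,
) -> int:
    """Write chunks to sink one at a time and return the total byte count."""
    total = 0
    for chunk in chunks:
        if not chunk:
            continue  # skip empty keep-alive chunks
        sink.write(chunk)
        total += len(chunk)
        if on_progress is not None:
            on_progress(total)
    return total


def fake_chunks(payload: bytes, size: int) -> Iterator[bytes]:
    """Simulate a streamed body by slicing a payload into fixed-size pieces."""
    for start in range(0, len(payload), size):
        yield payload[start:start + size]


buffer = io.BytesIO()
progress: List[int] = []
written = write_chunks(fake_chunks(b"x" * 10_000, 4096), buffer, progress.append)
print(written, progress)
```

Peak memory use is bounded by the chunk size rather than the payload size, which is what makes this pattern suitable for large downloads.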

Install with Tessl CLI

npx tessl i tessl/pypi-azure-core
