tessl/pypi-azure-core

Microsoft Azure Core Library providing foundational infrastructure for Azure SDK Python clients


docs/async-programming-patterns.md

Async Programming Patterns

Azure Core provides comprehensive async/await support with full feature parity with synchronous operations. The async infrastructure includes async versions of all core classes, proper resource management, context manager patterns, and performance optimizations for asynchronous operations.

Core Async Components

AsyncPipelineClient

Main async client for Azure services with context manager support and flexible response handling.

from azure.core import AsyncPipelineClient
from azure.core.configuration import Configuration
from azure.core.pipeline import AsyncPipeline
from azure.core.rest import AsyncHttpResponse, HttpRequest
from typing import Any, AsyncContextManager, Awaitable, Optional

class AsyncPipelineClient(AsyncContextManager["AsyncPipelineClient"]):
    def __init__(
        self,
        base_url: str,
        *,
        pipeline: Optional[AsyncPipeline] = None,
        config: Optional[Configuration] = None,
        **kwargs: Any,
    ): ...
    
    def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any
    ) -> Awaitable[AsyncHttpResponse]: ...
    
    async def __aenter__(self) -> "AsyncPipelineClient": ...
    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...
    async def close(self) -> None: ...

AsyncPipeline

Asynchronous HTTP pipeline with context manager support and async policy execution.

from azure.core.pipeline import AsyncPipeline, PipelineResponse
from azure.core.pipeline.policies import AsyncHTTPPolicy, SansIOHTTPPolicy
from azure.core.pipeline.transport import AsyncHttpTransport
from azure.core.rest import HttpRequest
from typing import Any, AsyncContextManager, Iterable, Optional, Union

class AsyncPipeline(AsyncContextManager["AsyncPipeline"]):
    def __init__(
        self,
        transport: AsyncHttpTransport,
        policies: Optional[Iterable[Union[AsyncHTTPPolicy, SansIOHTTPPolicy]]] = None,
    ): ...
    
    async def run(self, request: HttpRequest, **kwargs: Any) -> PipelineResponse: ...
    
    async def __aenter__(self) -> "AsyncPipeline": ...
    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...

Basic Async Usage

Client Creation and Usage

from azure.core import AsyncPipelineClient
from azure.core.credentials import AsyncTokenCredential
from azure.core.rest import HttpRequest
import asyncio

async def basic_async_client():
    # Create async client with context manager (recommended)
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/data")
        response = await client.send_request(request)
        
        # Handle response
        response.raise_for_status()
        data = response.json()
        return data

# Run async function
result = asyncio.run(basic_async_client())

Manual Resource Management

async def manual_resource_management():
    client = AsyncPipelineClient("https://api.example.com")
    
    try:
        # No explicit open step is needed; the pipeline opens on first use
        request = HttpRequest("GET", "/api/data")
        response = await client.send_request(request)
        
        return response.json()
    finally:
        # Always close resources
        await client.close()

Dual Usage Response Pattern

Azure Core's dual-usage pattern allows the object returned by send_request to be used both as an awaitable and as an async context manager:

async def dual_usage_patterns():
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/data")
        
        # Pattern 1: Direct await
        response = await client.send_request(request)
        data = response.json()
        await response.close()  # Manual cleanup (close is a coroutine on async responses)
        
        # Pattern 2: Context manager (automatic cleanup)
        async with client.send_request(request) as response:
            response.raise_for_status()
            data = response.json()
            # Response automatically closed when exiting context
        
        return data

Async Authentication

AsyncTokenCredential

Protocol for async token-based authentication with context manager support.

from azure.core.credentials import AccessToken
from azure.core.credentials_async import AsyncTokenCredential
from typing import Any, AsyncContextManager, Optional

class AsyncTokenCredential(AsyncContextManager["AsyncTokenCredential"]):
    async def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,
        tenant_id: Optional[str] = None,
        enable_cae: bool = False,
        **kwargs: Any,
    ) -> AccessToken: ...
    
    async def close(self) -> None: ...
    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...

Usage with Authentication

from azure.identity.aio import DefaultAzureCredential
from azure.core import AsyncPipelineClient
from azure.core.configuration import Configuration
from azure.core.pipeline.policies import AsyncBearerTokenCredentialPolicy
from azure.core.rest import HttpRequest

async def authenticated_client():
    # Create async credential
    credential = DefaultAzureCredential()
    
    try:
        # Wire the credential in via an async bearer-token policy
        # (the scope below is illustrative)
        config = Configuration()
        config.authentication_policy = AsyncBearerTokenCredentialPolicy(
            credential, "https://api.example.com/.default"
        )
        
        async with AsyncPipelineClient(
            "https://api.example.com",
            config=config
        ) as client:
            request = HttpRequest("GET", "/api/protected-resource")
            
            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()
    finally:
        # Close credential resources
        await credential.close()

Async Pipeline Policies

AsyncBearerTokenCredentialPolicy

Async authentication policy with automatic token refresh and challenge handling.

from azure.core.pipeline import PipelineRequest, PipelineResponse
from azure.core.pipeline.policies import AsyncBearerTokenCredentialPolicy, AsyncHTTPPolicy
from azure.core.credentials_async import AsyncTokenCredential
from typing import Any

class AsyncBearerTokenCredentialPolicy(AsyncHTTPPolicy):
    def __init__(
        self,
        credential: AsyncTokenCredential,
        *scopes: str,
        **kwargs: Any
    ): ...
    
    async def on_request(self, request: PipelineRequest) -> None: ...
    async def on_challenge(self, request: PipelineRequest, response: PipelineResponse) -> bool: ...
    async def send(self, request: PipelineRequest) -> PipelineResponse: ...

AsyncRetryPolicy

Async retry policy with configurable backoff and sleep patterns.

from azure.core.pipeline import PipelineRequest, PipelineResponse
from azure.core.pipeline.policies import AsyncHTTPPolicy, AsyncRetryPolicy
from azure.core.pipeline.transport import AsyncHttpTransport
from typing import Any, Dict

class AsyncRetryPolicy(AsyncHTTPPolicy):
    def __init__(
        self,
        *,
        retry_total: int = 10,
        retry_connect: int = 3,
        retry_read: int = 3,
        retry_status: int = 3,
        retry_backoff_factor: float = 0.8,
        retry_backoff_max: int = 120,
        **kwargs: Any
    ): ...
    
    async def sleep(self, settings: Dict[str, Any], transport: AsyncHttpTransport) -> None: ...
    async def send(self, request: PipelineRequest) -> PipelineResponse: ...
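
The delay between attempts grows exponentially: no wait before the first retry, then roughly retry_backoff_factor * 2**(n-1) seconds, capped at retry_backoff_max. A standalone sketch of that schedule (an assumption mirroring the familiar urllib3-style formula, not azure-core's exact code):

```python
from typing import List

def backoff_schedule(
    retries: int, backoff_factor: float = 0.8, backoff_max: int = 120
) -> List[float]:
    """Delay (in seconds) applied before each successive retry attempt."""
    delays = []
    for attempt in range(1, retries + 1):
        if attempt == 1:
            delays.append(0.0)  # First retry fires immediately
        else:
            # Exponential growth, capped at backoff_max
            delays.append(min(backoff_max, backoff_factor * (2 ** (attempt - 1))))
    return delays
```

With the defaults above, the first few delays are 0, 1.6, 3.2, 6.4 seconds, flattening out at 120 seconds.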

Async Transport

AioHttpTransport

HTTP transport implementation using aiohttp with full async support.

from azure.core import AsyncPipelineClient
from azure.core.pipeline.transport import AioHttpTransport
from azure.core.rest import HttpRequest
import aiohttp

async def custom_transport_example():
    # Custom aiohttp session configuration
    timeout = aiohttp.ClientTimeout(total=30, connect=10)
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
    
    session = aiohttp.ClientSession(
        timeout=timeout,
        connector=connector
    )
    
    # Create transport with custom session
    transport = AioHttpTransport(session=session, session_owner=False)
    
    try:
        async with AsyncPipelineClient(
            "https://api.example.com",
            transport=transport
        ) as client:
            request = HttpRequest("GET", "/api/data")
            response = await client.send_request(request)
            return response.json()
    finally:
        await session.close()

Async Streaming

Streaming Responses

async def streaming_download():
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/large-file")
        
        # Enable streaming
        async with client.send_request(request, stream=True) as response:
            response.raise_for_status()
            
            # Stream the download; chunk size is determined by the transport
            total_size = 0
            next_report = 1024 * 1024
            async for chunk in response.iter_bytes():
                total_size += len(chunk)
                process_chunk(chunk)
                
                # Progress reporting (roughly every MB)
                if total_size >= next_report:
                    print(f"Downloaded {total_size // (1024 * 1024)} MB")
                    next_report += 1024 * 1024
            
            print(f"Download complete: {total_size} bytes")

Multipart Responses

async def handle_multipart_response():
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/multipart-data")
        
        async with client.send_request(request) as response:
            # Iterate over multipart sections
            async for part in response.parts():
                content_type = part.headers.get("content-type")
                content = await part.read()
                
                print(f"Part: {content_type}, Size: {len(content)}")
                process_part(content_type, content)

Async Pagination

AsyncItemPaged Usage

from azure.core.async_paging import AsyncItemPaged

async def paginated_data_processing():
    # Assume client.list_items() returns AsyncItemPaged
    async_pager = client.list_items()
    
    # Item-by-item iteration (recommended for most cases)
    processed_count = 0
    async for item in async_pager:
        await process_item(item)
        processed_count += 1
        
        # Progress reporting
        if processed_count % 100 == 0:
            print(f"Processed {processed_count} items")
    
    return processed_count

async def paginated_batch_processing():
    async_pager = client.list_items()
    
    # Page-by-page iteration for batch processing
    page_count = 0
    async for page in async_pager.by_page():
        page_items = []
        async for item in page:
            page_items.append(item)
        
        # Process entire page as batch
        await process_batch(page_items)
        page_count += 1
        print(f"Processed page {page_count} with {len(page_items)} items")

Async Long-Running Operations

AsyncLROPoller

from azure.core.polling import AsyncLROPoller
import asyncio

async def long_running_operation():
    async with AsyncPipelineClient("https://api.example.com") as client:
        # Start long-running operation (begin_long_running_operation is a
        # hypothetical client method returning an AsyncLROPoller)
        request = HttpRequest("POST", "/api/start-operation", json={"data": "value"})
        poller: AsyncLROPoller = await client.begin_long_running_operation(request)
        
        # AsyncLROPoller.result() takes no timeout argument, so wrap it
        # with asyncio.wait_for to bound the wait
        try:
            result = await asyncio.wait_for(poller.result(), timeout=300)  # 5 minutes
            print(f"Operation completed: {result}")
            return result
        except Exception as e:
            print(f"Operation failed or timed out: {e}")
            raise

async def polling_with_progress():
    poller = await client.begin_operation(request)
    
    # The async poller only advances while result() is being awaited, so run
    # it as a task and report progress alongside it
    result_task = asyncio.ensure_future(poller.result())
    while not result_task.done():
        print(f"Operation status: {poller.status()}")
        await asyncio.sleep(5)  # Report every 5 seconds
    
    return await result_task

Async Error Handling

Exception Handling Patterns

from azure.core.exceptions import HttpResponseError, ServiceRequestError
import asyncio

async def robust_error_handling():
    async with AsyncPipelineClient("https://api.example.com") as client:
        try:
            request = HttpRequest("GET", "/api/data")
            
            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()
                
        except HttpResponseError as e:
            if e.status_code == 429:  # Rate limited
                retry_after = int(e.response.headers.get("Retry-After", "60"))
                print(f"Rate limited, waiting {retry_after} seconds")
                await asyncio.sleep(retry_after)
                # Implement retry logic
                return await retry_request(client, request)
            else:
                print(f"HTTP error {e.status_code}: {e.message}")
                raise
        except ServiceRequestError as e:
            print(f"Service request error: {e}")
            raise
        except asyncio.TimeoutError:
            print("Request timed out")
            raise
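
The retry_request call above is left unimplemented; a generic helper along these lines (hypothetical, not part of azure-core) can back it:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def retry_async(
    operation: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base_delay: float = 1.0,
    should_retry: Callable[[Exception], bool] = lambda exc: True,
) -> T:
    """Retry an async operation with exponential backoff between attempts."""
    for attempt in range(attempts):
        try:
            return await operation()
        except Exception as exc:
            # Give up on the last attempt or on non-retryable errors
            if attempt == attempts - 1 or not should_retry(exc):
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")
```

For the rate-limit case it could be called as `retry_async(lambda: fetch(client, request), should_retry=lambda e: isinstance(e, HttpResponseError) and e.status_code == 429)`.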

Async Tracing

Distributed Tracing with Async

from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.tracing import SpanKind

@distributed_trace_async(name_of_span="async_data_fetch", kind=SpanKind.CLIENT)
async def fetch_data_with_tracing(client: AsyncPipelineClient, resource_id: str):
    """Automatically traced async function"""
    request = HttpRequest("GET", f"/api/resources/{resource_id}")
    
    async with client.send_request(request) as response:
        response.raise_for_status()
        return response.json()

async def manual_async_tracing():
    from azure.core.settings import settings
    
    # AbstractSpan is only an interface; get the configured implementation
    # (e.g. OpenTelemetrySpan) from global settings
    span_impl_type = settings.tracing_implementation()
    if span_impl_type is None:
        return await process_async_data()  # Tracing not configured
    
    with span_impl_type(name="async_operation") as span:
        span.kind = SpanKind.CLIENT
        span.add_attribute("operation.type", "data_processing")
        
        try:
            result = await process_async_data()
            span.add_attribute("operation.success", True)
            span.add_attribute("result.count", len(result))
            return result
        except Exception as e:
            span.add_attribute("error.type", type(e).__name__)
            span.add_attribute("error.message", str(e))
            raise

Advanced Async Patterns

Concurrent Operations

import asyncio
from typing import List

async def concurrent_requests():
    async with AsyncPipelineClient("https://api.example.com") as client:
        # Create multiple requests
        requests = [
            HttpRequest("GET", f"/api/resource/{i}")
            for i in range(1, 11)
        ]
        
        # Execute requests concurrently
        async def fetch_single(request):
            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()
        
        # Use asyncio.gather for concurrent execution
        results = await asyncio.gather(
            *[fetch_single(req) for req in requests],
            return_exceptions=True
        )
        
        # Process results
        successful_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Request {i+1} failed: {result}")
            else:
                successful_results.append(result)
        
        return successful_results

async def semaphore_controlled_requests():
    # Limit concurrent requests to prevent overwhelming the service
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests
    
    async def controlled_fetch(client, request):
        async with semaphore:
            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()
    
    async with AsyncPipelineClient("https://api.example.com") as client:
        requests = [HttpRequest("GET", f"/api/resource/{i}") for i in range(1, 11)]
        tasks = [controlled_fetch(client, req) for req in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

Context Preservation

import asyncio
from contextvars import ContextVar

# Context variable for request tracking
request_id: ContextVar[str] = ContextVar('request_id')

async def context_aware_processing():
    request_id.set("req-12345")
    
    async with AsyncPipelineClient("https://api.example.com") as client:
        # Context is preserved across await boundaries
        await process_step_1(client)
        await process_step_2(client)
        await process_step_3(client)

async def process_step_1(client):
    # Context variable is available here
    current_request_id = request_id.get()
    print(f"Processing step 1 for request: {current_request_id}")
    
    request = HttpRequest("GET", "/api/step1")
    async with client.send_request(request) as response:
        return response.json()

Performance Optimization

Connection Pooling

import aiohttp
from azure.core.pipeline.transport import AioHttpTransport

async def optimized_client_setup():
    # Configure connection pooling for better performance
    connector = aiohttp.TCPConnector(
        limit=100,           # Total connection pool size
        limit_per_host=30,   # Max connections per host
        ttl_dns_cache=300,   # DNS cache TTL
        use_dns_cache=True,
        keepalive_timeout=30
    )
    
    timeout = aiohttp.ClientTimeout(
        total=60,      # Total timeout
        connect=10,    # Connection timeout
        sock_read=30   # Socket read timeout
    )
    
    session = aiohttp.ClientSession(
        connector=connector,
        timeout=timeout
    )
    
    transport = AioHttpTransport(session=session, session_owner=False)
    
    try:
        async with AsyncPipelineClient(
            "https://api.example.com",
            transport=transport
        ) as client:
            # Client now uses optimized connection pooling
            return await perform_operations(client)
    finally:
        await session.close()

Key Features

Complete Async/Await Support: Full feature parity with synchronous operations using async/await patterns.

Dual Usage Pattern: Unique response handling that supports both direct awaiting and context manager usage.

Context Manager Integration: Automatic resource management with async context managers throughout the stack.

Concurrent Operations: Built-in support for concurrent requests with proper resource management.

Async Authentication: Full async credential support with automatic token refresh and context management.

Streaming Support: Efficient async streaming for large responses with chunked processing.

Error Recovery: Robust error handling with async-compatible retry policies and exception management.

Performance Optimized: Connection pooling, keep-alive support, and efficient resource utilization.

The async programming patterns in Azure Core provide a complete, type-safe, and efficient foundation for building high-performance asynchronous Azure applications while maintaining consistency with synchronous operation patterns.

Install with Tessl CLI

npx tessl i tessl/pypi-azure-core
