Microsoft Azure Core Library providing foundational infrastructure for Azure SDK Python clients
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Azure Core provides comprehensive async/await support with full feature parity to synchronous operations. The async infrastructure includes async versions of all core classes, proper resource management, context manager patterns, and performance optimizations for asynchronous operations.
Main async client for Azure services with context manager support and flexible response handling.
from azure.core import AsyncPipelineClient
from azure.core.credentials import AsyncTokenCredential
from azure.core.rest import HttpRequest
from typing import AsyncContextManager, Optional, Any
class AsyncPipelineClient(AsyncContextManager["AsyncPipelineClient"]):
    """Main async client for Azure services.

    Supports use as an async context manager and exposes ``send_request``
    for issuing a single ``HttpRequest``.
    """

    def __init__(
        self,
        base_url: str,
        *,
        pipeline: Optional[AsyncPipeline] = None,
        config: Optional[Configuration] = None,
        **kwargs: Any,
    ): ...

    # Declared as a plain ``def`` that returns an awaitable, not ``async def``.
    def send_request(
        self,
        request: HttpRequest,
        *,
        stream: bool = False,
        **kwargs: Any
    ) -> Awaitable[AsyncHttpResponse]: ...

    async def __aenter__(self) -> "AsyncPipelineClient": ...

    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...

    async def close(self) -> None: ...

Asynchronous HTTP pipeline with context manager support and async policy execution.
from azure.core.pipeline import AsyncPipeline, AsyncHTTPPolicy
from azure.core.pipeline.transport import AsyncHttpTransport
from typing import AsyncContextManager, Iterable, Union, Optional
class AsyncPipeline(AsyncContextManager["AsyncPipeline"]):
    """Asynchronous HTTP pipeline with context manager support and async policy execution."""

    # ``policies`` accepts a mix of async HTTP policies and SansIO policies.
    def __init__(
        self,
        transport: AsyncHttpTransport,
        policies: Optional[Iterable[Union[AsyncHTTPPolicy, SansIOHTTPPolicy]]] = None,
    ): ...

    async def run(self, request: HttpRequest, **kwargs: Any) -> PipelineResponse: ...

    async def __aenter__(self) -> "AsyncPipeline": ...

    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...


from azure.core import AsyncPipelineClient
from azure.core.credentials import AsyncTokenCredential
from azure.core.rest import HttpRequest
import asyncio
async def basic_async_client():
    """Issue a GET for ``/api/data`` and return the decoded JSON body.

    The client is opened with ``async with`` (the recommended form) so it is
    closed when the block exits.
    """
    async with AsyncPipelineClient("https://api.example.com") as client:
        response = await client.send_request(HttpRequest("GET", "/api/data"))
        # Raise on HTTP error statuses before touching the body.
        response.raise_for_status()
        return response.json()
# Run async function
result = asyncio.run(basic_async_client())async def manual_resource_management():
client = AsyncPipelineClient("https://api.example.com")
try:
# Manual opening not required, but available
await client.__aenter__()
request = HttpRequest("GET", "/api/data")
response = await client.send_request(request)
return response.json()
finally:
# Always close resources
await client.close()Azure Core's unique dual-usage pattern allows responses to be used both as awaitable and context manager:
async def dual_usage_patterns():
async with AsyncPipelineClient("https://api.example.com") as client:
request = HttpRequest("GET", "/api/data")
# Pattern 1: Direct await
response = await client.send_request(request)
data = response.json()
response.close() # Manual cleanup
# Pattern 2: Context manager (automatic cleanup)
async with client.send_request(request) as response:
response.raise_for_status()
data = response.json()
# Response automatically closed when exiting context
return dataProtocol for async token-based authentication with context manager support.
from azure.core.credentials import AsyncTokenCredential, AccessToken
from typing import AsyncContextManager, Optional
class AsyncTokenCredential(AsyncContextManager["AsyncTokenCredential"]):
    """Protocol for async token-based authentication with context manager support."""

    # Scopes are positional; the remaining options are keyword-only.
    async def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,
        tenant_id: Optional[str] = None,
        enable_cae: bool = False,
        **kwargs: Any,
    ) -> AccessToken: ...

    async def close(self) -> None: ...

    async def __aexit__(self, exc_type, exc_value, traceback) -> None: ...


from azure.identity.aio import DefaultAzureCredential
from azure.core import AsyncPipelineClient
async def authenticated_client():
# Create async credential
credential = DefaultAzureCredential()
try:
# Create client with authentication
async with AsyncPipelineClient(
"https://api.example.com",
credential=credential
) as client:
request = HttpRequest("GET", "/api/protected-resource")
async with client.send_request(request) as response:
response.raise_for_status()
return response.json()
finally:
# Close credential resources
await credential.close()Async authentication policy with automatic token refresh and challenge handling.
from azure.core.pipeline.policies import AsyncBearerTokenCredentialPolicy
from azure.core.credentials import AsyncTokenCredential
class AsyncBearerTokenCredentialPolicy(AsyncHTTPPolicy):
    """Async authentication policy with automatic token refresh and challenge handling."""

    def __init__(
        self,
        credential: AsyncTokenCredential,
        *scopes: str,
        **kwargs: Any
    ): ...

    async def on_request(self, request: PipelineRequest) -> None: ...

    async def on_challenge(self, request: PipelineRequest, response: PipelineResponse) -> bool: ...

    async def send(self, request: PipelineRequest) -> PipelineResponse: ...

Async retry policy with configurable backoff and sleep patterns.
from azure.core.pipeline.policies import AsyncRetryPolicy
class AsyncRetryPolicy(AsyncHTTPPolicy):
    """Async retry policy with configurable backoff and sleep patterns."""

    # All retry settings are keyword-only.
    def __init__(
        self,
        *,
        retry_total: int = 10,
        retry_connect: int = 3,
        retry_read: int = 3,
        retry_status: int = 3,
        retry_backoff_factor: float = 0.8,
        retry_backoff_max: int = 120,
        **kwargs: Any
    ): ...

    async def sleep(self, settings: Dict[str, Any], transport: AsyncHttpTransport) -> None: ...

    async def send(self, request: PipelineRequest) -> PipelineResponse: ...

HTTP transport implementation using aiohttp with full async support.
from azure.core.pipeline.transport import AioHttpTransport
import aiohttp
async def custom_transport_example():
    """Send one request through an ``AioHttpTransport`` built on a caller-owned session.

    Returns the decoded JSON body. The transport is created with
    ``session_owner=False``, so this function closes the aiohttp session
    itself in ``finally``.
    """
    # Custom aiohttp session configuration
    timeout = aiohttp.ClientTimeout(total=30, connect=10)
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
    session = aiohttp.ClientSession(
        timeout=timeout,
        connector=connector
    )
    # Create transport with custom session
    transport = AioHttpTransport(session=session, session_owner=False)
    try:
        async with AsyncPipelineClient(
            "https://api.example.com",
            transport=transport
        ) as client:
            request = HttpRequest("GET", "/api/data")
            response = await client.send_request(request)
            return response.json()
    finally:
        await session.close()


async def streaming_download():
    """Stream ``/api/large-file`` in chunks, handing each chunk to ``process_chunk``.

    Prints a progress line each time another whole megabyte has been received
    and a final line with the total byte count.
    """
    megabyte = 1024 * 1024
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/large-file")
        # Enable streaming
        async with client.send_request(request, stream=True) as response:
            response.raise_for_status()
            # Stream download with chunks
            total_size = 0
            reported_mb = 0
            async for chunk in response.iter_bytes(chunk_size=8192):
                total_size += len(chunk)
                process_chunk(chunk)
                # Progress reporting: compare whole megabytes received against
                # the last value printed. Testing ``total_size % megabyte == 0``
                # only fires when the running total lands exactly on a
                # boundary, which arbitrary chunk sizes will usually skip.
                downloaded_mb = total_size // megabyte
                if downloaded_mb > reported_mb:
                    reported_mb = downloaded_mb
                    print(f"Downloaded {downloaded_mb} MB")
            print(f"Download complete: {total_size} bytes")


async def handle_multipart_response():
    """Read every part of a multipart response and pass it to ``process_part``."""
    async with AsyncPipelineClient("https://api.example.com") as client:
        request = HttpRequest("GET", "/api/multipart-data")
        async with client.send_request(request) as response:
            # Iterate over multipart sections
            async for part in response.parts():
                content_type = part.headers.get("content-type")
                content = await part.read()
                print(f"Part: {content_type}, Size: {len(content)}")
                process_part(content_type, content)


from azure.core.async_paging import AsyncItemPaged
async def paginated_data_processing():
    """Process every item of a paged listing one at a time.

    Returns the number of items processed and prints a progress line after
    each 100 items.
    """
    # Assume client.list_items() returns AsyncItemPaged
    handled = 0
    # Item-by-item iteration (recommended for most cases)
    async for item in client.list_items():
        await process_item(item)
        handled += 1
        if not handled % 100:
            print(f"Processed {handled} items")
    return handled
async def paginated_batch_processing():
    """Process a paged listing one whole page at a time via ``process_batch``."""
    pages = client.list_items().by_page()
    page_count = 0
    async for page in pages:
        # Materialise the page so it can be handed over as a single batch.
        batch = [item async for item in page]
        await process_batch(batch)
        page_count += 1
        print(f"Processed page {page_count} with {len(batch)} items")


from azure.core.polling import AsyncLROPoller
async def long_running_operation():
    """Start a long-running operation and wait up to five minutes for its result.

    Returns the operation result. Any failure, including the timeout raised
    by ``asyncio.wait_for``, is printed and re-raised.
    """
    async with AsyncPipelineClient("https://api.example.com") as client:
        # Start long-running operation
        request = HttpRequest("POST", "/api/start-operation", json={"data": "value"})
        # Begin polling operation
        # NOTE(review): ``begin_long_running_operation`` is not part of the
        # AsyncPipelineClient surface shown in this document; confirm which
        # client provides it.
        poller: AsyncLROPoller = await client.begin_long_running_operation(request)
        # Wait for completion with custom timeout
        try:
            # AsyncLROPoller.result() takes no ``timeout`` argument, so the
            # deadline is enforced from outside with asyncio.wait_for.
            result = await asyncio.wait_for(poller.result(), timeout=300)  # 5 minutes
            print(f"Operation completed: {result}")
            return result
        except Exception as e:
            print(f"Operation failed or timed out: {e}")
            raise
async def polling_with_progress():
    """Wait for a long-running operation, printing a progress line every 5 seconds.

    Returns the operation result; exceptions raised by the poller propagate.
    """
    poller = await client.begin_operation(request)
    # The async poller only makes progress while wait()/result() is being
    # awaited, so looping on ``poller.done()`` with a sleep would never end.
    # Drive the poller in a task and watch the task instead.
    result_task = asyncio.ensure_future(poller.result())
    try:
        # Manual polling with progress updates
        while not result_task.done():
            print("Operation in progress...")
            # Wake after 5 seconds, or sooner if the task finishes.
            await asyncio.wait({result_task}, timeout=5)
        return result_task.result()
    finally:
        # If this coroutine is cancelled, do not leave the task running.
        if not result_task.done():
            result_task.cancel()


from azure.core.exceptions import HttpResponseError, ServiceRequestError
import asyncio
def _retry_after_seconds(header_value, default=60):
    """Convert a ``Retry-After`` header value into a non-negative delay in seconds.

    Accepts both forms of the header: an integer number of seconds or an
    HTTP-date. Returns ``default`` when the value is missing or unparseable.
    """
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    if not header_value:
        return default
    try:
        return max(0, int(header_value))
    except ValueError:
        pass
    try:
        retry_at = parsedate_to_datetime(header_value)
    except (TypeError, ValueError):
        return default
    if retry_at.tzinfo is None:
        # Treat an HTTP-date without an offset as UTC.
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    return max(0, int((retry_at - datetime.now(timezone.utc)).total_seconds()))


async def robust_error_handling():
    """Fetch ``/api/data`` and return its JSON body, with explicit error handling.

    A 429 response waits for the server's ``Retry-After`` delay and then
    delegates to ``retry_request``. Every other error is printed and re-raised.
    """
    async with AsyncPipelineClient("https://api.example.com") as client:
        try:
            request = HttpRequest("GET", "/api/data")
            async with client.send_request(request) as response:
                response.raise_for_status()
                return response.json()
        except HttpResponseError as e:
            if e.status_code == 429:  # Rate limited
                # A bare int() would raise ValueError on the HTTP-date form
                # of the header, so parse both forms.
                retry_after = _retry_after_seconds(e.response.headers.get("Retry-After"))
                print(f"Rate limited, waiting {retry_after} seconds")
                await asyncio.sleep(retry_after)
                # Implement retry logic
                return await retry_request(client, request)
            else:
                print(f"HTTP error {e.status_code}: {e.message}")
                raise
        except ServiceRequestError as e:
            print(f"Service request error: {e}")
            raise
        except asyncio.TimeoutError:
            print("Request timed out")
            raise


from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.tracing import SpanKind
@distributed_trace_async(name_of_span="async_data_fetch", kind=SpanKind.CLIENT)
async def fetch_data_with_tracing(client: AsyncPipelineClient, resource_id: str):
    """Fetch one resource by id and return its JSON body.

    The decorator wraps the call in a span named ``async_data_fetch``.
    """
    resource_url = f"/api/resources/{resource_id}"
    async with client.send_request(HttpRequest("GET", resource_url)) as response:
        response.raise_for_status()
        payload = response.json()
    return payload
async def manual_async_tracing():
    """Run ``process_async_data`` inside a manually created client span.

    Records success and failure attributes on the span, returns the processed
    result and re-raises any exception. When no tracing implementation is
    configured, the work runs untraced.
    """
    from azure.core.settings import settings
    from azure.core.tracing import SpanKind

    # AbstractSpan is only the interface; the concrete span class comes from
    # the configured tracing plugin and may be absent.
    span_impl_type = settings.tracing_implementation()
    if span_impl_type is None:
        return await process_async_data()

    # Spans are synchronous context managers, so plain ``with`` is used even
    # though the body awaits.
    with span_impl_type(name="async_operation", kind=SpanKind.CLIENT) as span:
        span.add_attribute("operation.type", "data_processing")
        try:
            result = await process_async_data()
            span.add_attribute("operation.success", True)
            span.add_attribute("result.count", len(result))
            return result
        except Exception as e:
            span.add_attribute("error.type", type(e).__name__)
            span.add_attribute("error.message", str(e))
            raise


import asyncio
from typing import List
async def concurrent_requests():
    """Fetch resources 1 to 10 concurrently and return the successful JSON bodies.

    Failures are printed with their 1-based request number and left out of
    the returned list.
    """

    async def fetch_single(request):
        # ``client`` is bound by the enclosing ``async with`` before any call.
        async with client.send_request(request) as response:
            response.raise_for_status()
            return response.json()

    async with AsyncPipelineClient("https://api.example.com") as client:
        requests = [HttpRequest("GET", f"/api/resource/{i}") for i in range(1, 11)]

        # return_exceptions=True makes gather hand failures back as values
        # instead of raising the first one.
        outcomes = await asyncio.gather(
            *map(fetch_single, requests),
            return_exceptions=True,
        )

        successful_results = []
        for position, outcome in enumerate(outcomes, start=1):
            if isinstance(outcome, Exception):
                print(f"Request {position} failed: {outcome}")
                continue
            successful_results.append(outcome)
        return successful_results
async def semaphore_controlled_requests():
    """Run all requests with at most five in flight at once.

    Returns the ``gather`` results in request order, with exceptions returned
    as values.
    """
    # Cap concurrency so the service is not overwhelmed.
    limiter = asyncio.Semaphore(5)

    async def controlled_fetch(client, request):
        # Take a semaphore slot first, then open the response.
        async with limiter, client.send_request(request) as response:
            response.raise_for_status()
            return response.json()

    async with AsyncPipelineClient("https://api.example.com") as client:
        # NOTE(review): ``requests`` is not defined in this function;
        # presumably supplied by the surrounding scope, confirm.
        pending = [controlled_fetch(client, req) for req in requests]
        return await asyncio.gather(*pending, return_exceptions=True)


import asyncio
from contextvars import ContextVar
# Context variable for request tracking
request_id: ContextVar[str] = ContextVar('request_id')
async def context_aware_processing():
    """Set the ``request_id`` context variable, then run three processing steps on one client."""
    request_id.set("req-12345")
    async with AsyncPipelineClient("https://api.example.com") as client:
        # Context is preserved across await boundaries
        # The steps are awaited one after another, not concurrently.
        await process_step_1(client)
        await process_step_2(client)
        await process_step_3(client)
async def process_step_1(client):
    """Log the current request id, call ``/api/step1`` and return its JSON body."""
    # The value comes from the ``request_id`` context variable.
    print(f"Processing step 1 for request: {request_id.get()}")
    async with client.send_request(HttpRequest("GET", "/api/step1")) as response:
        return response.json()


import aiohttp
from azure.core.pipeline.transport import AioHttpTransport
async def optimized_client_setup():
# Configure connection pooling for better performance
connector = aiohttp.TCPConnector(
limit=100, # Total connection pool size
limit_per_host=30, # Max connections per host
ttl_dns_cache=300, # DNS cache TTL
use_dns_cache=True,
keepalive_timeout=30
)
timeout = aiohttp.ClientTimeout(
total=60, # Total timeout
connect=10, # Connection timeout
sock_read=30 # Socket read timeout
)
session = aiohttp.ClientSession(
connector=connector,
timeout=timeout
)
transport = AioHttpTransport(session=session, session_owner=False)
try:
async with AsyncPipelineClient(
"https://api.example.com",
transport=transport
) as client:
# Client now uses optimized connection pooling
return await perform_operations(client)
finally:
await session.close()Complete Async/Await Support: Full feature parity with synchronous operations using async/await patterns.
Dual Usage Pattern: Unique response handling that supports both direct awaiting and context manager usage.
Context Manager Integration: Automatic resource management with async context managers throughout the stack.
Concurrent Operations: Built-in support for concurrent requests with proper resource management.
Async Authentication: Full async credential support with automatic token refresh and context management.
Streaming Support: Efficient async streaming for large responses with chunked processing.
Error Recovery: Robust error handling with async-compatible retry policies and exception management.
Performance Optimized: Connection pooling, keep-alive support, and efficient resource utilization.
The async programming patterns in Azure Core provide a complete, type-safe, and efficient foundation for building high-performance asynchronous Azure applications while maintaining consistency with synchronous operation patterns.
Install with Tessl CLI
npx tessl i tessl/pypi-azure-core
docs