ASGI specs, helper code, and adapters for bridging synchronous and asynchronous Python web applications
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Async timeout context managers for controlling operation timeouts in asynchronous code. This module provides vendored timeout functionality that works reliably across different Python versions and asyncio implementations.
Async context manager for implementing timeouts in asyncio code, providing reliable timeout behavior with proper cancellation handling.
class timeout:
"""Async timeout context manager (vendored from async-timeout)."""
def __init__(self, timeout, *, loop=None):
"""
Initialize timeout context manager.
Parameters:
- timeout: float or None, timeout duration in seconds (None disables timeout)
- loop: asyncio.AbstractEventLoop, event loop to use (optional, uses current loop)
"""
def __enter__(self):
"""
Sync context manager entry (not recommended, use async version).
Returns:
self
"""
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Sync context manager exit (not recommended, use async version).
Parameters:
- exc_type: Exception type if exception occurred
- exc_val: Exception value if exception occurred
- exc_tb: Exception traceback if exception occurred
"""
async def __aenter__(self):
"""
Async context manager entry.
Returns:
self
"""
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""
Async context manager exit with cancellation handling.
Parameters:
- exc_type: Exception type if exception occurred
- exc_val: Exception value if exception occurred
- exc_tb: Exception traceback if exception occurred
Returns:
bool: True if timeout occurred and should be handled
"""
expired: bool # Boolean indicating if timeout occurred
remaining: float # Optional float of remaining time until timeoutfrom asgiref.timeout import timeout
import asyncio
async def basic_timeout_example():
"""Demonstrate basic timeout usage."""
async def slow_operation():
"""Simulate a slow async operation."""
await asyncio.sleep(2.0)
return "Operation completed"
# Operation with timeout
try:
async with timeout(1.0): # 1 second timeout
result = await slow_operation()
print(f"Result: {result}")
except asyncio.TimeoutError:
print("Operation timed out!")
# Operation without timeout
try:
async with timeout(3.0): # 3 second timeout (longer than operation)
result = await slow_operation()
print(f"Result: {result}")
except asyncio.TimeoutError:
print("Operation timed out!")
# asyncio.run(basic_timeout_example())from asgiref.timeout import timeout
import asyncio
import aiohttp
async def fetch_with_timeout(url, timeout_seconds=5.0):
"""Fetch URL with timeout."""
async with aiohttp.ClientSession() as session:
try:
async with timeout(timeout_seconds):
async with session.get(url) as response:
return await response.text()
except asyncio.TimeoutError:
return f"Request to {url} timed out after {timeout_seconds}s"
async def http_timeout_example():
"""Demonstrate HTTP requests with timeout."""
# Fast request (should succeed)
result1 = await fetch_with_timeout("https://httpbin.org/delay/1", timeout_seconds=3.0)
print(f"Fast request: {len(result1)} characters")
# Slow request (should timeout)
result2 = await fetch_with_timeout("https://httpbin.org/delay/10", timeout_seconds=2.0)
print(f"Slow request: {result2}")
# asyncio.run(http_timeout_example())from asgiref.timeout import timeout
import asyncio
class DatabaseConnection:
"""Mock database connection for demonstration."""
async def execute_query(self, query, delay=1.0):
"""Execute database query with simulated delay."""
await asyncio.sleep(delay)
return f"Result for: {query}"
async def transaction(self, operations, delay=0.5):
"""Execute multiple operations in transaction."""
results = []
for op in operations:
await asyncio.sleep(delay)
results.append(f"Executed: {op}")
return results
async def database_timeout_example():
"""Demonstrate database operations with timeout."""
db = DatabaseConnection()
# Quick query with timeout
try:
async with timeout(2.0):
result = await db.execute_query("SELECT * FROM users", delay=0.5)
print(f"Quick query: {result}")
except asyncio.TimeoutError:
print("Quick query timed out")
# Slow query with timeout
try:
async with timeout(1.0):
result = await db.execute_query("SELECT * FROM big_table", delay=3.0)
print(f"Slow query: {result}")
except asyncio.TimeoutError:
print("Slow query timed out")
# Transaction with timeout
try:
async with timeout(3.0):
operations = ["INSERT INTO logs", "UPDATE counters", "DELETE old_data"]
results = await db.transaction(operations, delay=0.8)
print(f"Transaction results: {results}")
except asyncio.TimeoutError:
print("Transaction timed out")
# asyncio.run(database_timeout_example())from asgiref.timeout import timeout
import asyncio
async def monitored_operation_with_timeout():
"""Demonstrate timeout with progress monitoring."""
async def long_running_task():
"""Task that reports progress."""
for i in range(10):
print(f"Progress: {i+1}/10")
await asyncio.sleep(0.5)
return "Task completed"
# Monitor progress with timeout
timeout_manager = timeout(3.0) # 3 second timeout
try:
async with timeout_manager:
result = await long_running_task()
print(f"Final result: {result}")
except asyncio.TimeoutError:
print(f"Task timed out! Expired: {timeout_manager.expired}")
if hasattr(timeout_manager, 'remaining'):
print(f"Remaining time when cancelled: {timeout_manager.remaining}")
# asyncio.run(monitored_operation_with_timeout())from asgiref.timeout import timeout
import asyncio
async def shared_timeout_example():
"""Demonstrate multiple operations sharing a timeout."""
async def operation_a():
await asyncio.sleep(1.0)
return "Operation A done"
async def operation_b():
await asyncio.sleep(1.5)
return "Operation B done"
async def operation_c():
await asyncio.sleep(2.0)
return "Operation C done"
# All operations must complete within shared timeout
try:
async with timeout(3.0): # 3 second total timeout
result_a = await operation_a()
print(result_a)
result_b = await operation_b()
print(result_b)
result_c = await operation_c() # This might timeout
print(result_c)
except asyncio.TimeoutError:
print("One or more operations timed out")
# asyncio.run(shared_timeout_example())from asgiref.timeout import timeout
import asyncio
async def timeout_middleware_app(scope, receive, send):
"""ASGI application with timeout middleware."""
async def handle_request():
"""Handle the request with potential delay."""
# Simulate processing time based on path
if scope['path'] == '/slow':
await asyncio.sleep(3.0)
body = b'Slow response'
elif scope['path'] == '/fast':
await asyncio.sleep(0.1)
body = b'Fast response'
else:
body = b'Default response'
await send({
'type': 'http.response.start',
'status': 200,
'headers': [[b'content-type', b'text/plain']],
})
await send({
'type': 'http.response.body',
'body': body,
})
# Apply timeout to request handling
try:
async with timeout(2.0): # 2 second timeout for all requests
await handle_request()
except asyncio.TimeoutError:
# Send timeout response
await send({
'type': 'http.response.start',
'status': 504,
'headers': [[b'content-type', b'text/plain']],
})
await send({
'type': 'http.response.body',
'body': b'Request timeout',
})
async def test_timeout_middleware():
"""Test the timeout middleware."""
from asgiref.testing import ApplicationCommunicator
# Test fast request (should succeed)
fast_scope = {
'type': 'http',
'method': 'GET',
'path': '/fast',
}
communicator = ApplicationCommunicator(timeout_middleware_app, fast_scope)
try:
await communicator.send_input({'type': 'http.request', 'body': b''})
response_start = await communicator.receive_output()
print(f"Fast request status: {response_start['status']}")
finally:
await communicator.stop()
# Test slow request (should timeout)
slow_scope = {
'type': 'http',
'method': 'GET',
'path': '/slow',
}
communicator = ApplicationCommunicator(timeout_middleware_app, slow_scope)
try:
await communicator.send_input({'type': 'http.request', 'body': b''})
response_start = await communicator.receive_output()
print(f"Slow request status: {response_start['status']}")
finally:
await communicator.stop()
# asyncio.run(test_timeout_middleware())from asgiref.timeout import timeout
import asyncio
async def conditional_timeout_example():
"""Demonstrate conditional timeout usage."""
async def process_data(data, use_timeout=True):
"""Process data with optional timeout."""
async def processing_work():
# Simulate work based on data size
work_time = len(data) * 0.1
await asyncio.sleep(work_time)
return f"Processed {len(data)} items"
# Use timeout conditionally
if use_timeout:
try:
async with timeout(1.0):
return await processing_work()
except asyncio.TimeoutError:
return "Processing timed out"
else:
return await processing_work()
# Test with small data (should succeed)
small_data = list(range(5))
result1 = await process_data(small_data, use_timeout=True)
print(f"Small data: {result1}")
# Test with large data and timeout (should timeout)
large_data = list(range(20))
result2 = await process_data(large_data, use_timeout=True)
print(f"Large data with timeout: {result2}")
# Test with large data without timeout (should succeed)
result3 = await process_data(large_data, use_timeout=False)
print(f"Large data without timeout: {result3}")
# asyncio.run(conditional_timeout_example())from asgiref.timeout import timeout
import asyncio
class ResourceManager:
"""Resource manager that needs cleanup on timeout."""
def __init__(self):
self.resources = []
async def acquire_resource(self, resource_id):
"""Acquire a resource."""
await asyncio.sleep(0.1) # Simulate acquisition time
self.resources.append(resource_id)
print(f"Acquired resource: {resource_id}")
async def release_all(self):
"""Release all acquired resources."""
for resource_id in self.resources:
print(f"Releasing resource: {resource_id}")
await asyncio.sleep(0.05)
self.resources.clear()
async def timeout_with_cleanup_example():
"""Demonstrate proper cleanup when timeout occurs."""
resource_manager = ResourceManager()
try:
async with timeout(1.0): # 1 second timeout
# Acquire multiple resources
for i in range(10):
await resource_manager.acquire_resource(f"resource_{i}")
await asyncio.sleep(0.2) # This will cause timeout
print("All resources acquired successfully")
except asyncio.TimeoutError:
print("Operation timed out, cleaning up resources...")
finally:
# Always clean up resources
await resource_manager.release_all()
# asyncio.run(timeout_with_cleanup_example())The timeout context manager provides:
Install with Tessl CLI
npx tessl i tessl/pypi-asgiref