CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-asgiref

ASGI specs, helper code, and adapters for bridging synchronous and asynchronous Python web applications

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/timeout.md

Timeout Management

Async timeout context managers for controlling operation timeouts in asynchronous code. This module provides vendored timeout functionality that works reliably across different Python versions and asyncio implementations.

Capabilities

Timeout Context Manager

Async context manager for implementing timeouts in asyncio code, providing reliable timeout behavior with proper cancellation handling.

class timeout:
    """Timeout context manager for asyncio code (vendored from async-timeout).

    Intended to be used as ``async with timeout(seconds): ...``. The usage
    examples in this document catch ``asyncio.TimeoutError`` around the
    ``async with`` block to detect that the limit was exceeded.
    """
    
    def __init__(self, timeout: "float | None", *, loop: "asyncio.AbstractEventLoop | None" = None):
        """
        Initialize timeout context manager.
        
        Parameters:
        - timeout: float or None, timeout duration in seconds (None disables timeout)
        - loop: asyncio.AbstractEventLoop, event loop to use (optional, keyword-only;
          the current loop is used when omitted)
        """
    
    def __enter__(self):
        """
        Sync context manager entry (not recommended, use the async version).
        
        Returns:
        self
        """
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Sync context manager exit (not recommended, use the async version).
        
        Parameters:
        - exc_type: Exception type if an exception occurred, otherwise None
        - exc_val: Exception value if an exception occurred, otherwise None
        - exc_tb: Exception traceback if an exception occurred, otherwise None
        """
    
    async def __aenter__(self):
        """
        Async context manager entry.
        
        Returns:
        self
        """
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Async context manager exit with cancellation handling.
        
        Parameters:
        - exc_type: Exception type if an exception occurred, otherwise None
        - exc_val: Exception value if an exception occurred, otherwise None
        - exc_tb: Exception traceback if an exception occurred, otherwise None
        
        Returns:
        bool: True if timeout occurred and should be handled

        NOTE(review): the usage examples catch asyncio.TimeoutError outside
        the ``async with`` block, which suggests a timeout surfaces as an
        exception rather than being suppressed by a truthy return value.
        Confirm the documented return against the implementation.
        """
    
    expired: bool         # True once the timeout has fired
    remaining: "float | None"  # Time left until the timeout, or None (documented as optional)

Usage Examples

Basic Timeout Usage

from asgiref.timeout import timeout
import asyncio

async def basic_timeout_example():
    """Run the same slow operation under a too-short and a long-enough timeout."""

    async def slow_operation():
        """Stand in for slow async work: sleep two seconds, then return a message."""
        await asyncio.sleep(2.0)
        return "Operation completed"

    # The 1-second limit is shorter than the operation, so it times out;
    # the 3-second limit leaves enough room for the operation to finish.
    for limit in (1.0, 3.0):
        try:
            async with timeout(limit):
                result = await slow_operation()
                print(f"Result: {result}")
        except asyncio.TimeoutError:
            print("Operation timed out!")

# asyncio.run(basic_timeout_example())

HTTP Request with Timeout

from asgiref.timeout import timeout
import asyncio
import aiohttp

async def fetch_with_timeout(url, timeout_seconds=5.0):
    """Return the body text of *url*, or a timeout message string.

    Parameters:
    - url: address to request with an HTTP GET
    - timeout_seconds: limit, in seconds, for the request and body read

    Returns:
    The response text, or a human-readable message if the limit was exceeded.
    """

    async with aiohttp.ClientSession() as session:
        try:
            # Both the request and reading the body count against the limit.
            async with timeout(timeout_seconds):
                async with session.get(url) as response:
                    body = await response.text()
        except asyncio.TimeoutError:
            return f"Request to {url} timed out after {timeout_seconds}s"
        else:
            return body

async def http_timeout_example():
    """Fetch one URL that answers within its limit and one that does not."""

    # Endpoint delays 1s, limit is 3s: expected to succeed.
    fast_body = await fetch_with_timeout(
        "https://httpbin.org/delay/1",
        timeout_seconds=3.0,
    )
    print(f"Fast request: {len(fast_body)} characters")

    # Endpoint delays 10s, limit is 2s: expected to time out.
    slow_outcome = await fetch_with_timeout(
        "https://httpbin.org/delay/10",
        timeout_seconds=2.0,
    )
    print(f"Slow request: {slow_outcome}")

# asyncio.run(http_timeout_example())

Database Operation Timeout

from asgiref.timeout import timeout
import asyncio

class DatabaseConnection:
    """Stand-in database connection whose calls only sleep and echo their input."""

    async def execute_query(self, query, delay=1.0):
        """Sleep for *delay* seconds, then return a result string for *query*."""
        await asyncio.sleep(delay)
        return f"Result for: {query}"

    async def transaction(self, operations, delay=0.5):
        """Run *operations* one after another, sleeping *delay* seconds before each.

        Returns:
        A list with one "Executed: ..." entry per operation, in input order.
        """

        async def run_one(op):
            await asyncio.sleep(delay)
            return f"Executed: {op}"

        # Awaiting inside the comprehension keeps the operations sequential.
        return [await run_one(op) for op in operations]

async def database_timeout_example():
    """Run a quick query, a slow query and a transaction, each under its own timeout."""
    db = DatabaseConnection()

    # 0.5s of work against a 2s limit: finishes in time.
    try:
        async with timeout(2.0):
            result = await db.execute_query("SELECT * FROM users", delay=0.5)
            print(f"Quick query: {result}")
    except asyncio.TimeoutError:
        print("Quick query timed out")

    # 3s of work against a 1s limit: the timeout fires first.
    try:
        async with timeout(1.0):
            result = await db.execute_query("SELECT * FROM big_table", delay=3.0)
            print(f"Slow query: {result}")
    except asyncio.TimeoutError:
        print("Slow query timed out")

    # Three operations at 0.8s each (2.4s total) against a 3s limit.
    try:
        async with timeout(3.0):
            operations = ["INSERT INTO logs", "UPDATE counters", "DELETE old_data"]
            results = await db.transaction(operations, delay=0.8)
            print(f"Transaction results: {results}")
    except asyncio.TimeoutError:
        print("Transaction timed out")

# asyncio.run(database_timeout_example())

Timeout with Progress Monitoring

from asgiref.timeout import timeout
import asyncio

async def monitored_operation_with_timeout():
    """Run a progress-reporting task under a timeout and inspect the manager afterwards."""

    # Keep a reference to the manager so its state can be read after the block.
    timeout_manager = timeout(3.0)

    async def long_running_task():
        """Print ten progress updates, sleeping half a second after each one."""
        for i in range(10):
            print(f"Progress: {i+1}/10")
            await asyncio.sleep(0.5)
        return "Task completed"

    # Ten steps of 0.5s need about 5s, which is longer than the 3s limit.
    try:
        async with timeout_manager:
            result = await long_running_task()
            print(f"Final result: {result}")
    except asyncio.TimeoutError:
        print(f"Task timed out! Expired: {timeout_manager.expired}")
        if hasattr(timeout_manager, 'remaining'):
            print(f"Remaining time when cancelled: {timeout_manager.remaining}")

# asyncio.run(monitored_operation_with_timeout())

Multiple Operations with Shared Timeout

from asgiref.timeout import timeout
import asyncio

async def shared_timeout_example():
    """Run three operations back to back under one shared 3-second timeout."""

    async def operation_a():
        await asyncio.sleep(1.0)
        return "Operation A done"

    async def operation_b():
        await asyncio.sleep(1.5)
        return "Operation B done"

    async def operation_c():
        await asyncio.sleep(2.0)
        return "Operation C done"

    # The limit covers the whole sequence, not each step: the sleeps add up
    # to 4.5s, so the last operation is the one expected to be interrupted.
    try:
        async with timeout(3.0):
            for step in (operation_a, operation_b, operation_c):
                outcome = await step()
                print(outcome)
    except asyncio.TimeoutError:
        print("One or more operations timed out")

# asyncio.run(shared_timeout_example())

Timeout in ASGI Applications

from asgiref.timeout import timeout
import asyncio

async def timeout_middleware_app(scope, receive, send):
    """ASGI application that limits request handling to two seconds.

    Parameters:
    - scope: ASGI connection scope; scope['path'] selects the simulated delay
    - receive: ASGI receive callable (not used by this example)
    - send: ASGI send callable used to emit the response messages

    If handling exceeds the limit before any response message was sent, a
    504 plain-text response is sent instead. If the response had already
    been started, the asyncio.TimeoutError is re-raised, because a second
    'http.response.start' must not be sent for the same request.
    """
    response_started = False

    async def tracked_send(message):
        """Forward *message* to ``send`` and remember once a response has begun."""
        nonlocal response_started
        if message['type'] == 'http.response.start':
            # Set the flag before awaiting: if the timeout cancels us during
            # the send we cannot know whether the message already went out.
            response_started = True
        await send(message)

    async def handle_request():
        """Handle the request with potential delay."""
        # Simulate processing time based on path
        if scope['path'] == '/slow':
            await asyncio.sleep(3.0)
            body = b'Slow response'
        elif scope['path'] == '/fast':
            await asyncio.sleep(0.1)
            body = b'Fast response'
        else:
            body = b'Default response'

        await tracked_send({
            'type': 'http.response.start',
            'status': 200,
            'headers': [[b'content-type', b'text/plain']],
        })
        await tracked_send({
            'type': 'http.response.body',
            'body': body,
        })

    # Apply timeout to request handling
    try:
        async with timeout(2.0):  # 2 second timeout for all requests
            await handle_request()
    except asyncio.TimeoutError:
        if response_started:
            # Starting a second response would break the ASGI HTTP message
            # order, so surface the timeout to the server instead.
            raise
        # Nothing was sent yet, so a complete timeout response is still valid.
        await send({
            'type': 'http.response.start',
            'status': 504,
            'headers': [[b'content-type', b'text/plain']],
        })
        await send({
            'type': 'http.response.body',
            'body': b'Request timeout',
        })

async def test_timeout_middleware():
    """Drive timeout_middleware_app with a fast and a slow request and print each status.

    The fast request is expected to report status 200 and the slow one 504.
    """
    from asgiref.testing import ApplicationCommunicator

    # (label used in the printed line, request path)
    cases = (('Fast', '/fast'), ('Slow', '/slow'))

    for label, path in cases:
        scope = {
            'type': 'http',
            'method': 'GET',
            'path': path,
        }
        communicator = ApplicationCommunicator(timeout_middleware_app, scope)
        try:
            await communicator.send_input({'type': 'http.request', 'body': b''})
            # receive_output() waits only 1 second by default, which is
            # shorter than the app's 2-second limit; without an explicit,
            # longer timeout the slow case would fail before the 504 arrives.
            response_start = await communicator.receive_output(timeout=3.0)
            print(f"{label} request status: {response_start['status']}")
        finally:
            # wait() is the awaitable shutdown call: it lets the application
            # finish (or cancels it) and surfaces any exception it raised.
            await communicator.wait()

# asyncio.run(test_timeout_middleware())

Conditional Timeout

from asgiref.timeout import timeout
import asyncio

async def conditional_timeout_example():
    """Process small and large inputs, applying the timeout only when requested."""

    async def process_data(data, use_timeout=True):
        """Process *data*, bounded by a 1-second timeout unless *use_timeout* is false."""

        async def processing_work():
            # Simulated cost: a tenth of a second per item.
            work_time = len(data) * 0.1
            await asyncio.sleep(work_time)
            return f"Processed {len(data)} items"

        # Unbounded path: just run the work to completion.
        if not use_timeout:
            return await processing_work()

        try:
            async with timeout(1.0):
                return await processing_work()
        except asyncio.TimeoutError:
            return "Processing timed out"

    # 5 items -> 0.5s of work, inside the 1s limit.
    small_data = list(range(5))
    result1 = await process_data(small_data, use_timeout=True)
    print(f"Small data: {result1}")

    # 20 items -> 2s of work, which exceeds the 1s limit.
    large_data = list(range(20))
    result2 = await process_data(large_data, use_timeout=True)
    print(f"Large data with timeout: {result2}")

    # Same 2s of work, but with no limit applied it completes.
    result3 = await process_data(large_data, use_timeout=False)
    print(f"Large data without timeout: {result3}")

# asyncio.run(conditional_timeout_example())

Timeout with Cleanup

from asgiref.timeout import timeout
import asyncio

class ResourceManager:
    """Tracks acquired resource ids so they can all be released later."""

    def __init__(self):
        # Ids of the resources currently held, in acquisition order.
        self.resources = []

    async def acquire_resource(self, resource_id):
        """Record *resource_id* as held after a short simulated acquisition delay."""
        await asyncio.sleep(0.1)
        self.resources.append(resource_id)
        print(f"Acquired resource: {resource_id}")

    async def release_all(self):
        """Announce the release of every held resource, then forget them all."""
        held = self.resources
        for resource_id in held:
            print(f"Releasing resource: {resource_id}")
            await asyncio.sleep(0.05)
        # Empty the same list object in place rather than rebinding it.
        held.clear()

async def timeout_with_cleanup_example():
    """Acquire resources under a timeout and release them whether or not it fires."""

    resource_manager = ResourceManager()

    try:
        # Each iteration sleeps about 0.3s in total, so ten iterations cannot
        # fit in the 1-second limit and the loop is interrupted part-way.
        async with timeout(1.0):
            for i in range(10):
                await resource_manager.acquire_resource(f"resource_{i}")
                await asyncio.sleep(0.2)

            print("All resources acquired successfully")

    except asyncio.TimeoutError:
        print("Operation timed out, cleaning up resources...")
    finally:
        # Runs on success and on timeout alike, so nothing acquired is leaked.
        await resource_manager.release_all()

# asyncio.run(timeout_with_cleanup_example())

Key Features

The timeout context manager provides:

  • Reliable Cancellation: Proper handling of asyncio task cancellation
  • Flexible Timeout Values: Support for None (no timeout) and float values
  • State Tracking: Access to expired status and remaining time
  • Exception Safety: Clean exception handling and resource cleanup
  • Asyncio Integration: Full compatibility with modern asyncio patterns

Install with Tessl CLI

npx tessl i tessl/pypi-asgiref

docs

compatibility.md

current-thread-executor.md

index.md

local-storage.md

server-base.md

sync-async.md

testing.md

timeout.md

type-definitions.md

wsgi-integration.md

tile.json