CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-asgiref

ASGI specs, helper code, and adapters for bridging synchronous and asynchronous Python web applications

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

local-storage.mddocs/

Thread-Safe Storage

Async-aware local storage that works correctly with both threading and asyncio, replacing threading.local for async applications. This provides context isolation that respects both thread boundaries and async task boundaries.

Capabilities

Local Storage Class

Async-safe replacement for threading.local that works correctly with asyncio tasks and context variables.

class Local:
    """Thread and async-task safe local storage."""
    
    def __init__(self, thread_critical=False):
        """
        Initialize local storage.
        
        Parameters:
        - thread_critical: bool, whether to use thread-based storage (default False)
                          If False, uses context variables for async safety
                          If True, uses traditional thread-local storage
        """
    
    def __getattr__(self, key):
        """
        Get attribute from local storage.
        
        Parameters:
        - key: str, attribute name
        
        Returns:
        Any: Value stored for this context/thread
        
        Raises:
        AttributeError: If attribute not found in current context
        """
    
    def __setattr__(self, key, value):
        """
        Set attribute in local storage.
        
        Parameters:
        - key: str, attribute name  
        - value: Any, value to store for this context/thread
        """
    
    def __delattr__(self, key):
        """
        Delete attribute from local storage.
        
        Parameters:
        - key: str, attribute name to remove
        
        Raises:
        AttributeError: If attribute not found in current context
        """

Usage Examples

Basic Local Storage

from asgiref.local import Local
import asyncio

# Create local storage instance
local = Local()

async def worker_task(worker_id):
    # Each task gets its own storage context
    local.worker_id = worker_id
    local.data = []
    
    for i in range(3):
        local.data.append(f"item_{i}")
        await asyncio.sleep(0.1)
    
    print(f"Worker {local.worker_id}: {local.data}")

async def main():
    # Run multiple tasks concurrently
    tasks = [worker_task(i) for i in range(3)]
    await asyncio.gather(*tasks)

# Each worker will have isolated storage
# asyncio.run(main())

Request Context in Web Applications

from asgiref.local import Local

# Global request context
request_context = Local()

async def asgi_app(scope, receive, send):
    # Store request information in local context
    request_context.scope = scope
    request_context.user_id = scope.get('user', {}).get('id')
    request_context.request_id = scope.get('headers', {}).get('x-request-id')
    
    try:
        await handle_request(scope, receive, send)
    finally:
        # Context automatically cleaned up when task ends
        pass

async def handle_request(scope, receive, send):
    # Can access request context from anywhere in the call stack
    print(f"Handling request {request_context.request_id} for user {request_context.user_id}")
    
    await business_logic()
    
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': f'Request {request_context.request_id} processed'.encode(),
    })

async def business_logic():
    # Deep in the call stack, still has access to request context
    if request_context.user_id:
        print(f"Processing for authenticated user: {request_context.user_id}")
    else:
        print("Processing for anonymous user")

Database Connection Management

from asgiref.local import Local
import asyncio

class DatabaseManager:
    def __init__(self):
        self.local = Local()
    
    async def get_connection(self):
        """Get or create connection for current context."""
        if not hasattr(self.local, 'connection'):
            # Create new connection for this context
            self.local.connection = await self.create_connection()
            print(f"Created new connection: {id(self.local.connection)}")
        return self.local.connection
    
    async def create_connection(self):
        """Simulate creating a database connection."""
        await asyncio.sleep(0.1)  # Simulate connection time
        return {"id": id({}), "status": "connected"}
    
    async def query(self, sql):
        """Execute query using context-local connection."""
        conn = await self.get_connection()
        print(f"Executing '{sql}' on connection {conn['id']}")
        return {"result": "data", "connection_id": conn["id"]}

db = DatabaseManager()

async def service_function(service_id):
    """Service function that uses database."""
    result1 = await db.query("SELECT * FROM users")
    await asyncio.sleep(0.1)
    result2 = await db.query("SELECT * FROM orders")
    
    print(f"Service {service_id} results: {result1['connection_id']}, {result2['connection_id']}")

async def main():
    # Each service call gets its own database connection
    await asyncio.gather(
        service_function("A"),
        service_function("B"),
        service_function("C")
    )

# Each service will use its own connection consistently
# asyncio.run(main())

Thread-Critical Mode

from asgiref.local import Local
import threading
import asyncio

# Thread-critical local storage
thread_local = Local(thread_critical=True)

# Regular async-safe local storage  
async_local = Local(thread_critical=False)

def thread_worker(thread_id):
    """Function running in separate thread."""
    thread_local.thread_id = thread_id
    async_local.thread_id = thread_id  # This won't work across threads
    
    print(f"Thread {thread_id} - thread_local: {thread_local.thread_id}")
    
    try:
        print(f"Thread {thread_id} - async_local: {async_local.thread_id}")
    except AttributeError:
        print(f"Thread {thread_id} - async_local: Not accessible from different thread")

async def async_worker(worker_id):
    """Async function running in same thread."""
    async_local.worker_id = worker_id
    print(f"Async worker {worker_id} - async_local: {async_local.worker_id}")

def demonstrate_differences():
    # Set up initial values in main thread
    thread_local.main_value = "main_thread"
    async_local.main_value = "main_async"
    
    # Start thread workers
    threads = []
    for i in range(2):
        t = threading.Thread(target=thread_worker, args=(i,))
        threads.append(t)
        t.start()
    
    # Wait for threads
    for t in threads:
        t.join()
    
    # Run async workers in same thread
    async def run_async_workers():
        await asyncio.gather(
            async_worker("A"),
            async_worker("B")
        )
    
    asyncio.run(run_async_workers())

# demonstrate_differences()

Middleware with Local Context

from asgiref.local import Local
import time
import uuid

# Global context for request tracking
request_local = Local()

class RequestTrackingMiddleware:
    """Middleware that tracks request context."""
    
    def __init__(self, app):
        self.app = app
    
    async def __call__(self, scope, receive, send):
        # Initialize request context
        request_local.request_id = str(uuid.uuid4())
        request_local.start_time = time.time()
        request_local.path = scope['path']
        
        print(f"Request {request_local.request_id} started: {request_local.path}")
        
        try:
            await self.app(scope, receive, send)
        finally:
            duration = time.time() - request_local.start_time
            print(f"Request {request_local.request_id} completed in {duration:.3f}s")

async def business_app(scope, receive, send):
    """Business application that uses request context."""
    # Can access request context from anywhere
    print(f"Processing request {request_local.request_id} for path {request_local.path}")
    
    await asyncio.sleep(0.1)  # Simulate work
    
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': f'Processed by request {request_local.request_id}'.encode(),
    })

# Wrap application with middleware
app = RequestTrackingMiddleware(business_app)

Context Variable vs Thread Local

The Local class automatically chooses the appropriate storage mechanism:

  • Context Variables (default): Works with asyncio tasks, coroutines, and async generators
  • Thread Local (thread_critical=True): Works with traditional threading

Context variables are preferred for async applications as they properly isolate data across concurrent tasks running in the same thread.

Install with Tessl CLI

npx tessl i tessl/pypi-asgiref

docs

compatibility.md

current-thread-executor.md

index.md

local-storage.md

server-base.md

sync-async.md

testing.md

timeout.md

type-definitions.md

wsgi-integration.md

tile.json