ASGI specs, helper code, and adapters for bridging synchronous and asynchronous Python web applications
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Async-aware local storage that works correctly with both threading and asyncio, replacing threading.local for async applications. This provides context isolation that respects both thread boundaries and async task boundaries.
Async-safe replacement for threading.local that works correctly with asyncio tasks and context variables.
# API summary: every method below carries only a docstring and no body.
class Local:
    """Thread and async-task safe local storage."""

    def __init__(self, thread_critical: bool = False) -> None:
        """
        Initialize local storage.

        Parameters:
        - thread_critical: bool, whether to use thread-based storage (default False)
          If False, uses context variables for async safety
          If True, uses traditional thread-local storage
        """

    def __getattr__(self, key: str):
        """
        Get attribute from local storage.

        Parameters:
        - key: str, attribute name

        Returns:
        Any: Value stored for this context/thread

        Raises:
        AttributeError: If attribute not found in current context
        """

    def __setattr__(self, key: str, value) -> None:
        """
        Set attribute in local storage.

        Parameters:
        - key: str, attribute name
        - value: Any, value to store for this context/thread
        """

    def __delattr__(self, key: str) -> None:
        """
        Delete attribute from local storage.

        Parameters:
        - key: str, attribute name to remove

        Raises:
        AttributeError: If attribute not found in current context
        """


# Example usage
from asgiref.local import Local
import asyncio
# One Local instance for the whole example; values are kept per storage context
local = Local()


async def worker_task(worker_id):
    """Fill this task's storage with three items, then print what it holds.

    Parameters:
    - worker_id: value stored as ``local.worker_id`` and shown in the output
    """
    # Everything written to ``local`` here belongs to the current task's context
    local.worker_id = worker_id
    local.data = []
    for step in range(3):
        item = f"item_{step}"
        local.data.append(item)
        # Yield to the event loop so the other workers interleave with this one
        await asyncio.sleep(0.1)
    print(f"Worker {local.worker_id}: {local.data}")
async def main():
    """Run three ``worker_task`` coroutines concurrently and wait for all of them."""
    await asyncio.gather(*(worker_task(i) for i in range(3)))
# Each worker will have isolated storage
# asyncio.run(main())

from asgiref.local import Local
# Global request context
request_context = Local()


def _header_value(scope, name):
    """Return the first value of header *name* as text, or None if absent.

    Parameters:
    - scope: ASGI connection scope
    - name: bytes, lower-cased header name to look for
    """
    # ASGI delivers headers as an iterable of [name, value] byte-string pairs
    # with lower-cased names, not as a mapping, so they must be scanned.
    for key, value in scope.get('headers') or ():
        if key == name:
            return value.decode('latin-1')
    return None


async def asgi_app(scope, receive, send):
    """Record request details in ``request_context`` and handle the request.

    Parameters:
    - scope: ASGI connection scope
    - receive: ASGI receive callable
    - send: ASGI send callable
    """
    # Store request information in local context
    request_context.scope = scope
    # 'user' is assumed to be a dict-like entry added by upstream middleware;
    # a missing or None entry is treated as "no user".
    user = scope.get('user') or {}
    request_context.user_id = user.get('id')
    request_context.request_id = _header_value(scope, b'x-request-id')
    # No explicit cleanup: the stored values belong to this task's context.
    await handle_request(scope, receive, send)
async def handle_request(scope, receive, send):
    """Log the request, run the business logic, and send a 200 text response.

    The request id and user id are read from ``request_context`` rather than
    being passed in as arguments.
    """
    print(f"Handling request {request_context.request_id} for user {request_context.user_id}")
    await business_logic()

    start_message = {
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    }
    await send(start_message)

    body_text = f'Request {request_context.request_id} processed'
    await send({
        'type': 'http.response.body',
        'body': body_text.encode(),
    })
async def business_logic():
    """Print who the current request is processed for.

    Reads ``request_context.user_id`` directly, showing that code deep in the
    call stack can reach the request context without it being passed down.
    """
    # Compare against None so that a legitimate falsy id such as 0 still
    # counts as an authenticated user.
    if request_context.user_id is not None:
        print(f"Processing for authenticated user: {request_context.user_id}")
    else:
        print("Processing for anonymous user")


from asgiref.local import Local
import asyncio
class DatabaseManager:
    """Hands out one simulated database connection per storage context."""

    def __init__(self):
        # Context-local holder for the connection
        self.local = Local()

    async def get_connection(self):
        """Get or create connection for current context.

        Returns:
        dict: the connection stored for the current context
        """
        if not hasattr(self.local, 'connection'):
            # Create new connection for this context
            connection = await self.create_connection()
            self.local.connection = connection
            print(f"Created new connection: {connection['id']}")
        return self.local.connection

    async def create_connection(self):
        """Simulate creating a database connection.

        Returns:
        dict: ``{"id": ..., "status": "connected"}`` where ``id`` identifies
        this connection object
        """
        await asyncio.sleep(0.1)  # Simulate connection time
        connection = {"id": None, "status": "connected"}
        # Use the identity of the connection itself: the id() of a throwaway
        # object is not a stable identifier and would not match the value
        # reported when the connection is created.
        connection["id"] = id(connection)
        return connection

    async def query(self, sql):
        """Execute query using context-local connection.

        Parameters:
        - sql: str, statement to (pretend to) execute

        Returns:
        dict: fixed result payload plus the id of the connection used
        """
        conn = await self.get_connection()
        print(f"Executing '{sql}' on connection {conn['id']}")
        return {"result": "data", "connection_id": conn["id"]}
db = DatabaseManager()


async def service_function(service_id):
    """Run two queries through ``db`` and print which connection served each.

    Parameters:
    - service_id: label used in the printed summary
    """
    users = await db.query("SELECT * FROM users")
    await asyncio.sleep(0.1)
    orders = await db.query("SELECT * FROM orders")
    first_id, second_id = users['connection_id'], orders['connection_id']
    print(f"Service {service_id} results: {first_id}, {second_id}")
async def main():
    """Run services A, B and C concurrently; each call uses its own connection."""
    await asyncio.gather(*(service_function(label) for label in ("A", "B", "C")))
# Each service will use its own connection consistently
# asyncio.run(main())

from asgiref.local import Local
import threading
import asyncio
# Thread-critical local storage
thread_local = Local(thread_critical=True)
# Regular async-safe local storage
async_local = Local(thread_critical=False)


def thread_worker(thread_id):
    """Store *thread_id* in both locals from a worker thread and print it back.

    Parameters:
    - thread_id: value written to ``thread_local`` and ``async_local``
    """
    thread_local.thread_id = thread_id
    # Written from this worker thread; the value is not meant to be shared
    # with other threads.
    async_local.thread_id = thread_id

    print(f"Thread {thread_id} - thread_local: {thread_local.thread_id}")
    try:
        seen = async_local.thread_id
    except AttributeError:
        # Fallback for the case where the value is not visible in this context
        print(f"Thread {thread_id} - async_local: Not accessible from different thread")
    else:
        print(f"Thread {thread_id} - async_local: {seen}")
async def async_worker(worker_id):
    """Store *worker_id* in ``async_local`` and print the stored value."""
    async_local.worker_id = worker_id
    stored = async_local.worker_id
    print(f"Async worker {worker_id} - async_local: {stored}")
def demonstrate_differences():
    """Exercise both locals from the main thread, two worker threads, and two async tasks."""
    # Initial values written from the main thread
    thread_local.main_value = "main_thread"
    async_local.main_value = "main_async"

    # Launch the thread workers, then wait for both to finish
    workers = []
    for index in range(2):
        worker = threading.Thread(target=thread_worker, args=(index,))
        worker.start()
        workers.append(worker)
    for worker in workers:
        worker.join()

    async def run_async_workers():
        # Both async workers run on one event loop in the current thread
        await asyncio.gather(*(async_worker(label) for label in ("A", "B")))

    asyncio.run(run_async_workers())
# demonstrate_differences()

import asyncio
import time
import uuid

from asgiref.local import Local
# Global context for request tracking
request_local = Local()


class RequestTrackingMiddleware:
    """Middleware that tracks request context."""

    def __init__(self, app):
        """
        Parameters:
        - app: ASGI application to wrap
        """
        self.app = app

    async def __call__(self, scope, receive, send):
        """Populate ``request_local`` for this request and time the wrapped app.

        Parameters:
        - scope: ASGI connection scope; must contain 'path'
        - receive: ASGI receive callable
        - send: ASGI send callable
        """
        # Initialize request context
        request_id = str(uuid.uuid4())
        request_local.request_id = request_id
        request_local.start_time = time.time()
        request_local.path = scope['path']
        # Time the request with a monotonic clock: wall-clock time can jump
        # while the request is in flight.
        started = time.perf_counter()
        print(f"Request {request_id} started: {request_local.path}")
        try:
            await self.app(scope, receive, send)
        finally:
            # Report from local variables so the summary stays correct even if
            # the wrapped app overwrote the values in request_local.
            duration = time.perf_counter() - started
            print(f"Request {request_id} completed in {duration:.3f}s")
async def business_app(scope, receive, send):
    """Business application that uses request context.

    Reads the request id and path from ``request_local`` and replies with a
    200 plain-text response naming the request.
    """
    # Can access request context from anywhere
    print(f"Processing request {request_local.request_id} for path {request_local.path}")
    await asyncio.sleep(0.1)  # Simulate work (needs asyncio imported by this example)

    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    body_text = f'Processed by request {request_local.request_id}'
    await send({
        'type': 'http.response.body',
        'body': body_text.encode(),
    })
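# Wrap application with middleware
app = RequestTrackingMiddleware(business_app)

The Local class chooses the appropriate storage mechanism based on `thread_critical`:
- `thread_critical=False` (default): context variables, for async safety
- `thread_critical=True`: traditional thread-local storage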
Context variables are preferred for async applications as they properly isolate data across concurrent tasks running in the same thread.
Install with Tessl CLI
npx tessl i tessl/pypi-asgiref