ASGI specs, helper code, and adapters for bridging synchronous and asynchronous Python web applications
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Base classes for implementing ASGI servers, handling application lifecycle, connection management, and stateless protocol support. This provides the foundation for building custom ASGI server implementations.
Abstract base class for implementing stateless ASGI servers with automatic application instance management and lifecycle handling.
class StatelessServer:
"""Base server class for stateless protocols."""
def __init__(self, application, max_applications=1000):
"""
Initialize server with ASGI application.
Parameters:
- application: callable, ASGI application to serve
- max_applications: int, maximum concurrent application instances (default 1000)
"""
def run(self):
"""
Run the server with asyncio event loop.
Creates new event loop if none exists and runs the server until interrupted.
"""
async def arun(self):
"""
Async version of run method.
Returns:
Coroutine that runs the server asynchronously
"""
async def handle(self):
"""
Abstract method to override for handling connections.
Raises:
NotImplementedError: Must be implemented by subclasses
"""
async def application_send(self, scope, message):
"""
Abstract method to override for sending messages to clients.
Parameters:
- scope: dict, ASGI scope for the connection
- message: dict, ASGI message to send
Raises:
NotImplementedError: Must be implemented by subclasses
"""
async def get_or_create_application_instance(self, scope_id, scope):
"""
Create or retrieve application instance for scope.
Parameters:
- scope_id: str, unique identifier for the scope
- scope: dict, ASGI scope information
Returns:
dict: Application instance information
"""
async def delete_oldest_application_instance(self):
"""
Remove the oldest application instance to free memory.
Called automatically when max_applications limit is reached.
"""
async def delete_application_instance(self, scope_id):
"""
Remove specific application instance.
Parameters:
- scope_id: str, identifier of instance to remove
"""
async def application_checker(self):
"""
Background task for monitoring application instances.
Periodically checks for expired or orphaned instances and cleans them up.
"""
async def application_exception(self, exception, application_details):
"""
Handle exceptions from application instances.
Parameters:
- exception: Exception, the exception that occurred
- application_details: dict, details about the application instance
"""
application_checker_interval: float = 0.1 # Monitoring interval in secondsfrom asgiref.server import StatelessServer
import asyncio
import socket
class SimpleHTTPServer(StatelessServer):
"""Simple HTTP server implementation."""
def __init__(self, application, host='127.0.0.1', port=8000, max_applications=1000):
super().__init__(application, max_applications)
self.host = host
self.port = port
self.server = None
async def handle(self):
"""Start HTTP server and handle connections."""
self.server = await asyncio.start_server(
self.handle_connection,
self.host,
self.port
)
print(f"Server running on {self.host}:{self.port}")
await self.server.serve_forever()
async def handle_connection(self, reader, writer):
"""Handle individual HTTP connection."""
try:
# Read HTTP request (simplified)
request_line = await reader.readline()
if not request_line:
return
# Parse request
method, path, version = request_line.decode().strip().split()
# Read headers (simplified)
headers = []
while True:
line = await reader.readline()
if not line or line == b'\\r\\n':
break
header_line = line.decode().strip()
if ':' in header_line:
name, value = header_line.split(':', 1)
headers.append([name.strip().lower().encode(), value.strip().encode()])
# Create ASGI scope
scope = {
'type': 'http',
'method': method,
'path': path,
'query_string': b'',
'headers': headers,
'server': (self.host, self.port),
}
# Create unique scope ID
scope_id = f"{writer.get_extra_info('peername')}_{id(writer)}"
# Get application instance
app_instance = await self.get_or_create_application_instance(scope_id, scope)
# Set up ASGI receive/send
self.writer = writer # Store for application_send method
async def receive():
return {'type': 'http.request', 'body': b''}
async def send(message):
await self.application_send(scope, message)
# Run application
try:
await app_instance['application'](scope, receive, send)
except Exception as e:
await self.application_exception(e, app_instance)
finally:
await self.delete_application_instance(scope_id)
writer.close()
except Exception as e:
print(f"Connection error: {e}")
writer.close()
async def application_send(self, scope, message):
"""Send ASGI message as HTTP response."""
if message['type'] == 'http.response.start':
status = message['status']
self.writer.write(f'HTTP/1.1 {status} OK\\r\\n'.encode())
for name, value in message.get('headers', []):
self.writer.write(f'{name.decode()}: {value.decode()}\\r\\n'.encode())
self.writer.write(b'\\r\\n')
elif message['type'] == 'http.response.body':
body = message.get('body', b'')
self.writer.write(body)
if not message.get('more_body', False):
await self.writer.drain()
# Usage
async def simple_app(scope, receive, send):
await send({
'type': 'http.response.start',
'status': 200,
'headers': [[b'content-type', b'text/plain']],
})
await send({
'type': 'http.response.body',
'body': b'Hello from custom ASGI server!',
})
server = SimpleHTTPServer(simple_app, port=8000)
# server.run()from asgiref.server import StatelessServer
import asyncio
import websockets
import json
class SimpleWebSocketServer(StatelessServer):
"""Simple WebSocket server implementation."""
def __init__(self, application, host='127.0.0.1', port=8001, max_applications=1000):
super().__init__(application, max_applications)
self.host = host
self.port = port
async def handle(self):
"""Start WebSocket server."""
print(f"WebSocket server running on {self.host}:{self.port}")
await websockets.serve(self.handle_websocket, self.host, self.port)
async def handle_websocket(self, websocket, path):
"""Handle WebSocket connection."""
scope = {
'type': 'websocket',
'path': path,
'query_string': b'',
'headers': [],
'server': (self.host, self.port),
}
scope_id = f"{websocket.remote_address}_{id(websocket)}"
try:
app_instance = await self.get_or_create_application_instance(scope_id, scope)
# Store websocket for application_send method
self.websocket = websocket
# Message queues for ASGI communication
receive_queue = asyncio.Queue()
async def receive():
return await receive_queue.get()
async def send(message):
await self.application_send(scope, message)
# Start application
app_task = asyncio.create_task(
app_instance['application'](scope, receive, send)
)
# Send connect event
await receive_queue.put({'type': 'websocket.connect'})
try:
# Handle incoming messages
async for message in websocket:
if isinstance(message, str):
await receive_queue.put({
'type': 'websocket.receive',
'text': message
})
else:
await receive_queue.put({
'type': 'websocket.receive',
'bytes': message
})
except websockets.exceptions.ConnectionClosed:
await receive_queue.put({'type': 'websocket.disconnect', 'code': 1000})
await app_task
except Exception as e:
await self.application_exception(e, {'scope_id': scope_id})
finally:
await self.delete_application_instance(scope_id)
async def application_send(self, scope, message):
"""Send ASGI message as WebSocket message."""
if message['type'] == 'websocket.accept':
# WebSocket already accepted by websockets library
pass
elif message['type'] == 'websocket.send':
if 'text' in message:
await self.websocket.send(message['text'])
elif 'bytes' in message:
await self.websocket.send(message['bytes'])
elif message['type'] == 'websocket.close':
code = message.get('code', 1000)
await self.websocket.close(code)
# Usage
async def echo_websocket_app(scope, receive, send):
"""Echo WebSocket application."""
await send({'type': 'websocket.accept'})
while True:
message = await receive()
if message['type'] == 'websocket.disconnect':
break
elif message['type'] == 'websocket.receive':
if 'text' in message:
echo_text = f"Echo: {message['text']}"
await send({
'type': 'websocket.send',
'text': echo_text
})
ws_server = SimpleWebSocketServer(echo_websocket_app)
# await ws_server.arun()from asgiref.server import StatelessServer
import asyncio
import random
class LoadBalancingServer(StatelessServer):
"""Server that load balances between multiple application instances."""
def __init__(self, applications, max_applications=1000):
# Use a simple round-robin selector as the main application
self.applications = applications
self.current_app_index = 0
# Create a dispatcher application
super().__init__(self.dispatcher_app, max_applications)
async def dispatcher_app(self, scope, receive, send):
"""Dispatcher that selects application based on load balancing."""
# Simple round-robin selection
app = self.applications[self.current_app_index]
self.current_app_index = (self.current_app_index + 1) % len(self.applications)
await app(scope, receive, send)
async def handle(self):
"""Custom handle method for load balancing."""
print(f"Load balancing server with {len(self.applications)} applications")
# In a real implementation, this would start actual network handling
# For demonstration, we'll just run the application checker
await self.application_checker()
async def application_send(self, scope, message):
"""Handle sending messages (implementation depends on transport)."""
print(f"Sending message: {message['type']}")
async def application_exception(self, exception, application_details):
"""Enhanced exception handling with load balancer context."""
print(f"Application exception in load balancer: {exception}")
print(f"Application details: {application_details}")
# Could implement circuit breaker logic here
# Remove failing application temporarily, etc.
# Usage with multiple applications
async def app1(scope, receive, send):
await send({
'type': 'http.response.start',
'status': 200,
'headers': [[b'content-type', b'text/plain']],
})
await send({
'type': 'http.response.body',
'body': b'Response from App 1',
})
async def app2(scope, receive, send):
await send({
'type': 'http.response.start',
'status': 200,
'headers': [[b'content-type', b'text/plain']],
})
await send({
'type': 'http.response.body',
'body': b'Response from App 2',
})
load_balancer = LoadBalancingServer([app1, app2])from asgiref.server import StatelessServer
import asyncio
import time
class ManagedLifecycleServer(StatelessServer):
"""Server with custom application lifecycle management."""
def __init__(self, application, max_applications=500):
super().__init__(application, max_applications)
self.instance_stats = {}
async def get_or_create_application_instance(self, scope_id, scope):
"""Enhanced instance creation with statistics tracking."""
instance = await super().get_or_create_application_instance(scope_id, scope)
# Track instance statistics
self.instance_stats[scope_id] = {
'created_at': time.time(),
'request_count': 0,
'last_activity': time.time(),
}
return instance
async def delete_application_instance(self, scope_id):
"""Enhanced instance deletion with cleanup."""
await super().delete_application_instance(scope_id)
# Clean up statistics
if scope_id in self.instance_stats:
stats = self.instance_stats.pop(scope_id)
lifetime = time.time() - stats['created_at']
print(f"Instance {scope_id} lived {lifetime:.2f}s, handled {stats['request_count']} requests")
async def application_send(self, scope, message):
"""Track message sending statistics."""
# Update activity tracking
scope_id = getattr(self, '_current_scope_id', None)
if scope_id and scope_id in self.instance_stats:
self.instance_stats[scope_id]['last_activity'] = time.time()
self.instance_stats[scope_id]['request_count'] += 1
print(f"Sending {message['type']} for scope {scope_id}")
async def application_checker(self):
"""Enhanced application checker with custom logic."""
while True:
await asyncio.sleep(self.application_checker_interval)
current_time = time.time()
expired_instances = []
for scope_id, stats in self.instance_stats.items():
# Mark instances idle for more than 30 seconds as expired
if current_time - stats['last_activity'] > 30:
expired_instances.append(scope_id)
# Clean up expired instances
for scope_id in expired_instances:
await self.delete_application_instance(scope_id)
print(f"Active instances: {len(self.instance_stats)}")
async def handle(self):
"""Start the application checker."""
await self.application_checker()
# Usage
managed_server = ManagedLifecycleServer(simple_app)The StatelessServer base class provides:
Install with Tessl CLI
npx tessl i tessl/pypi-asgiref