A modern Python web server and web framework designed for high performance and speed using async/await syntax.
—
Sanic provides a flexible middleware system for request/response processing and an event-driven signal system for application lifecycle management. These systems enable cross-cutting concerns, custom processing pipelines, and reactive programming patterns.
Middleware functions that process requests and responses in a configurable pipeline.
def middleware(middleware_or_request: str):
"""
Decorator for registering middleware functions.
Parameters:
- middleware_or_request: Middleware type ('request' or 'response')
Usage:
@app.middleware('request')
async def add_session(request):
# Process request before route handler
request.ctx.session = await get_session(request)
@app.middleware('response')
async def add_cors_headers(request, response):
# Process response after route handler
response.headers['Access-Control-Allow-Origin'] = '*'
"""
# Middleware registration methods
def request_middleware(self, middleware_fn):
"""
Register request middleware function.
Parameters:
- middleware_fn: Middleware function to register
"""
def response_middleware(self, middleware_fn):
"""
Register response middleware function.
Parameters:
- middleware_fn: Middleware function to register
"""
def add_middleware(
self,
middleware_fn,
attach_to: str = "request"
):
"""
Add middleware programmatically.
Parameters:
- middleware_fn: Middleware function
- attach_to: Middleware type ('request' or 'response')
"""Process incoming requests before they reach route handlers.
async def request_middleware_function(request):
"""
Request middleware function signature.
Parameters:
- request: Request object
The middleware can:
- Modify the request object
- Add data to request.ctx
- Return early response to short-circuit processing
- Raise exceptions for error handling
Returns:
- None: Continue processing
- HTTPResponse: Short-circuit with response
"""Process responses after route handlers complete.
async def response_middleware_function(request, response):
"""
Response middleware function signature.
Parameters:
- request: Original request object
- response: Response object from handler
The middleware can:
- Modify response headers
- Transform response body
- Add tracking/logging information
- Handle cleanup operations
Returns:
- None: Use provided response
- HTTPResponse: Replace with new response
"""Event-driven programming system for application lifecycle and custom events.
class Signal:
"""Signal class for event management."""
def __init__(
self,
event: str = None,
conditions: dict = None,
exclusive: bool = True
):
"""
Initialize signal.
Parameters:
- event: Signal event name
- conditions: Signal conditions
- exclusive: Whether signal is exclusive
"""
async def send(
self,
*args,
**kwargs
):
"""
Send signal asynchronously.
Parameters:
- *args: Signal arguments
- **kwargs: Signal keyword arguments
"""
def send_sync(
self,
*args,
**kwargs
):
"""
Send signal synchronously.
Parameters:
- *args: Signal arguments
- **kwargs: Signal keyword arguments
"""
def signal(event: str, **kwargs):
"""
Decorator for signal handlers.
Parameters:
- event: Signal event name
- **kwargs: Signal conditions
Usage:
@app.signal("http.lifecycle.request")
async def handle_request_signal(request):
# Handle request lifecycle signal
pass
"""Pre-defined signals for common application lifecycle events.
# Server lifecycle signals
"server.init.before" # Before server initialization
"server.init.after" # After server initialization
"server.shutdown.before" # Before server shutdown
"server.shutdown.after" # After server shutdown
# HTTP lifecycle signals
"http.lifecycle.begin" # HTTP request begins
"http.lifecycle.complete" # HTTP request complete
"http.lifecycle.exception" # HTTP request exception
"http.lifecycle.handle" # HTTP request handling
"http.lifecycle.read_body" # HTTP request body read
"http.lifecycle.read_head" # HTTP request head read
"http.lifecycle.request" # HTTP request object created
"http.lifecycle.response" # HTTP response object created
"http.lifecycle.send" # HTTP response send
# Middleware signals
"http.middleware.before" # Before middleware execution
"http.middleware.after" # After middleware execution
# Routing signals
"http.routing.before" # Before routing
"http.routing.after" # After routing
# WebSocket signals
"websocket.before" # Before WebSocket connection
"websocket.after" # After WebSocket connectionRegister signal handlers for built-in and custom signals.
def add_signal(
self,
handler,
event: str,
**conditions
):
"""
Add signal handler programmatically.
Parameters:
- handler: Signal handler function
- event: Signal event name
- **conditions: Signal conditions
"""
def signal_handler(event: str, **conditions):
"""
Signal handler decorator.
Parameters:
- event: Signal event name
- **conditions: Signal conditions
"""Create and dispatch custom application signals.
def dispatch(
event: str,
*,
context: dict = None,
condition: dict = None,
fail_not_found: bool = True,
inline: bool = False,
reverse: bool = False
):
"""
Dispatch custom signal.
Parameters:
- event: Signal event name
- context: Signal context data
- condition: Signal conditions
- fail_not_found: Fail if no handlers found
- inline: Execute handlers inline
- reverse: Execute handlers in reverse order
"""from sanic import Sanic
from sanic.response import json
import time
app = Sanic("MyApp")
@app.middleware('request')
async def add_start_time(request):
\"\"\"Add request start time for performance tracking.\"\"\"
request.ctx.start_time = time.time()
@app.middleware('response')
async def add_process_time(request, response):
\"\"\"Add processing time header to response.\"\"\"
if hasattr(request.ctx, 'start_time'):
process_time = time.time() - request.ctx.start_time
response.headers['X-Process-Time'] = str(process_time)
@app.route("/api/data")
async def get_data(request):
# Simulate processing
await asyncio.sleep(0.1)
return json({"data": "example"})from sanic.response import json
from sanic.exceptions import Unauthorized
@app.middleware('request')
async def authenticate_request(request):
\"\"\"Authenticate requests to protected endpoints.\"\"\"
# Skip authentication for public endpoints
public_paths = ['/login', '/register', '/health']
if request.path in public_paths:
return
# Check for authorization header
auth_header = request.headers.get('Authorization')
if not auth_header:
raise Unauthorized("Authorization header required")
# Validate token
try:
token = auth_header.replace('Bearer ', '')
user = await validate_token(token)
request.ctx.user = user
except Exception:
raise Unauthorized("Invalid token")
@app.route("/api/profile")
async def get_profile(request):
\"\"\"Protected endpoint requiring authentication.\"\"\"
return json({"user": request.ctx.user})@app.middleware('response')
async def add_cors_headers(request, response):
\"\"\"Add CORS headers to all responses.\"\"\"
response.headers.update({
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
'Access-Control-Allow-Headers': 'Origin, Accept, Content-Type, X-Requested-With, Authorization',
'Access-Control-Max-Age': '86400'
})
@app.middleware('request')
async def handle_options_request(request):
\"\"\"Handle preflight OPTIONS requests.\"\"\"
if request.method == 'OPTIONS':
return json({}, status=200)import logging
import uuid
@app.middleware('request')
async def add_request_id(request):
\"\"\"Add unique request ID for tracing.\"\"\"
request.ctx.request_id = str(uuid.uuid4())
request.ctx.logger = logging.getLogger('app').bind(request_id=request.ctx.request_id)
@app.middleware('request')
async def log_request(request):
\"\"\"Log incoming requests.\"\"\"
request.ctx.logger.info(
"Request started",
method=request.method,
path=request.path,
remote_addr=request.ip
)
@app.middleware('response')
async def log_response(request, response):
\"\"\"Log outgoing responses.\"\"\"
request.ctx.logger.info(
"Request completed",
status_code=response.status,
content_length=len(response.body) if response.body else 0
)@app.signal("server.init.before")
async def setup_database(app, **context):
\"\"\"Initialize database connection before server starts.\"\"\"
app.ctx.db = await create_database_connection()
print("Database connection established")
@app.signal("server.shutdown.before")
async def cleanup_database(app, **context):
\"\"\"Close database connection before server shuts down.\"\"\"
if hasattr(app.ctx, 'db'):
await app.ctx.db.close()
print("Database connection closed")
@app.signal("http.lifecycle.request")
async def track_request(request, **context):
\"\"\"Track request metrics.\"\"\"
await increment_request_counter(
method=request.method,
path=request.path
)
@app.signal("http.lifecycle.exception")
async def handle_request_exception(request, exception, **context):
\"\"\"Handle request exceptions.\"\"\"
await log_exception(
request_id=getattr(request.ctx, 'request_id', 'unknown'),
exception=exception,
path=request.path
)# Define custom signal events
CUSTOM_EVENTS = {
"user.created": "user.created",
"user.updated": "user.updated",
"user.deleted": "user.deleted",
"order.placed": "order.placed",
"order.fulfilled": "order.fulfilled"
}
@app.signal(CUSTOM_EVENTS["user.created"])
async def send_welcome_email(user, **context):
\"\"\"Send welcome email when user is created.\"\"\"
await send_email(
to=user['email'],
template='welcome',
context={'user': user}
)
@app.signal(CUSTOM_EVENTS["order.placed"])
async def process_order(order, **context):
\"\"\"Process order when placed.\"\"\"
await update_inventory(order['items'])
await notify_fulfillment_center(order)
# Dispatch custom signals from route handlers
@app.route("/api/users", methods=["POST"])
async def create_user(request):
user_data = request.json
user = await create_user_in_db(user_data)
# Dispatch custom signal
await app.dispatch(
CUSTOM_EVENTS["user.created"],
context={"user": user}
)
return json({"user": user}, status=201)@app.middleware('request')
async def rate_limit_middleware(request):
\"\"\"Apply rate limiting to API endpoints.\"\"\"
# Only apply to API routes
if not request.path.startswith('/api/'):
return
# Get client identifier
client_id = request.ip
if 'X-API-Key' in request.headers:
client_id = request.headers['X-API-Key']
# Check rate limit
if await is_rate_limited(client_id):
return json(
{"error": "Rate limit exceeded"},
status=429,
headers={"Retry-After": "60"}
)
# Track request
await track_request(client_id)
@app.middleware('response')
async def cache_control_middleware(request, response):
\"\"\"Add cache control headers based on route.\"\"\"
# Different caching strategies for different endpoints
if request.path.startswith('/api/static/'):
# Long cache for static content
response.headers['Cache-Control'] = 'public, max-age=86400'
elif request.path.startswith('/api/'):
# No cache for API responses
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
else:
# Default cache policy
response.headers['Cache-Control'] = 'public, max-age=3600'@app.signal("http.lifecycle.request", priority=1)
async def high_priority_request_handler(request, **context):
\"\"\"High priority request handler.\"\"\"
# This runs first due to higher priority
request.ctx.processed_by_high_priority = True
@app.signal("http.lifecycle.request", priority=0)
async def normal_priority_request_handler(request, **context):
\"\"\"Normal priority request handler.\"\"\"
# This runs after high priority handlers
if hasattr(request.ctx, 'processed_by_high_priority'):
request.ctx.processing_complete = True
# Conditional signal handlers
@app.signal("http.lifecycle.response", condition={"status_code": 404})
async def handle_404_responses(request, response, **context):
\"\"\"Handle 404 responses specifically.\"\"\"
await log_404(request.path, request.ip)
@app.signal("http.lifecycle.response", condition={"method": "POST"})
async def handle_post_responses(request, response, **context):
\"\"\"Handle POST method responses.\"\"\"
await audit_post_request(request, response)Install with Tessl CLI
npx tessl i tessl/pypi-sanic