A Python ASGI web framework with the same API as Flask
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Application, request, and WebSocket context handling with global proxy objects, context copying utilities, and lifecycle management for maintaining state across async operations.
Functions to check current context availability and state.
def has_app_context() -> bool:
"""
Check if currently inside an application context.
Returns:
True if in application context, False otherwise
"""
def has_request_context() -> bool:
"""
Check if currently inside a request context.
Returns:
True if in request context, False otherwise
"""
def has_websocket_context() -> bool:
"""
Check if currently inside a WebSocket context.
Returns:
True if in WebSocket context, False otherwise
"""Decorators and utilities for preserving context across async operations and background tasks.
def copy_current_app_context(func: Callable):
"""
Copy current application context to decorated function.
Args:
func: Function to decorate with app context
Returns:
Decorated function that runs with copied app context
Usage:
@copy_current_app_context
async def background_task():
# Has access to current_app
pass
"""
def copy_current_request_context(func: Callable):
"""
Copy current request context to decorated function.
Args:
func: Function to decorate with request context
Returns:
Decorated function that runs with copied request context
Usage:
@copy_current_request_context
async def process_request_data():
# Has access to request, session, g
pass
"""
def copy_current_websocket_context(func: Callable):
"""
Copy current WebSocket context to decorated function.
Args:
func: Function to decorate with WebSocket context
Returns:
Decorated function that runs with copied WebSocket context
Usage:
@copy_current_websocket_context
async def handle_websocket_data():
# Has access to websocket
pass
"""Functions to schedule operations after current request or WebSocket connection completes.
def after_this_request(func: Callable):
"""
Schedule function to run after current request completes.
Args:
func: Function to run after request (receives response as argument)
Usage:
@after_this_request
def log_request(response):
# Log request details
pass
"""
def after_this_websocket(func: Callable):
"""
Schedule function to run after current WebSocket connection ends.
Args:
func: Function to run after WebSocket closes
Usage:
@after_this_websocket
def cleanup_websocket():
# Clean up WebSocket resources
pass
"""Context-aware proxy objects that provide access to current application, request, and WebSocket state.
# Application Context Globals
current_app: Quart
"""
Proxy to current application instance.
Available in: app context, request context, WebSocket context
"""
g: object
"""
Application context global object for storing data.
Available in: app context, request context, WebSocket context
Usage:
g.user_id = 123
g.start_time = time.time()
"""
# Request Context Globals
request: Request
"""
Proxy to current request object.
Available in: request context only
"""
session: dict
"""
Proxy to current session object.
Available in: request context only
Usage:
session['user_id'] = 123
user_id = session.get('user_id')
"""
# WebSocket Context Globals
websocket: Websocket
"""
Proxy to current WebSocket object.
Available in: WebSocket context only
"""from quart import Quart, has_app_context, has_request_context, has_websocket_context
from quart import current_app, request, websocket, g
app = Quart(__name__)
async def utility_function():
"""Utility function that works in different contexts."""
if has_app_context():
app_name = current_app.name
g.utility_called = True
else:
app_name = "No app context"
if has_request_context():
method = request.method
path = request.path
else:
method = path = "No request context"
if has_websocket_context():
ws_path = websocket.path
else:
ws_path = "No WebSocket context"
return {
'app_name': app_name,
'method': method,
'path': path,
'websocket_path': ws_path
}
@app.route('/context-info')
async def context_info():
info = await utility_function()
return info
@app.websocket('/ws/context-info')
async def websocket_context_info():
await websocket.accept()
info = await utility_function()
await websocket.send_json(info)from quart import Quart, copy_current_request_context, copy_current_app_context
from quart import request, current_app, g
import asyncio
app = Quart(__name__)
@app.route('/start-background-task', methods=['POST'])
async def start_background_task():
# Set up request-specific data
g.user_id = request.json.get('user_id')
g.task_id = generate_task_id()
# Start background task with request context
asyncio.create_task(process_user_data())
return {'task_id': g.task_id, 'status': 'started'}
@copy_current_request_context
async def process_user_data():
"""Background task with access to request context."""
try:
# Access request context data
user_id = g.user_id
task_id = g.task_id
# Perform long-running operation
result = await perform_user_processing(user_id)
# Log with app context
current_app.logger.info(f"Task {task_id} completed for user {user_id}")
# Store result
await store_task_result(task_id, result)
except Exception as e:
current_app.logger.error(f"Task {task_id} failed: {e}")
@app.route('/batch-process', methods=['POST'])
async def batch_process():
user_ids = request.json.get('user_ids', [])
# Create background tasks for each user
tasks = []
for user_id in user_ids:
# Create task with current app context
task = create_user_task(user_id)
tasks.append(asyncio.create_task(task))
# Wait for all tasks (optional)
results = await asyncio.gather(*tasks, return_exceptions=True)
return {'processed': len(user_ids), 'results': len([r for r in results if not isinstance(r, Exception)])}
@copy_current_app_context
async def create_user_task(user_id):
"""Individual user processing task with app context."""
try:
# Access app configuration
batch_size = current_app.config.get('BATCH_SIZE', 100)
# Process user data
result = await process_single_user(user_id, batch_size)
# Log using app logger
current_app.logger.info(f"Processed user {user_id}")
return result
except Exception as e:
current_app.logger.error(f"Failed to process user {user_id}: {e}")
raisefrom quart import Quart, after_this_request, request, g
import time
app = Quart(__name__)
@app.before_request
async def before_request():
g.start_time = time.time()
g.request_id = generate_request_id()
@app.route('/tracked-endpoint')
async def tracked_endpoint():
# Register after-request handler
@after_this_request
def log_request_timing(response):
duration = time.time() - g.start_time
# Log request details
current_app.logger.info(
f"Request {g.request_id}: {request.method} {request.path} "
f"-> {response.status_code} ({duration:.3f}s)"
)
# Add timing header
response.headers['X-Response-Time'] = f"{duration:.3f}s"
return response
# Process request
data = await get_some_data()
return {'data': data, 'request_id': g.request_id}
@app.route('/audit-endpoint', methods=['POST'])
async def audit_endpoint():
# Register multiple after-request handlers
@after_this_request
def audit_log(response):
# Audit logging
audit_data = {
'request_id': g.request_id,
'user_id': request.json.get('user_id'),
'action': 'data_update',
'status': response.status_code,
'timestamp': time.time()
}
asyncio.create_task(write_audit_log(audit_data))
return response
@after_this_request
def update_metrics(response):
# Update application metrics
metrics.increment('requests.audit_endpoint')
if response.status_code >= 400:
metrics.increment('requests.errors')
return response
# Process the request
result = await process_audit_request(request.json)
return resultfrom quart import Quart, copy_current_websocket_context, after_this_websocket
from quart import websocket, current_app
import asyncio
app = Quart(__name__)
# Store active WebSocket connections
active_connections = set()
@app.websocket('/ws/managed')
async def managed_websocket():
await websocket.accept()
# Add to active connections
active_connections.add(websocket)
# Register cleanup handler
@after_this_websocket
def cleanup_connection():
active_connections.discard(websocket)
current_app.logger.info(f"WebSocket connection closed, {len(active_connections)} remaining")
# Start background tasks with WebSocket context
asyncio.create_task(heartbeat_task())
asyncio.create_task(broadcast_listener())
try:
while True:
message = await websocket.receive_json()
await handle_message(message)
except ConnectionClosed:
pass
@copy_current_websocket_context
async def heartbeat_task():
"""Send periodic heartbeat with WebSocket context."""
try:
while True:
await asyncio.sleep(30)
await websocket.send_json({
'type': 'heartbeat',
'timestamp': time.time(),
'connection_count': len(active_connections)
})
except ConnectionClosed:
pass
@copy_current_websocket_context
async def broadcast_listener():
"""Listen for broadcast messages with WebSocket context."""
try:
async for message in get_broadcast_messages():
await websocket.send_json({
'type': 'broadcast',
'data': message
})
except ConnectionClosed:
pass
async def handle_message(message):
"""Handle individual WebSocket message."""
if message.get('type') == 'ping':
await websocket.send_json({'type': 'pong'})
elif message.get('type') == 'echo':
await websocket.send_json({
'type': 'echo_response',
'data': message.get('data')
})from quart import Quart, session, request, redirect, url_for
app = Quart(__name__)
app.secret_key = 'your-secret-key'
@app.route('/login', methods=['POST'])
async def login():
username = (await request.form).get('username')
password = (await request.form).get('password')
if await validate_user(username, password):
# Store user data in session
session['user_id'] = await get_user_id(username)
session['username'] = username
session['login_time'] = time.time()
session.permanent = True # Use permanent session
return redirect(url_for('dashboard'))
else:
return redirect(url_for('login_form', error='invalid'))
@app.route('/dashboard')
async def dashboard():
if 'user_id' not in session:
return redirect(url_for('login_form'))
# Access session data
user_id = session['user_id']
username = session['username']
login_time = session.get('login_time')
user_data = await get_user_data(user_id)
return await render_template('dashboard.html',
user=user_data,
username=username,
login_time=login_time)
@app.route('/logout')
async def logout():
# Clear session
session.clear()
return redirect(url_for('login_form'))
@app.before_request
async def check_session():
# Skip auth check for certain routes
if request.endpoint in ['login', 'login_form', 'static']:
return
# Check if user is logged in
if 'user_id' not in session:
return redirect(url_for('login_form'))
# Check session expiry
if session.get('login_time', 0) < time.time() - 3600: # 1 hour
session.clear()
return redirect(url_for('login_form', error='expired'))from quart import Quart, g, current_app, request
app = Quart(__name__)
@app.before_request
async def load_user():
# Store user data in g for request duration
user_id = request.headers.get('X-User-ID')
if user_id:
g.user = await load_user_data(user_id)
g.user_permissions = await load_user_permissions(user_id)
else:
g.user = None
g.user_permissions = []
@app.route('/protected-resource')
async def protected_resource():
# Use data from g
if not g.user:
return {'error': 'Authentication required'}, 401
if 'read_resource' not in g.user_permissions:
return {'error': 'Permission denied'}, 403
# Access current app configuration
max_results = current_app.config.get('MAX_RESULTS', 50)
resources = await get_user_resources(g.user.id, limit=max_results)
return {
'user': g.user.username,
'resources': resources,
'total': len(resources)
}
@app.teardown_appcontext
def cleanup_g(error):
# Clean up g object resources
if hasattr(g, 'db_connection'):
g.db_connection.close()Install with Tessl CLI
npx tessl i tessl/pypi-quart