A Python ASGI web framework with the same API as Flask
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Event-driven hooks for application lifecycle, request/response processing, WebSocket handling, error management, and template rendering with comprehensive signal support for monitoring and extending application behavior.
Signals fired during application context creation, destruction, and teardown phases.
# Application context signals
appcontext_pushed: Namespace
"""
Signal sent when application context is pushed.
Sender: The application instance
Arguments: None
Usage:
@appcontext_pushed.connect
def on_app_context_pushed(sender, **extra):
# Application context is now available
pass
"""
appcontext_popped: Namespace
"""
Signal sent when application context is popped.
Sender: The application instance
Arguments: None
Usage:
@appcontext_popped.connect
def on_app_context_popped(sender, **extra):
# Application context is being removed
pass
"""
appcontext_tearing_down: Namespace
"""
Signal sent when application context is tearing down.
Sender: The application instance
Arguments:
exc: Exception that caused teardown (None for normal teardown)
Usage:
@appcontext_tearing_down.connect
def on_app_context_teardown(sender, exc=None, **extra):
# Clean up app context resources
if exc:
# Handle teardown due to exception
pass
"""

Signals for HTTP request processing lifecycle events.
# Request processing signals
request_started: Namespace
"""
Signal sent when request processing starts.
Sender: The application instance
Arguments: None
Usage:
@request_started.connect
def on_request_started(sender, **extra):
# Request processing has begun
g.start_time = time.time()
"""
request_finished: Namespace
"""
Signal sent when request processing finishes successfully.
Sender: The application instance
Arguments:
response: The response object
Usage:
@request_finished.connect
def on_request_finished(sender, response, **extra):
# Request completed successfully
duration = time.time() - g.start_time
log_request(request, response, duration)
"""
request_tearing_down: Namespace
"""
Signal sent when request context is tearing down.
Sender: The application instance
Arguments:
exc: Exception that caused teardown (None for normal teardown)
Usage:
@request_tearing_down.connect
def on_request_teardown(sender, exc=None, **extra):
# Clean up request resources
if hasattr(g, 'db_connection'):
g.db_connection.close()
"""

Signals for WebSocket connection lifecycle events.
# WebSocket processing signals
websocket_started: Namespace
"""
Signal sent when WebSocket processing starts.
Sender: The application instance
Arguments: None
Usage:
@websocket_started.connect
def on_websocket_started(sender, **extra):
# WebSocket connection has begun
active_connections.add(websocket)
"""
websocket_finished: Namespace
"""
Signal sent when WebSocket processing finishes.
Sender: The application instance
Arguments: None
Usage:
@websocket_finished.connect
def on_websocket_finished(sender, **extra):
# WebSocket connection ended
active_connections.discard(websocket)
"""
websocket_tearing_down: Namespace
"""
Signal sent when WebSocket context is tearing down.
Sender: The application instance
Arguments:
exc: Exception that caused teardown (None for normal teardown)
Usage:
@websocket_tearing_down.connect
def on_websocket_teardown(sender, exc=None, **extra):
# Clean up WebSocket resources
cleanup_websocket_resources()
"""

Signals fired during template rendering operations.
# Template rendering signals
before_render_template: Namespace
"""
Signal sent before template rendering begins.
Sender: The application instance
Arguments:
template: The template object
context: Template context dictionary
Usage:
@before_render_template.connect
def on_before_render(sender, template, context, **extra):
# Modify context or log template access
context['render_start_time'] = time.time()
"""
template_rendered: Namespace
"""
Signal sent after template has been rendered.
Sender: The application instance
Arguments:
template: The template object
context: Template context dictionary
Usage:
@template_rendered.connect
def on_template_rendered(sender, template, context, **extra):
# Log template rendering or clean up
duration = time.time() - context.get('render_start_time', 0)
log_template_render(template.name, duration)
"""

Signals for error handling and exception management.
# Exception handling signals
got_request_exception: Namespace
"""
Signal sent when request processing encounters an exception.
Sender: The application instance
Arguments:
exception: The exception object
Usage:
@got_request_exception.connect
def on_request_exception(sender, exception, **extra):
# Log or handle request exceptions
error_tracker.report_exception(exception, request)
"""
got_websocket_exception: Namespace
"""
Signal sent when WebSocket processing encounters an exception.
Sender: The application instance
Arguments:
exception: The exception object
Usage:
@got_websocket_exception.connect
def on_websocket_exception(sender, exception, **extra):
# Log or handle WebSocket exceptions
error_tracker.report_websocket_exception(exception, websocket)
"""
got_background_exception: Namespace
"""
Signal sent when background task encounters an exception.
Sender: The application instance
Arguments:
exception: The exception object
Usage:
@got_background_exception.connect
def on_background_exception(sender, exception, **extra):
# Handle background task exceptions
background_error_handler.handle(exception)
"""
got_serving_exception: Namespace
"""
Signal sent when server encounters an exception during serving.
Sender: The application instance
Arguments:
exception: The exception object
Usage:
@got_serving_exception.connect
def on_serving_exception(sender, exception, **extra):
# Handle server-level exceptions
server_monitor.report_exception(exception)
"""

Signals for flash message system events.
# Flash messaging signals
message_flashed: Namespace
"""
Signal sent when a flash message is added.
Sender: The application instance
Arguments:
message: The flash message text
category: The message category
Usage:
@message_flashed.connect
def on_message_flashed(sender, message, category, **extra):
# Log or process flash messages
audit_log.record_flash_message(message, category)
"""

Constant indicating whether signal support is available.
signals_available: bool
"""
Boolean indicating whether signal support is available.
Value: True if blinker is installed, False otherwise
Usage:
if signals_available:
# Use signal-based features
setup_signal_handlers()
else:
# Fallback to non-signal approach
setup_polling_monitor()
"""

from quart import Quart, g
from quart.signals import (
request_started, request_finished, request_tearing_down,
got_request_exception, appcontext_pushed, appcontext_popped
)
import time
# Application instance; the handlers below are bound to it with connect_via(app).
app = Quart(__name__)
# Request timing and monitoring
@request_started.connect_via(app)
def on_request_start(sender, **extra):
    """Stamp ``g`` with the request's start time and a generated request id."""
    started_at = time.time()
    g.request_start_time = started_at
    g.request_id = generate_request_id()
@request_finished.connect_via(app)
def on_request_finish(sender, response, **extra):
    """Record metrics for a finished request and tag the response headers.

    Reads ``g.request_start_time`` and ``g.request_id``, which
    ``on_request_start`` stores on ``g``.
    """
    elapsed = time.time() - g.request_start_time

    # Log request metrics
    metrics.record_request_duration(elapsed)
    status_metric = f'requests.status.{response.status_code}'
    metrics.increment(status_metric)

    # Add timing header
    headers = response.headers
    headers['X-Response-Time'] = f'{elapsed:.3f}s'
    headers['X-Request-ID'] = g.request_id
@got_request_exception.connect_via(app)
def on_request_exception(sender, exception, **extra):
    """Log a failed request and forward the exception to the error tracker."""
    request_id = g.request_id

    # Log exception with context
    log_message = f"Request {request_id} failed: {exception}"
    log_context = {
        'request_id': request_id,
        'path': request.path,
        'method': request.method,
        'user_agent': request.headers.get('User-Agent'),
        'duration': time.time() - g.request_start_time,
    }
    error_logger.error(log_message, extra=log_context)

    # Report to error tracking service
    request_details = {
        'url': request.url,
        'method': request.method,
        'headers': dict(request.headers),
    }
    tracker_context = {
        'request_id': g.request_id,
        'request': request_details,
    }
    error_tracker.capture_exception(exception, tracker_context)
@request_tearing_down.connect_via(app)
def cleanup_request(sender, exc=None, **extra):
    """Release per-request resources that were attached to ``g``, if any."""
    # Clean up request-specific resources
    has_database = hasattr(g, 'database_connection')
    if has_database:
        g.database_connection.close()

    has_cache = hasattr(g, 'cache_client')
    if has_cache:
        g.cache_client.disconnect()


from quart import Quart, websocket
from quart.signals import (
websocket_started, websocket_finished, websocket_tearing_down,
got_websocket_exception
)
app = Quart(__name__)

# Track active WebSocket connections
active_websockets = set()
websocket_stats = dict(total_connections=0, active_connections=0)
@websocket_started.connect_via(app)
def on_websocket_start(sender, **extra):
    """Track a newly started WebSocket and refresh the connection counters."""
    active_websockets.add(websocket)
    active_now = len(active_websockets)
    websocket_stats['total_connections'] += 1
    websocket_stats['active_connections'] = active_now

    # Log connection details
    app.logger.info(
        f"WebSocket connected from {websocket.remote_addr}, "
        f"total active: {active_now}"
    )
@websocket_finished.connect_via(app)
def on_websocket_finish(sender, **extra):
    """Stop tracking a finished WebSocket and refresh the active counter."""
    active_websockets.discard(websocket)
    remaining = len(active_websockets)
    websocket_stats['active_connections'] = remaining

    app.logger.info(
        f"WebSocket disconnected, "
        f"remaining active: {remaining}"
    )
@websocket_tearing_down.connect_via(app)
def cleanup_websocket(sender, exc=None, **extra):
    """Unsubscribe every subscription stored on ``g`` for this WebSocket."""
    # Clean up WebSocket-specific resources
    # An absent attribute yields an empty tuple, so the loop is simply skipped.
    subscriptions = getattr(g, 'websocket_subscriptions', ())
    for subscription in subscriptions:
        subscription.unsubscribe()
@got_websocket_exception.connect_via(app)
async def on_websocket_exception(sender, exception, **extra):
    """Log a WebSocket failure and try to close the connection cleanly.

    Declared ``async`` because the body awaits ``websocket.close``; ``await``
    inside a plain ``def`` is a syntax error.
    """
    app.logger.error(f"WebSocket error: {exception}")
    # Attempt graceful cleanup
    try:
        await websocket.close(code=1011, reason="Internal server error")
    except Exception as close_error:
        # Best effort: the connection may already be gone. Catching Exception
        # instead of using a bare except lets task cancellation propagate.
        app.logger.debug(f"WebSocket close failed: {close_error}")
@app.websocket('/ws/stats')
async def websocket_stats_endpoint():
    """Push the connection counters to the client every five seconds."""
    import asyncio  # local import keeps this example self-contained

    await websocket.accept()
    try:
        while True:
            await websocket.send_json(websocket_stats)
            await asyncio.sleep(5)
    except asyncio.CancelledError:
        # Quart cancels the handler task when the client disconnects.
        # Put per-connection cleanup here, then re-raise so the cancellation
        # completes normally.
        raise


from quart import Quart, render_template
from quart.signals import before_render_template, template_rendered
import time
from collections import defaultdict, Counter
app = Quart(__name__)

# Template rendering statistics
template_stats = dict(
    render_counts=Counter(),
    render_times=defaultdict(list),
    context_sizes=defaultdict(list),
)
@before_render_template.connect_via(app)
def on_before_render(sender, template, context, **extra):
    """Count the render, record the context size and seed timing variables."""
    name = template.name

    # Record template access
    template_stats['render_counts'][name] += 1

    # Store render start time in context
    context['_render_start'] = time.time()

    # Record context size
    size_samples = template_stats['context_sizes'][name]
    size_samples.append(len(str(context)))

    # Add common template variables
    context['app_version'] = app.config.get('VERSION', '1.0.0')
    context['render_timestamp'] = time.time()
@template_rendered.connect_via(app)
def on_template_rendered(sender, template, context, **extra):
    """Record how long the render took and warn when it exceeds 100 ms."""
    # Nothing to measure when the start marker is not in the context.
    if '_render_start' not in context:
        return

    # Calculate render time
    render_time = time.time() - context['_render_start']
    template_stats['render_times'][template.name].append(render_time)

    # Log slow templates
    is_slow = render_time > 0.1  # 100ms threshold
    if is_slow:
        app.logger.warning(
            f"Slow template render: {template.name} "
            f"took {render_time:.3f}s"
        )
@app.route('/template-stats')
async def template_statistics():
    """Render per-template render counts, timings and context sizes."""

    def _average(values):
        # Mean of the samples; 0 when nothing has been recorded yet.
        return sum(values) / len(values) if values else 0

    # Calculate statistics
    render_counts = template_stats['render_counts']
    stats = {}
    for template_name in render_counts:
        times = template_stats['render_times'][template_name]
        sizes = template_stats['context_sizes'][template_name]
        stats[template_name] = {
            'render_count': render_counts[template_name],
            'avg_render_time': _average(times),
            'max_render_time': max(times) if times else 0,
            'avg_context_size': _average(sizes),
        }

    return await render_template('template_stats.html', stats=stats)


from quart import Quart, request
from quart.signals import (
got_request_exception, got_websocket_exception,
got_background_exception, got_serving_exception
)
from collections import deque
import asyncio
app = Quart(__name__)

# Error tracking
recent_errors = deque((), maxlen=100)  # Keep last 100 errors
error_counts = Counter()
async def send_alert(error_type, exception, context=None):
    """Send alert for critical errors.

    Does nothing unless the ``ALERTS_ENABLED`` config value is truthy.
    """
    if not app.config.get('ALERTS_ENABLED'):
        return

    payload = {
        'type': error_type,
        'exception': str(exception),
        'timestamp': time.time(),
        'context': context or {},
    }
    # Send to monitoring service
    await monitoring_service.send_alert(payload)
@got_request_exception.connect_via(app)
async def on_request_error(sender, exception, **extra):
    """Count and record a request exception; alert on critical error types.

    Declared ``async`` so it runs on the event loop: ``asyncio.create_task``
    requires a running loop, and Quart may execute synchronous receivers in a
    worker thread where none is running.
    """
    error_type = type(exception).__name__
    error_counts[error_type] += 1

    error_data = {
        'type': 'request_error',
        'exception': str(exception),
        'exception_type': error_type,
        'path': request.path,
        'method': request.method,
        'timestamp': time.time(),
    }
    recent_errors.append(error_data)

    # Send alert for critical errors
    if error_type in ('DatabaseError', 'ExternalServiceError'):
        asyncio.create_task(send_alert('request_error', exception, {
            'path': request.path,
            'method': request.method,
        }))
@got_websocket_exception.connect_via(app)
def on_websocket_error(sender, exception, **extra):
    """Count a WebSocket exception and append it to the recent-error log."""
    exception_name = type(exception).__name__
    counter_key = f'websocket_{exception_name}'
    error_counts[counter_key] += 1

    recent_errors.append({
        'type': 'websocket_error',
        'exception': str(exception),
        'exception_type': exception_name,
        'path': websocket.path,
        'timestamp': time.time(),
    })
@got_background_exception.connect_via(app)
async def on_background_error(sender, exception, **extra):
    """Count a background-task exception and schedule an alert for it.

    Declared ``async`` so it runs on the event loop: ``asyncio.create_task``
    requires a running loop, and Quart may execute synchronous receivers in a
    worker thread where none is running.
    """
    error_type = type(exception).__name__
    error_counts[f'background_{error_type}'] += 1

    # Background errors are often critical
    asyncio.create_task(send_alert('background_error', exception))
@app.route('/error-dashboard')
async def error_dashboard():
    """Render the dashboard from snapshots of the error log and counters."""
    template_context = {
        'recent_errors': list(recent_errors),
        'error_counts': dict(error_counts),
    }
    return await render_template('error_dashboard.html', **template_context)


from quart import Quart
from quart.signals import appcontext_pushed
from blinker import Namespace
app = Quart(__name__)

# Custom application signals
app_signals = Namespace()
user_logged_in, data_updated = (
    app_signals.signal(signal_name)
    for signal_name in ('user-logged-in', 'data-updated')
)
# Plugin registration system
@appcontext_pushed.connect_via(app)
def load_plugins(sender, **extra):
    """Load and initialise the enabled plugins once per application.

    ``g`` is recreated for every application context, so a flag stored only
    there is never seen again and the plugins would be re-initialised (and
    their handlers re-connected) on every push. The flag is therefore kept on
    ``app.extensions``, which lives as long as the application. It is still
    mirrored onto ``g`` for code that reads ``g.plugins_loaded``.
    """
    if not app.extensions.get('plugins_loaded'):
        # Load and initialize plugins
        for plugin_name in app.config.get('ENABLED_PLUGINS', []):
            plugin = load_plugin(plugin_name)
            plugin.initialize(app)
        app.extensions['plugins_loaded'] = True
    g.plugins_loaded = True
# Plugin example: Audit logging
class AuditLogPlugin:
    """Plugin that writes audit-log entries for the custom application signals."""

    def initialize(self, app):
        """Subscribe the audit handlers to the signals sent by ``app``."""
        # blinker references receivers weakly by default, so a bound method of
        # a plugin instance that nothing else holds is dropped once the
        # instance is collected. weak=False keeps the handlers connected.
        user_logged_in.connect(self.log_user_login, app, weak=False)
        data_updated.connect(self.log_data_update, app, weak=False)

    def log_user_login(self, sender, user, **extra):
        """Record which user logged in and from which remote address."""
        audit_log.info(f"User {user.username} logged in from {request.remote_addr}")

    def log_data_update(self, sender, model, action, **extra):
        """Record the class name of the updated model and the action taken."""
        audit_log.info(f"Data update: {model.__class__.__name__} {action}")
# Usage in application code
@app.route('/login', methods=['POST'])
async def login():
    """Authenticate the posted credentials and emit ``user_logged_in``.

    Returns a redirect to the dashboard on success and a 401 response when
    authentication fails.
    """
    # Imported here because this example's import block does not provide them.
    from quart import redirect, session, url_for

    # In Quart the form data is awaitable and must be awaited before use.
    form = await request.form
    user = await authenticate_user(form)
    if not user:
        # A view must return a response; falling through would return None.
        return 'Invalid credentials', 401

    session['user_id'] = user.id
    # Emit custom signal
    user_logged_in.send(app, user=user)
    return redirect(url_for('dashboard'))
@app.route('/api/data', methods=['POST'])
async def update_data():
data = await request.get_json()
model = await update_model(data)
# Emit custom signal
data_updated.send(app, model=model, action='update')
return jsonify({'status': 'updated'})Install with Tessl CLI
npx tessl i tessl/pypi-quart