CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-quart

A Python ASGI web framework with the same API as Flask

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

signals.mddocs/

Signal System

Event-driven hooks for application lifecycle, request/response processing, WebSocket handling, error management, and template rendering with comprehensive signal support for monitoring and extending application behavior.

Capabilities

Application Lifecycle Signals

Signals fired during application context creation, destruction, and teardown phases.

# Application context signals
appcontext_pushed: Namespace
    """
    Signal sent when application context is pushed.
    
    Sender: The application instance
    Arguments: None
    
    Usage:
        @appcontext_pushed.connect
        def on_app_context_pushed(sender, **extra):
            # Application context is now available
            pass
    """

appcontext_popped: Namespace
    """
    Signal sent when application context is popped.
    
    Sender: The application instance
    Arguments: None
    
    Usage:
        @appcontext_popped.connect
        def on_app_context_popped(sender, **extra):
            # Application context is being removed
            pass
    """

appcontext_tearing_down: Namespace
    """
    Signal sent when application context is tearing down.
    
    Sender: The application instance
    Arguments:
        exc: Exception that caused teardown (None for normal teardown)
    
    Usage:
        @appcontext_tearing_down.connect
        def on_app_context_teardown(sender, exc=None, **extra):
            # Clean up app context resources
            if exc:
                # Handle teardown due to exception
                pass
    """

Request Lifecycle Signals

Signals for HTTP request processing lifecycle events.

# Request processing signals
request_started: Namespace
    """
    Signal sent when request processing starts.
    
    Sender: The application instance
    Arguments: None
    
    Usage:
        @request_started.connect
        def on_request_started(sender, **extra):
            # Request processing has begun
            g.start_time = time.time()
    """

request_finished: Namespace
    """
    Signal sent when request processing finishes successfully.
    
    Sender: The application instance
    Arguments:
        response: The response object
    
    Usage:
        @request_finished.connect
        def on_request_finished(sender, response, **extra):
            # Request completed successfully
            duration = time.time() - g.start_time
            log_request(request, response, duration)
    """

request_tearing_down: Namespace
    """
    Signal sent when request context is tearing down.
    
    Sender: The application instance
    Arguments:
        exc: Exception that caused teardown (None for normal teardown)
    
    Usage:
        @request_tearing_down.connect
        def on_request_teardown(sender, exc=None, **extra):
            # Clean up request resources
            if hasattr(g, 'db_connection'):
                g.db_connection.close()
    """

WebSocket Lifecycle Signals

Signals for WebSocket connection lifecycle events.

# WebSocket processing signals
websocket_started: Namespace
    """
    Signal sent when WebSocket processing starts.
    
    Sender: The application instance
    Arguments: None
    
    Usage:
        @websocket_started.connect
        def on_websocket_started(sender, **extra):
            # WebSocket connection has begun
            active_connections.add(websocket)
    """

websocket_finished: Namespace
    """
    Signal sent when WebSocket processing finishes.
    
    Sender: The application instance
    Arguments: None
    
    Usage:
        @websocket_finished.connect
        def on_websocket_finished(sender, **extra):
            # WebSocket connection ended
            active_connections.discard(websocket)
    """

websocket_tearing_down: Namespace
    """
    Signal sent when WebSocket context is tearing down.
    
    Sender: The application instance
    Arguments:
        exc: Exception that caused teardown (None for normal teardown)
    
    Usage:
        @websocket_tearing_down.connect
        def on_websocket_teardown(sender, exc=None, **extra):
            # Clean up WebSocket resources
            cleanup_websocket_resources()
    """

Template Signals

Signals fired during template rendering operations.

# Template rendering signals
before_render_template: Namespace
    """
    Signal sent before template rendering begins.
    
    Sender: The application instance
    Arguments:
        template: The template object
        context: Template context dictionary
    
    Usage:
        @before_render_template.connect
        def on_before_render(sender, template, context, **extra):
            # Modify context or log template access
            context['render_start_time'] = time.time()
    """

template_rendered: Namespace
    """
    Signal sent after template has been rendered.
    
    Sender: The application instance
    Arguments:
        template: The template object
        context: Template context dictionary
    
    Usage:
        @template_rendered.connect
        def on_template_rendered(sender, template, context, **extra):
            # Log template rendering or clean up
            duration = time.time() - context.get('render_start_time', 0)
            log_template_render(template.name, duration)
    """

Exception Signals

Signals for error handling and exception management.

# Exception handling signals
got_request_exception: Namespace
    """
    Signal sent when request processing encounters an exception.
    
    Sender: The application instance
    Arguments:
        exception: The exception object
    
    Usage:
        @got_request_exception.connect
        def on_request_exception(sender, exception, **extra):
            # Log or handle request exceptions
            error_tracker.report_exception(exception, request)
    """

got_websocket_exception: Namespace
    """
    Signal sent when WebSocket processing encounters an exception.
    
    Sender: The application instance
    Arguments:
        exception: The exception object
    
    Usage:
        @got_websocket_exception.connect
        def on_websocket_exception(sender, exception, **extra):
            # Log or handle WebSocket exceptions
            error_tracker.report_websocket_exception(exception, websocket)
    """

got_background_exception: Namespace
    """
    Signal sent when background task encounters an exception.
    
    Sender: The application instance
    Arguments:
        exception: The exception object
    
    Usage:
        @got_background_exception.connect
        def on_background_exception(sender, exception, **extra):
            # Handle background task exceptions
            background_error_handler.handle(exception)
    """

got_serving_exception: Namespace
    """
    Signal sent when server encounters an exception during serving.
    
    Sender: The application instance
    Arguments:
        exception: The exception object
    
    Usage:
        @got_serving_exception.connect
        def on_serving_exception(sender, exception, **extra):
            # Handle server-level exceptions
            server_monitor.report_exception(exception)
    """

Flash Message Signals

Signals for flash message system events.

# Flash messaging signals
message_flashed: Namespace
    """
    Signal sent when a flash message is added.
    
    Sender: The application instance
    Arguments:
        message: The flash message text
        category: The message category
    
    Usage:
        @message_flashed.connect
        def on_message_flashed(sender, message, category, **extra):
            # Log or process flash messages
            audit_log.record_flash_message(message, category)
    """

Signal Availability

Constant indicating whether signal support is available.

signals_available: bool
    """
    Boolean indicating whether signal support is available.
    
    Value: True if blinker is installed, False otherwise
    
    Usage:
        if signals_available:
            # Use signal-based features
            setup_signal_handlers()
        else:
            # Fallback to non-signal approach
            setup_polling_monitor()
    """

Usage Examples

Application Monitoring

from quart import Quart, g
from quart.signals import (
    request_started, request_finished, request_tearing_down,
    got_request_exception, appcontext_pushed, appcontext_popped
)
import time

app = Quart(__name__)

# Request timing and monitoring
@request_started.connect_via(app)
def on_request_start(sender, **extra):
    g.request_start_time = time.time()
    g.request_id = generate_request_id()

@request_finished.connect_via(app)
def on_request_finish(sender, response, **extra):
    duration = time.time() - g.request_start_time
    
    # Log request metrics
    metrics.record_request_duration(duration)
    metrics.increment(f'requests.status.{response.status_code}')
    
    # Add timing header
    response.headers['X-Response-Time'] = f'{duration:.3f}s'
    response.headers['X-Request-ID'] = g.request_id

@got_request_exception.connect_via(app)
def on_request_exception(sender, exception, **extra):
    # Log exception with context
    error_logger.error(
        f"Request {g.request_id} failed: {exception}",
        extra={
            'request_id': g.request_id,
            'path': request.path,
            'method': request.method,
            'user_agent': request.headers.get('User-Agent'),
            'duration': time.time() - g.request_start_time
        }
    )
    
    # Report to error tracking service
    error_tracker.capture_exception(exception, {
        'request_id': g.request_id,
        'request': {
            'url': request.url,
            'method': request.method,
            'headers': dict(request.headers)
        }
    })

@request_tearing_down.connect_via(app)
def cleanup_request(sender, exc=None, **extra):
    # Clean up request-specific resources
    if hasattr(g, 'database_connection'):
        g.database_connection.close()
    
    if hasattr(g, 'cache_client'):
        g.cache_client.disconnect()

WebSocket Connection Management

from quart import Quart, websocket
from quart.signals import (
    websocket_started, websocket_finished, websocket_tearing_down,
    got_websocket_exception
)

app = Quart(__name__)

# Track active WebSocket connections
active_websockets = set()
websocket_stats = {'total_connections': 0, 'active_connections': 0}

@websocket_started.connect_via(app)
def on_websocket_start(sender, **extra):
    active_websockets.add(websocket)
    websocket_stats['total_connections'] += 1
    websocket_stats['active_connections'] = len(active_websockets)
    
    # Log connection details
    app.logger.info(f"WebSocket connected from {websocket.remote_addr}, "
                   f"total active: {len(active_websockets)}")

@websocket_finished.connect_via(app)
def on_websocket_finish(sender, **extra):
    active_websockets.discard(websocket)
    websocket_stats['active_connections'] = len(active_websockets)
    
    app.logger.info(f"WebSocket disconnected, "
                   f"remaining active: {len(active_websockets)}")

@websocket_tearing_down.connect_via(app)
def cleanup_websocket(sender, exc=None, **extra):
    # Clean up WebSocket-specific resources
    if hasattr(g, 'websocket_subscriptions'):
        for subscription in g.websocket_subscriptions:
            subscription.unsubscribe()

@got_websocket_exception.connect_via(app)
def on_websocket_exception(sender, exception, **extra):
    app.logger.error(f"WebSocket error: {exception}")
    
    # Attempt graceful cleanup
    try:
        await websocket.close(code=1011, reason="Internal server error")
    except:
        pass

@app.websocket('/ws/stats')
async def websocket_stats_endpoint():
    await websocket.accept()
    
    try:
        while True:
            await websocket.send_json(websocket_stats)
            await asyncio.sleep(5)
    except ConnectionClosed:
        pass

Template Rendering Analytics

from quart import Quart, render_template
from quart.signals import before_render_template, template_rendered
import time
from collections import defaultdict, Counter

app = Quart(__name__)

# Template rendering statistics
template_stats = {
    'render_counts': Counter(),
    'render_times': defaultdict(list),
    'context_sizes': defaultdict(list)
}

@before_render_template.connect_via(app)
def on_before_render(sender, template, context, **extra):
    # Record template access
    template_stats['render_counts'][template.name] += 1
    
    # Store render start time in context
    context['_render_start'] = time.time()
    
    # Record context size
    context_size = len(str(context))
    template_stats['context_sizes'][template.name].append(context_size)
    
    # Add common template variables
    context['app_version'] = app.config.get('VERSION', '1.0.0')
    context['render_timestamp'] = time.time()

@template_rendered.connect_via(app)
def on_template_rendered(sender, template, context, **extra):
    # Calculate render time
    if '_render_start' in context:
        render_time = time.time() - context['_render_start']
        template_stats['render_times'][template.name].append(render_time)
        
        # Log slow templates
        if render_time > 0.1:  # 100ms threshold
            app.logger.warning(f"Slow template render: {template.name} "
                             f"took {render_time:.3f}s")

@app.route('/template-stats')
async def template_statistics():
    # Calculate statistics
    stats = {}
    for template_name in template_stats['render_counts']:
        times = template_stats['render_times'][template_name]
        sizes = template_stats['context_sizes'][template_name]
        
        stats[template_name] = {
            'render_count': template_stats['render_counts'][template_name],
            'avg_render_time': sum(times) / len(times) if times else 0,
            'max_render_time': max(times) if times else 0,
            'avg_context_size': sum(sizes) / len(sizes) if sizes else 0
        }
    
    return await render_template('template_stats.html', stats=stats)

Error Tracking and Alerting

from quart import Quart, request
from quart.signals import (
    got_request_exception, got_websocket_exception, 
    got_background_exception, got_serving_exception
)
from collections import deque
import asyncio

app = Quart(__name__)

# Error tracking
recent_errors = deque(maxlen=100)  # Keep last 100 errors
error_counts = Counter()

async def send_alert(error_type, exception, context=None):
    """Send alert for critical errors."""
    if app.config.get('ALERTS_ENABLED'):
        alert_data = {
            'type': error_type,
            'exception': str(exception),
            'timestamp': time.time(),
            'context': context or {}
        }
        
        # Send to monitoring service
        await monitoring_service.send_alert(alert_data)

@got_request_exception.connect_via(app)
def on_request_error(sender, exception, **extra):
    error_type = type(exception).__name__
    error_counts[error_type] += 1
    
    error_data = {
        'type': 'request_error',
        'exception': str(exception),
        'exception_type': error_type,
        'path': request.path,
        'method': request.method,
        'timestamp': time.time()
    }
    
    recent_errors.append(error_data)
    
    # Send alert for critical errors
    if error_type in ['DatabaseError', 'ExternalServiceError']:
        asyncio.create_task(send_alert('request_error', exception, {
            'path': request.path,
            'method': request.method
        }))

@got_websocket_exception.connect_via(app)
def on_websocket_error(sender, exception, **extra):
    error_type = type(exception).__name__
    error_counts[f'websocket_{error_type}'] += 1
    
    error_data = {
        'type': 'websocket_error',
        'exception': str(exception),
        'exception_type': error_type,
        'path': websocket.path,
        'timestamp': time.time()
    }
    
    recent_errors.append(error_data)

@got_background_exception.connect_via(app)
def on_background_error(sender, exception, **extra):
    error_type = type(exception).__name__
    error_counts[f'background_{error_type}'] += 1
    
    # Background errors are often critical
    asyncio.create_task(send_alert('background_error', exception))

@app.route('/error-dashboard')
async def error_dashboard():
    return await render_template('error_dashboard.html',
                               recent_errors=list(recent_errors),
                               error_counts=dict(error_counts))

Signal-Based Plugin System

from quart import Quart
from quart.signals import appcontext_pushed
from blinker import Namespace

app = Quart(__name__)

# Custom application signals
app_signals = Namespace()
user_logged_in = app_signals.signal('user-logged-in')
data_updated = app_signals.signal('data-updated')

# Plugin registration system
@appcontext_pushed.connect_via(app)
def load_plugins(sender, **extra):
    if not hasattr(g, 'plugins_loaded'):
        # Load and initialize plugins
        for plugin_name in app.config.get('ENABLED_PLUGINS', []):
            plugin = load_plugin(plugin_name)
            plugin.initialize(app)
        
        g.plugins_loaded = True

# Plugin example: Audit logging
class AuditLogPlugin:
    def initialize(self, app):
        user_logged_in.connect(self.log_user_login, app)
        data_updated.connect(self.log_data_update, app)
    
    def log_user_login(self, sender, user, **extra):
        audit_log.info(f"User {user.username} logged in from {request.remote_addr}")
    
    def log_data_update(self, sender, model, action, **extra):
        audit_log.info(f"Data update: {model.__class__.__name__} {action}")

# Usage in application code
@app.route('/login', methods=['POST'])
async def login():
    user = await authenticate_user(request.form)
    if user:
        session['user_id'] = user.id
        
        # Emit custom signal
        user_logged_in.send(app, user=user)
        
        return redirect(url_for('dashboard'))

@app.route('/api/data', methods=['POST'])
async def update_data():
    data = await request.get_json()
    model = await update_model(data)
    
    # Emit custom signal
    data_updated.send(app, model=model, action='update')
    
    return jsonify({'status': 'updated'})

Install with Tessl CLI

npx tessl i tessl/pypi-quart

docs

context.md

core-application.md

helpers.md

index.md

request-response.md

signals.md

templates.md

testing.md

websocket.md

tile.json