CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-quart

A Python ASGI web framework with the same API as Flask

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

context.mddocs/

Context Management

Application, request, and WebSocket context handling with global proxy objects, context copying utilities, and lifecycle management for maintaining state across async operations.

Capabilities

Context State Functions

Functions to check current context availability and state.

def has_app_context() -> bool:
    """
    Report whether an application context is currently active.

    Returns:
        True when called inside an application context, otherwise False
    """

def has_request_context() -> bool:
    """
    Report whether a request context is currently active.

    Returns:
        True when called inside a request context, otherwise False
    """

def has_websocket_context() -> bool:
    """
    Report whether a WebSocket context is currently active.

    Returns:
        True when called inside a WebSocket context, otherwise False
    """

Context Copying Functions

Decorators and utilities for preserving context across async operations and background tasks.

def copy_current_app_context(func: Callable) -> Callable:
    """
    Wrap a function so it runs inside a copy of the current application context.

    The context is captured when this decorator is applied, not when the
    wrapped function later runs. Apply it while an application context is
    active (for example inside a route handler), never at module import time.

    Args:
        func: Function to wrap with the copied app context

    Returns:
        Async wrapper that runs func inside the copied app context

    Raises:
        RuntimeError: If applied outside of an application context

    Usage:
        @app.route('/start')
        async def start():
            @copy_current_app_context
            async def background_task():
                # Has access to current_app
                pass

            asyncio.create_task(background_task())
            return 'started'
    """

def copy_current_request_context(func: Callable) -> Callable:
    """
    Wrap a function so it runs inside a copy of the current request context.

    The context is captured when this decorator is applied, not when the
    wrapped function later runs. Apply it while a request context is active
    (inside a request handler), never at module import time.

    Args:
        func: Function to wrap with the copied request context

    Returns:
        Async wrapper that runs func inside the copied request context

    Raises:
        RuntimeError: If applied outside of a request context

    Usage:
        @app.route('/process', methods=['POST'])
        async def process():
            @copy_current_request_context
            async def process_request_data():
                # Has access to request, session, g
                pass

            asyncio.create_task(process_request_data())
            return 'accepted'
    """

def copy_current_websocket_context(func: Callable) -> Callable:
    """
    Wrap a function so it runs inside a copy of the current WebSocket context.

    The context is captured when this decorator is applied, not when the
    wrapped function later runs. Apply it while a WebSocket context is active
    (inside a WebSocket handler), never at module import time.

    Args:
        func: Function to wrap with the copied WebSocket context

    Returns:
        Async wrapper that runs func inside the copied WebSocket context

    Raises:
        RuntimeError: If applied outside of a WebSocket context

    Usage:
        @app.websocket('/ws')
        async def ws():
            @copy_current_websocket_context
            async def handle_websocket_data():
                # Has access to websocket
                pass

            asyncio.create_task(handle_websocket_data())
    """

After-Action Functions

Functions to schedule operations to run after the current request or WebSocket connection completes.

def after_this_request(func: Callable) -> Callable:
    """
    Schedule function to run after current request completes.

    Must be called while a request context is active (typically from
    inside the route handler).

    Args:
        func: Function to run after the request. It receives the response
            as its only argument and must return a response, because the
            returned value replaces the response sent to the client.

    Returns:
        The function that was passed in, so this can be used as a decorator

    Usage:
        @after_this_request
        def log_request(response):
            # Log request details
            return response
    """

def after_this_websocket(func: Callable) -> Callable:
    """
    Schedule function to run after current WebSocket connection ends.

    Must be called while a WebSocket context is active (typically from
    inside the WebSocket handler).

    Args:
        func: Function to run after the WebSocket finishes. It is called
            with one argument, an optional response (None unless the
            connection was answered with a response instead of being
            accepted), and should return that response.

    Returns:
        The function that was passed in, so this can be used as a decorator

    Usage:
        @after_this_websocket
        def cleanup_websocket(response):
            # Clean up WebSocket resources
            return response
    """

Global Proxy Objects

Context-aware proxy objects that provide access to the current application, request, and WebSocket state.

# Application Context Globals

# Proxy to current application instance.
# Available in: app context, request context, WebSocket context
current_app: Quart

# Application context global object for storing data.
# Available in: app context, request context, WebSocket context
#
# Usage:
#     g.user_id = 123
#     g.start_time = time.time()
g: object

# Request Context Globals

# Proxy to current request object.
# Available in: request context only
request: Request

# Proxy to current session object (dict-like).
# Available in: request context and WebSocket context
#
# Usage:
#     session['user_id'] = 123
#     user_id = session.get('user_id')
session: dict

# WebSocket Context Globals

# Proxy to current WebSocket object.
# Available in: WebSocket context only
websocket: Websocket

Usage Examples

Context State Checking

from quart import Quart, has_app_context, has_request_context, has_websocket_context
from quart import current_app, request, websocket, g

app = Quart(__name__)

async def utility_function():
    """Describe whichever contexts are active at the point of the call.

    Returns:
        Dict with the app name, request method and path, and WebSocket path;
        each value falls back to a placeholder string when its context is
        not active.
    """
    # Application context: also flag on g that the helper ran.
    app_name = "No app context"
    if has_app_context():
        app_name = current_app.name
        g.utility_called = True

    # Request context: method and path share one placeholder.
    method = path = "No request context"
    if has_request_context():
        method = request.method
        path = request.path

    # WebSocket context.
    ws_path = websocket.path if has_websocket_context() else "No WebSocket context"

    summary = {
        'app_name': app_name,
        'method': method,
        'path': path,
        'websocket_path': ws_path
    }
    return summary

@app.route('/context-info')
async def context_info():
    """Return the context summary as seen from inside a request."""
    return await utility_function()

@app.websocket('/ws/context-info')
async def websocket_context_info():
    """Accept the connection and send the context summary over the WebSocket."""
    await websocket.accept()
    summary = await utility_function()
    await websocket.send_json(summary)

Context Copying for Background Tasks

from quart import Quart, copy_current_request_context, copy_current_app_context
from quart import request, current_app, g
import asyncio

app = Quart(__name__)

@app.route('/start-background-task', methods=['POST'])
async def start_background_task():
    """Start ``process_user_data`` in the background for the posted user.

    Returns:
        Dict with the generated task id and a 'started' status.
    """
    # request.json is awaitable in Quart and cannot be used synchronously.
    # get_json() may return None for a non-JSON body, so fall back to {}.
    payload = (await request.get_json()) or {}

    # Set up request-specific data
    g.user_id = payload.get('user_id')
    g.task_id = generate_task_id()

    # The request context must be copied here, while it is active.
    # Decorating process_user_data at module level would run the copy at
    # import time, outside any request, and raise RuntimeError.
    background = copy_current_request_context(process_user_data)
    asyncio.create_task(background())

    return {'task_id': g.task_id, 'status': 'started'}

async def process_user_data():
    """Background task body.

    Expects to run inside a request context, i.e. wrapped with
    copy_current_request_context by the caller.
    """
    # Read the ids before the try block so the error handler can always
    # reference task_id.
    user_id = g.user_id
    task_id = g.task_id

    try:
        # Perform long-running operation
        result = await perform_user_processing(user_id)

        # Log with app context
        current_app.logger.info(f"Task {task_id} completed for user {user_id}")

        # Store result
        await store_task_result(task_id, result)

    except Exception as e:
        # Top-level boundary of a fire-and-forget task: log instead of losing the error.
        current_app.logger.error(f"Task {task_id} failed: {e}")

@app.route('/batch-process', methods=['POST'])
async def batch_process():
    """Process every posted user id concurrently, each in its own app context copy.

    Returns:
        Dict with the number of submitted ids ('processed') and the number
        of tasks that finished without raising ('results').
    """
    # request.json is awaitable in Quart and cannot be used synchronously.
    # get_json() may return None for a non-JSON body, so fall back to {}.
    payload = (await request.get_json()) or {}
    user_ids = payload.get('user_ids', [])

    # Copy the app context here, while it is active, and once per task so
    # concurrently running tasks do not enter and exit one shared context.
    # Decorating create_user_task at module level would run the copy at
    # import time and raise RuntimeError.
    tasks = [
        asyncio.create_task(copy_current_app_context(create_user_task)(user_id))
        for user_id in user_ids
    ]

    # Wait for all tasks; failures are returned as exception objects
    results = await asyncio.gather(*tasks, return_exceptions=True)
    succeeded = sum(1 for r in results if not isinstance(r, Exception))

    return {'processed': len(user_ids), 'results': succeeded}

async def create_user_task(user_id):
    """Process a single user.

    Expects to run inside an application context, i.e. wrapped with
    copy_current_app_context by the caller.

    Args:
        user_id: Identifier of the user to process

    Returns:
        Whatever process_single_user returns for this user

    Raises:
        Exception: Any processing error is logged and re-raised
    """
    try:
        # Access app configuration
        batch_size = current_app.config.get('BATCH_SIZE', 100)

        # Process user data
        result = await process_single_user(user_id, batch_size)

        # Log using app logger
        current_app.logger.info(f"Processed user {user_id}")

        return result

    except Exception as e:
        current_app.logger.error(f"Failed to process user {user_id}: {e}")
        raise

After-Request Processing

import asyncio
import time

# current_app and asyncio are used by the handlers below.
from quart import Quart, after_this_request, current_app, request, g

app = Quart(__name__)

@app.before_request
async def before_request():
    """Record the start time and a request id on g before each request."""
    started_at = time.time()
    g.start_time = started_at
    g.request_id = generate_request_id()

@app.route('/tracked-endpoint')
async def tracked_endpoint():
    """Return data and register a one-off timing logger for this request."""

    @after_this_request
    def log_request_timing(response):
        """Log the request duration and expose it in a response header."""
        duration = time.time() - g.start_time

        message = (
            f"Request {g.request_id}: {request.method} {request.path} "
            f"-> {response.status_code} ({duration:.3f}s)"
        )
        current_app.logger.info(message)

        # Surface the timing to the client as well
        response.headers['X-Response-Time'] = f"{duration:.3f}s"
        return response

    # Handle the request itself
    data = await get_some_data()
    body = {'data': data, 'request_id': g.request_id}
    return body

@app.route('/audit-endpoint', methods=['POST'])
async def audit_endpoint():
    """Process an audit request and register audit and metrics callbacks.

    Returns:
        The result of process_audit_request for the posted JSON body.
    """
    # request.json is awaitable in Quart. Read the body once here so the
    # callbacks below can use it without awaiting.
    payload = await request.get_json()
    user_id = payload.get('user_id') if isinstance(payload, dict) else None

    # Register multiple after-request handlers
    @after_this_request
    async def audit_log(response):
        # Async so the audit write runs on the event loop. A sync callback
        # runs off the loop, where asyncio.create_task has no running loop.
        audit_data = {
            'request_id': g.request_id,
            'user_id': user_id,
            'action': 'data_update',
            'status': response.status_code,
            'timestamp': time.time()
        }
        await write_audit_log(audit_data)
        return response

    @after_this_request
    def update_metrics(response):
        # Update application metrics
        metrics.increment('requests.audit_endpoint')
        if response.status_code >= 400:
            metrics.increment('requests.errors')
        return response

    # Process the request
    result = await process_audit_request(payload)
    return result

WebSocket Context Management

import asyncio
import time  # used by heartbeat_task for the heartbeat timestamp

from quart import Quart, copy_current_websocket_context, after_this_websocket
from quart import websocket, current_app

app = Quart(__name__)

# Store active WebSocket connections
active_connections = set()

@app.websocket('/ws/managed')
async def managed_websocket():
    """Serve one managed WebSocket connection with heartbeat and broadcast tasks."""
    await websocket.accept()

    # Store the real connection object, not the context-local proxy, so the
    # set entry stays valid outside this handler's context.
    connection = websocket._get_current_object()
    active_connections.add(connection)

    # Register cleanup handler. Quart calls it with the optional response,
    # so it must accept and return it.
    @after_this_websocket
    def cleanup_connection(response):
        active_connections.discard(connection)
        current_app.logger.info(f"WebSocket connection closed, {len(active_connections)} remaining")
        return response

    # Copy the WebSocket context here, while it is active, once per task.
    # Decorating the task functions at module level would run the copy at
    # import time and raise RuntimeError.
    background_tasks = [
        asyncio.create_task(copy_current_websocket_context(heartbeat_task)()),
        asyncio.create_task(copy_current_websocket_context(broadcast_listener)()),
    ]

    try:
        while True:
            message = await websocket.receive_json()
            await handle_message(message)
    finally:
        # Quart cancels this handler when the client disconnects; the
        # CancelledError propagates after the background tasks are stopped,
        # so they do not outlive the connection.
        for task in background_tasks:
            task.cancel()

async def heartbeat_task():
    """Send a heartbeat every 30 seconds until cancelled.

    Expects to run inside a WebSocket context, i.e. wrapped with
    copy_current_websocket_context by the caller.
    """
    while True:
        await asyncio.sleep(30)
        await websocket.send_json({
            'type': 'heartbeat',
            'timestamp': time.time(),
            'connection_count': len(active_connections)
        })

async def broadcast_listener():
    """Forward broadcast messages to this connection until cancelled.

    Expects to run inside a WebSocket context, i.e. wrapped with
    copy_current_websocket_context by the caller.
    """
    async for message in get_broadcast_messages():
        await websocket.send_json({
            'type': 'broadcast',
            'data': message
        })

async def handle_message(message):
    """Reply to a single incoming WebSocket message.

    Args:
        message: Decoded JSON message; its 'type' selects the reply.
            'ping' gets a pong, 'echo' gets its 'data' back, and any other
            type is ignored.
    """
    kind = message.get('type')

    if kind == 'ping':
        reply = {'type': 'pong'}
        await websocket.send_json(reply)
    elif kind == 'echo':
        reply = {
            'type': 'echo_response',
            'data': message.get('data')
        }
        await websocket.send_json(reply)

Session Management

import time  # used for session['login_time'] and the expiry check

from quart import Quart, session, request, redirect, url_for
from quart import render_template  # used by the dashboard view

app = Quart(__name__)
# Placeholder only: load the real secret from configuration, never from source.
app.secret_key = 'your-secret-key'

@app.route('/login', methods=['POST'])
async def login():
    """Validate posted credentials and start a session on success.

    Returns:
        Redirect to the dashboard on success, otherwise back to the login
        form with error='invalid'.
    """
    form = await request.form
    username = form.get('username')
    password = form.get('password')

    if not await validate_user(username, password):
        return redirect(url_for('login_form', error='invalid'))

    # Credentials are valid: populate the session
    session['user_id'] = await get_user_id(username)
    session['username'] = username
    session['login_time'] = time.time()
    session.permanent = True  # Use permanent session

    return redirect(url_for('dashboard'))

@app.route('/dashboard')
async def dashboard():
    """Render the dashboard for the logged-in user, or redirect to the login form."""
    logged_in = 'user_id' in session
    if not logged_in:
        return redirect(url_for('login_form'))

    # Pull what the template needs out of the session
    user_id = session['user_id']
    template_context = {
        'username': session['username'],
        'login_time': session.get('login_time'),
    }
    template_context['user'] = await get_user_data(user_id)

    return await render_template('dashboard.html', **template_context)

@app.route('/logout')
async def logout():
    """Drop all session data and send the user to the login form."""
    session.clear()
    login_url = url_for('login_form')
    return redirect(login_url)

@app.before_request
async def check_session():
    """Require a fresh login for every endpoint except the public ones.

    Returns:
        None to let the request continue, or a redirect to the login form
        when there is no session or the login is more than an hour old.
    """
    # Endpoints that never require authentication
    if request.endpoint in ('login', 'login_form', 'static'):
        return None

    # Not logged in at all
    if 'user_id' not in session:
        return redirect(url_for('login_form'))

    # Logged in, but more than one hour (3600 s) ago
    logged_in_at = session.get('login_time', 0)
    if logged_in_at < time.time() - 3600:
        session.clear()
        return redirect(url_for('login_form', error='expired'))

    return None

Global Object Usage

from quart import Quart, g, current_app, request

app = Quart(__name__)

@app.before_request
async def load_user():
    """Populate g.user and g.user_permissions from the X-User-ID header."""
    user_id = request.headers.get('X-User-ID')

    if not user_id:
        # Header missing or empty: anonymous request
        g.user = None
        g.user_permissions = []
        return

    # Keep the user data on g for the duration of the request
    g.user = await load_user_data(user_id)
    g.user_permissions = await load_user_permissions(user_id)

@app.route('/protected-resource')
async def protected_resource():
    """Return the current user's resources.

    Returns:
        A 401 error body when no user is set on g, a 403 error body when
        the user lacks 'read_resource', otherwise the resource listing.
    """
    user = g.user
    if not user:
        return {'error': 'Authentication required'}, 401

    if 'read_resource' not in g.user_permissions:
        return {'error': 'Permission denied'}, 403

    # Result limit comes from the application configuration
    limit = current_app.config.get('MAX_RESULTS', 50)
    resources = await get_user_resources(g.user.id, limit=limit)

    listing = {
        'user': g.user.username,
        'resources': resources,
        'total': len(resources)
    }
    return listing

@app.teardown_appcontext
def cleanup_g(error):
    """Close the connection stored on g, if any, when the app context ends.

    Args:
        error: Passed in by the teardown hook; not used here.
    """
    if not hasattr(g, 'db_connection'):
        return
    g.db_connection.close()

Install with Tessl CLI

npx tessl i tessl/pypi-quart

docs

context.md

core-application.md

helpers.md

index.md

request-response.md

signals.md

templates.md

testing.md

websocket.md

tile.json