CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-quart

A Python ASGI web framework with the same API as Flask

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

websocket.mddocs/

WebSocket Support

Native async WebSocket connection handling with JSON message support, connection lifecycle management, and comprehensive WebSocket protocol features.

Capabilities

WebSocket Connection Management

Represents WebSocket connections with async support for bidirectional communication and connection lifecycle management.

class Websocket:
    # Connection attributes
    path: str  # WebSocket URL path
    query_string: bytes  # Raw query string
    args: ImmutableMultiDict  # Parsed query string arguments
    headers: Headers  # Connection headers
    requested_subprotocols: list[str]  # Client requested subprotocols
    
    async def accept(
        self, 
        headers: dict | Headers | None = None, 
        subprotocol: str | None = None
    ):
        """
        Accept WebSocket connection.
        
        Args:
            headers: Additional headers to send with acceptance
            subprotocol: Selected subprotocol from client's requested list
        """
        
    async def receive(self) -> str | bytes:
        """
        Receive data from client.
        
        Returns:
            Message data as string or bytes
            
        Raises:
            ConnectionClosed: If connection is closed
        """
        
    async def send(self, data: str | bytes):
        """
        Send data to client.
        
        Args:
            data: Message data to send (string or bytes)
            
        Raises:
            ConnectionClosed: If connection is closed
        """
        
    async def receive_json(self):
        """
        Receive JSON data from client.
        
        Returns:
            Parsed JSON data
            
        Raises:
            ConnectionClosed: If connection is closed
            BadRequest: If data is not valid JSON
        """
        
    async def send_json(self, *args, **kwargs):
        """
        Send JSON data to client.
        
        Args:
            *args: Data to serialize as JSON (single positional arg or multiple args)
            **kwargs: Additional arguments for json.dumps() or dict data if no args
            
        Raises:
            ConnectionClosed: If connection is closed
        """
        
    async def close(self, code: int, reason: str = ""):
        """
        Close WebSocket connection.
        
        Args:
            code: WebSocket close code (required, e.g. 1000 = normal closure)
            reason: Close reason message
        """

Usage Examples

Basic WebSocket Handler

from quart import Quart, websocket

app = Quart(__name__)

@app.websocket('/ws')
async def ws():
    # Accept the connection
    await websocket.accept()
    
    try:
        while True:
            # Receive message from client
            message = await websocket.receive()
            
            # Echo message back to client
            await websocket.send(f"Echo: {message}")
            
    except ConnectionClosed:
        print("WebSocket connection closed")

JSON Message Handling

from quart import Quart, websocket

app = Quart(__name__)

@app.websocket('/api/ws')
async def api_websocket():
    await websocket.accept()
    
    try:
        while True:
            # Receive JSON data
            data = await websocket.receive_json()
            
            # Process the message
            response = {
                'type': 'response',
                'original': data,
                'timestamp': time.time()
            }
            
            # Send JSON response
            await websocket.send_json(response)
            
    except ConnectionClosed:
        print("Client disconnected")
    except BadRequest as e:
        # Handle invalid JSON
        await websocket.send_json({
            'type': 'error',
            'message': 'Invalid JSON format'
        })
        await websocket.close(code=1003, reason="Invalid data format")

WebSocket with Authentication

from quart import Quart, websocket, abort

app = Quart(__name__)

@app.websocket('/secure/ws')
async def secure_websocket():
    # Check authentication before accepting
    auth_token = websocket.args.get('token')
    if not auth_token or not validate_token(auth_token):
        await websocket.close(code=1008, reason="Authentication required")
        return
    
    # Accept authenticated connection
    await websocket.accept()
    
    try:
        while True:
            message = await websocket.receive_json()
            
            # Handle authenticated message
            response = await process_authenticated_message(message, auth_token)
            await websocket.send_json(response)
            
    except ConnectionClosed:
        print(f"Authenticated client disconnected: {auth_token}")

def validate_token(token):
    # Implement token validation logic
    return token == "valid_token"

async def process_authenticated_message(message, token):
    # Process message with authentication context
    return {
        'status': 'processed',
        'data': message,
        'user': get_user_from_token(token)
    }

WebSocket Room/Broadcasting System

from quart import Quart, websocket
import asyncio

app = Quart(__name__)

# Store active connections
connections = set()

@app.websocket('/chat')
async def chat_websocket():
    await websocket.accept()
    connections.add(websocket)
    
    try:
        # Send welcome message
        await websocket.send_json({
            'type': 'system',
            'message': f'Connected! {len(connections)} users online.'
        })
        
        # Broadcast new user joined
        await broadcast_message({
            'type': 'user_joined',
            'count': len(connections)
        }, exclude=websocket)
        
        while True:
            # Receive message from this client
            data = await websocket.receive_json()
            
            # Broadcast to all other clients
            broadcast_data = {
                'type': 'message',
                'user': data.get('user', 'Anonymous'),
                'message': data.get('message', ''),
                'timestamp': time.time()
            }
            
            await broadcast_message(broadcast_data, exclude=websocket)
            
    except ConnectionClosed:
        pass
    finally:
        # Remove connection and notify others
        connections.discard(websocket)
        await broadcast_message({
            'type': 'user_left',
            'count': len(connections)
        })

async def broadcast_message(data, exclude=None):
    """Broadcast message to all connected clients."""
    if not connections:
        return
        
    # Create list of send tasks
    tasks = []
    for conn in connections.copy():  # Copy to avoid modification during iteration
        if conn != exclude:
            tasks.append(send_safe(conn, data))
    
    # Send to all connections concurrently
    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)

async def send_safe(ws, data):
    """Send data to websocket, handling connection errors."""
    try:
        await ws.send_json(data)
    except ConnectionClosed:
        # Remove broken connection
        connections.discard(ws)

WebSocket with Heartbeat/Ping-Pong

from quart import Quart, websocket
import asyncio

app = Quart(__name__)

@app.websocket('/ws/heartbeat')
async def heartbeat_websocket():
    await websocket.accept()
    
    # Start heartbeat task
    heartbeat_task = asyncio.create_task(heartbeat_handler())
    
    try:
        while True:
            # Receive messages with timeout
            try:
                message = await asyncio.wait_for(
                    websocket.receive(), 
                    timeout=30.0  # 30 second timeout
                )
                
                # Handle regular messages
                if message == 'ping':
                    await websocket.send('pong')
                else:
                    # Process other messages
                    await websocket.send(f"Received: {message}")
                    
            except asyncio.TimeoutError:
                # Send ping to check if client is still alive
                await websocket.send('ping')
                
    except ConnectionClosed:
        print("Client disconnected")
    finally:
        # Cancel heartbeat task
        heartbeat_task.cancel()
        try:
            await heartbeat_task
        except asyncio.CancelledError:
            pass

async def heartbeat_handler():
    """Send periodic heartbeat messages."""
    try:
        while True:
            await asyncio.sleep(20)  # Send heartbeat every 20 seconds
            await websocket.send_json({
                'type': 'heartbeat',
                'timestamp': time.time()
            })
    except (ConnectionClosed, asyncio.CancelledError):
        pass

WebSocket Error Handling

from quart import Quart, websocket
from quart.exceptions import BadRequest, ConnectionClosed

app = Quart(__name__)

@app.websocket('/ws/robust')
async def robust_websocket():
    try:
        await websocket.accept()
        
        while True:
            try:
                # Receive and validate message
                data = await websocket.receive_json()
                
                # Validate message structure
                if not isinstance(data, dict) or 'type' not in data:
                    await websocket.send_json({
                        'type': 'error',
                        'code': 'INVALID_FORMAT',
                        'message': 'Message must be JSON object with "type" field'
                    })
                    continue
                
                # Process different message types
                if data['type'] == 'echo':
                    await websocket.send_json({
                        'type': 'echo_response',
                        'original': data.get('message', '')
                    })
                elif data['type'] == 'time':
                    await websocket.send_json({
                        'type': 'time_response',
                        'timestamp': time.time()
                    })
                else:
                    await websocket.send_json({
                        'type': 'error',
                        'code': 'UNKNOWN_TYPE',
                        'message': f'Unknown message type: {data["type"]}'
                    })
                    
            except BadRequest:
                # Invalid JSON received
                await websocket.send_json({
                    'type': 'error',
                    'code': 'INVALID_JSON',
                    'message': 'Invalid JSON format'
                })
                
            except Exception as e:
                # Handle unexpected errors
                await websocket.send_json({
                    'type': 'error',
                    'code': 'INTERNAL_ERROR',
                    'message': 'An internal error occurred'
                })
                print(f"WebSocket error: {e}")
                
    except ConnectionClosed:
        print("WebSocket connection closed normally")
    except Exception as e:
        print(f"WebSocket handler error: {e}")
        try:
            await websocket.close(code=1011, reason="Internal server error")
        except:
            pass  # Connection might already be closed

Install with Tessl CLI

npx tessl i tessl/pypi-quart

docs

context.md

core-application.md

helpers.md

index.md

request-response.md

signals.md

templates.md

testing.md

websocket.md

tile.json