CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-falcon

The ultra-reliable, fast ASGI+WSGI framework for building data plane APIs at scale.

Pending
Overview
Eval results
Files

asgi-websocket.mddocs/

ASGI and WebSocket Support

Modern asynchronous application support with WebSocket handling, Server-Sent Events, and full ASGI 3.0 compatibility for building real-time web applications.

Capabilities

ASGI Application

Asynchronous application support for modern web development with WebSocket capabilities.

class falcon.asgi.App:
    def __init__(
        self,
        media_type: str = 'application/json',
        request_type: type = None,
        response_type: type = None,
        middleware: list = None,
        router: object = None,
        cors_enable: bool = False,
        req_options: RequestOptions = None,
        resp_options: ResponseOptions = None,
        secure_cookies_by_default: bool = None
    ):
        """
        Create an ASGI application.
        
        Args: Same as WSGI App constructor
        """
    
    async def __call__(self, scope: dict, receive: callable, send: callable):
        """
        ASGI 3.0 callable interface.
        
        Args:
            scope: ASGI scope dictionary
            receive: ASGI receive callable
            send: ASGI send callable
        """
    
    # Same methods as WSGI App:
    # add_route, add_static_route, add_sink, add_middleware,
    # add_error_handler, set_error_serializer

ASGI App Usage

import falcon.asgi
import asyncio

class AsyncUserResource:
    async def on_get(self, req, resp, user_id=None):
        if user_id:
            # Async database call
            user = await fetch_user_async(user_id)
            resp.media = user
        else:
            users = await fetch_all_users_async()
            resp.media = {'users': users}
    
    async def on_post(self, req, resp):
        user_data = req.media
        new_user = await create_user_async(user_data)
        resp.status = falcon.HTTP_201
        resp.media = new_user

# Create ASGI app
app = falcon.asgi.App()
app.add_route('/users', AsyncUserResource())
app.add_route('/users/{user_id}', AsyncUserResource())

# Run with ASGI server
if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='0.0.0.0', port=8000)

WebSocket Support

Full-featured WebSocket handling for real-time communication.

class WebSocket:
    def __init__(self, scope: dict, receive: callable, send: callable):
        """
        WebSocket connection handler.
        
        Args:
            scope: ASGI WebSocket scope
            receive: ASGI receive callable
            send: ASGI send callable
        """
    
    # Connection state
    ready_state: int  # WebSocket ready state
    client_address: tuple  # Client IP and port
    context: object  # User data storage
    path: str  # WebSocket path
    query_string: str  # Query string
    headers: dict  # WebSocket headers
    subprotocols: list  # Supported subprotocols
    
    # Connection management
    async def accept(self, subprotocol: str = None, headers: dict = None):
        """
        Accept WebSocket connection.
        
        Args:
            subprotocol: Selected subprotocol
            headers: Additional response headers
        """
    
    async def close(self, code: int = 1000, reason: str = None):
        """
        Close WebSocket connection.
        
        Args:
            code: Close status code
            reason: Close reason
        """
    
    # Message receiving
    async def receive_text(self) -> str:
        """
        Receive text message.
        
        Returns:
            Text message content
            
        Raises:
            WebSocketDisconnected: If connection closed
        """
    
    async def receive_data(self) -> bytes:
        """
        Receive binary message.
        
        Returns:
            Binary message content
            
        Raises:
            WebSocketDisconnected: If connection closed
        """
    
    async def receive_json(self) -> object:
        """
        Receive JSON message.
        
        Returns:
            Parsed JSON data
            
        Raises:
            WebSocketDisconnected: If connection closed
            ValueError: If JSON parsing fails
        """
    
    async def receive_msgpack(self) -> object:
        """
        Receive MessagePack message.
        
        Returns:
            Unpacked MessagePack data
            
        Raises:
            WebSocketDisconnected: If connection closed
        """
    
    # Message sending
    async def send_text(self, payload: str):
        """
        Send text message.
        
        Args:
            payload: Text message to send
            
        Raises:
            WebSocketDisconnected: If connection closed
        """
    
    async def send_data(self, payload: bytes):
        """
        Send binary message.
        
        Args:
            payload: Binary data to send
            
        Raises:
            WebSocketDisconnected: If connection closed
        """
    
    async def send_json(self, obj: object):
        """
        Send JSON message.
        
        Args:
            obj: Object to serialize as JSON
            
        Raises:
            WebSocketDisconnected: If connection closed
        """
    
    async def send_msgpack(self, obj: object):
        """
        Send MessagePack message.
        
        Args:
            obj: Object to serialize as MessagePack
            
        Raises:
            WebSocketDisconnected: If connection closed
        """

class WebSocketOptions:
    def __init__(
        self,
        media_handlers: object = None,
        max_receive_queue: int = 32
    ):
        """
        WebSocket configuration options.
        
        Args:
            media_handlers: Media handler registry
            max_receive_queue: Maximum receive queue size
        """

WebSocket Usage Examples

import falcon.asgi
import asyncio
import json

class ChatWebSocket:
    def __init__(self):
        self.connections = set()
    
    async def on_websocket(self, req, ws):
        await ws.accept()
        self.connections.add(ws)
        
        try:
            while True:
                message = await ws.receive_text()
                data = json.loads(message)
                
                # Broadcast to all connections
                await self.broadcast({
                    'user': data.get('user', 'Anonymous'),
                    'message': data.get('message', ''),
                    'timestamp': time.time()
                })
        except falcon.WebSocketDisconnected:
            pass
        finally:
            self.connections.discard(ws)
    
    async def broadcast(self, data):
        """Broadcast message to all connected clients"""
        if self.connections:
            message = json.dumps(data)
            await asyncio.gather(
                *[ws.send_text(message) for ws in self.connections],
                return_exceptions=True
            )

class EchoWebSocket:
    async def on_websocket(self, req, ws):
        await ws.accept()
        
        try:
            while True:
                # Handle different message types
                try:
                    # Try to receive as JSON first
                    data = await ws.receive_json()
                    await ws.send_json({
                        'echo': data,
                        'type': 'json'
                    })
                except ValueError:
                    # Fall back to text
                    message = await ws.receive_text()
                    await ws.send_text(f"Echo: {message}")
        except falcon.WebSocketDisconnected:
            print("Client disconnected")

# Register WebSocket routes
app = falcon.asgi.App()
app.add_route('/chat', ChatWebSocket())
app.add_route('/echo', EchoWebSocket())

Server-Sent Events

Server-Sent Events support for real-time data streaming.

class SSEvent:
    def __init__(
        self,
        data: str,
        event_type: str = None,
        event_id: str = None,
        retry: int = None
    ):
        """
        Server-Sent Event.
        
        Args:
            data: Event data
            event_type: Event type name
            event_id: Event ID for client tracking
            retry: Retry interval in milliseconds
        """
    
    def serialize(self) -> str:
        """
        Serialize event to SSE format.
        
        Returns:
            SSE-formatted string
        """
    
    # Properties
    data: str  # Event data
    event_type: str  # Event type
    event_id: str  # Event ID
    retry: int  # Retry interval

Server-Sent Events Example

import falcon.asgi
import asyncio
from falcon.asgi import SSEvent

class EventStreamResource:
    async def on_get(self, req, resp):
        # Set SSE headers
        resp.content_type = 'text/event-stream'
        resp.set_header('Cache-Control', 'no-cache')
        resp.set_header('Connection', 'keep-alive')
        
        # Create event stream
        async def event_stream():
            counter = 0
            while True:
                # Create event
                event = SSEvent(
                    data=f'{{"count": {counter}, "timestamp": "{time.time()}"}}',
                    event_type='counter',
                    event_id=str(counter)
                )
                
                yield event.serialize().encode('utf-8')
                counter += 1
                await asyncio.sleep(1)
        
        resp.stream = event_stream()

class NotificationStream:
    def __init__(self):
        self.subscribers = set()
    
    async def on_get(self, req, resp):
        resp.content_type = 'text/event-stream'
        resp.set_header('Cache-Control', 'no-cache')
        
        # Create subscriber queue
        queue = asyncio.Queue()
        self.subscribers.add(queue)
        
        try:
            async def stream():
                while True:
                    event_data = await queue.get()
                    event = SSEvent(
                        data=json.dumps(event_data),
                        event_type='notification'
                    )
                    yield event.serialize().encode('utf-8')
            
            resp.stream = stream()
        finally:
            self.subscribers.discard(queue)
    
    async def broadcast_notification(self, data):
        """Send notification to all subscribers"""
        for queue in self.subscribers:
            try:
                queue.put_nowait(data)
            except asyncio.QueueFull:
                pass  # Skip if queue is full

# Register SSE routes
app.add_route('/events', EventStreamResource())
app.add_route('/notifications', NotificationStream())

ASGI Middleware

Middleware support for ASGI applications with async processing.

# ASGI middleware class structure
class ASGIMiddleware:
    def __init__(self, other_args):
        """Initialize middleware with configuration"""
    
    async def process_request(self, req: Request, resp: Response):
        """
        Process request before routing.
        
        Args:
            req: Request object  
            resp: Response object
        """
    
    async def process_response(self, req: Request, resp: Response, resource: object, req_succeeded: bool):
        """
        Process response after routing.
        
        Args:
            req: Request object
            resp: Response object
            resource: Resource that handled request
            req_succeeded: Whether request succeeded
        """
    
    async def process_resource(self, req: Request, resp: Response, resource: object, params: dict):
        """
        Process resource before calling responder.
        
        Args:
            req: Request object
            resp: Response object
            resource: Target resource
            params: Route parameters
        """

ASGI Middleware Example

class AsyncLoggingMiddleware:
    async def process_request(self, req, resp):
        req.context.start_time = time.time()
        print(f"Request: {req.method} {req.path}")
    
    async def process_response(self, req, resp, resource, req_succeeded):
        duration = time.time() - req.context.start_time
        print(f"Response: {resp.status} ({duration:.3f}s)")

class AsyncAuthMiddleware:
    async def process_request(self, req, resp):
        token = req.get_header('Authorization')
        if not token:
            raise falcon.HTTPUnauthorized(title='Authentication required')
        
        # Async token validation
        user = await validate_token_async(token)
        if not user:
            raise falcon.HTTPUnauthorized(title='Invalid token')
        
        req.context.user = user

# Add middleware to ASGI app
app = falcon.asgi.App(middleware=[
    AsyncLoggingMiddleware(),
    AsyncAuthMiddleware()
])

Types

# ASGI application type
falcon.asgi.App: type  # ASGI application class

# WebSocket types
WebSocket: type  # WebSocket connection handler
WebSocketOptions: type  # WebSocket configuration

# Server-Sent Events types
SSEvent: type  # Server-sent event

# WebSocket payload type constants
class WebSocketPayloadType:
    TEXT: int  # Text payload type
    BINARY: int  # Binary payload type

# WebSocket exceptions
WebSocketDisconnected: type  # Connection disconnected
WebSocketPathNotFound: type  # WebSocket path not found
WebSocketHandlerNotFound: type  # Handler not found
WebSocketServerError: type  # Server error

Install with Tessl CLI

npx tessl i tessl/pypi-falcon

docs

application.md

asgi-websocket.md

error-handling.md

index.md

media.md

middleware-hooks.md

request-response.md

routing.md

testing.md

utilities.md

tile.json