CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-robyn

A Super Fast Async Python Web Framework with a Rust runtime.

Pending
Overview
Eval results
Files

docs/websocket.md

WebSocket

Real-time communication support with WebSocket connections, event handling for connect/message/close events, broadcasting capabilities, and direct messaging between clients.

Capabilities

WebSocket Handler

Create WebSocket endpoints with event-driven handling and dependency injection.

class WebSocket:
    """
    Event-driven handler registry for one WebSocket endpoint of a Robyn app.

    Handlers are attached with the ``on`` decorator; dependencies can be
    supplied through ``inject`` / ``inject_global``.
    """

    def __init__(
        self, 
        robyn_object: "Robyn", 
        endpoint: str, 
        config: Config = Config(),
        dependencies: DependencyMap = DependencyMap()
    ):
        """
        Create a WebSocket handler for an endpoint.

        Args:
            robyn_object: Robyn application instance the endpoint belongs to
            endpoint: WebSocket endpoint URL pattern (e.g. "/ws")
            config: WebSocket configuration
            dependencies: Dependency injection container

        Note:
            The ``config`` and ``dependencies`` defaults are instances built in
            the signature, so Python evaluates them once at definition time.
            NOTE(review): confirm that sharing these defaults between
            WebSocket objects is intended.
        """
    
    def on(self, type: str):
        """
        Decorator for WebSocket event handlers.

        Args:
            type: Event type ("connect", "message", or "close")

        Usage:
            @websocket.on("connect")
            def handle_connect(websocket_connector):
                pass

        Note:
            In the examples on this page, "connect" and "close" handlers take
            the connector, and "message" handlers take the connector plus the
            received message. Both ``def`` and ``async def`` handlers are shown.
        """
    
    def inject(self, **kwargs):
        """
        Inject dependencies into WebSocket handlers.

        Args:
            **kwargs: Dependencies to inject, keyed by name
        """
    
    def inject_global(self, **kwargs):
        """
        Inject global dependencies into WebSocket handlers.

        Args:
            **kwargs: Global dependencies to inject, keyed by name
        """

WebSocket Connector

The WebSocketConnector object represents an active WebSocket connection and provides methods for communication.

class WebSocketConnector:
    """
    Handle for one active WebSocket connection, passed to event handlers.

    Offers broadcast and direct-send operations in two flavours: ``async_*``
    for async handlers (awaited in the examples on this page) and ``sync_*``
    for regular handlers.
    """

    # Identifier of this connection; the examples use it as the client id.
    id: str
    # Query parameters of the connection; the examples read them with
    # .get(name, default).
    query_params: QueryParams
    
    def async_broadcast(self, message: str):
        """
        Broadcast message to all connected clients (async).

        Args:
            message: Message to broadcast

        Note:
            Use in async event handlers
        """
    
    def async_send_to(self, sender_id: str, message: str):
        """
        Send message to specific client (async).

        Args:
            sender_id: Client ID to send message to (despite its name, this
                identifies the recipient)
            message: Message to send

        Note:
            Use in async event handlers
        """
    
    def sync_broadcast(self, message: str):
        """
        Broadcast message to all connected clients (sync).

        Args:
            message: Message to broadcast

        Note:
            Use in sync event handlers
        """
    
    def sync_send_to(self, sender_id: str, message: str):
        """
        Send message to specific client (sync).

        Args:
            sender_id: Client ID to send message to (despite its name, this
                identifies the recipient)
            message: Message to send

        Note:
            Use in sync event handlers
        """
    
    def close(self):
        """
        Close the WebSocket connection.
        """

Usage Examples

Basic WebSocket Echo Server

from robyn import Robyn, WebSocket

# Application object; Robyn is given this module's path via __file__.
app = Robyn(__file__)

# Create WebSocket endpoint: the handlers registered below serve "/ws".
websocket = WebSocket(app, "/ws")

@websocket.on("connect")
def on_connect(websocket_connector):
    """Log the new connection and announce it to every connected client."""
    client_id = websocket_connector.id
    print(f"Client {client_id} connected")
    announcement = f"User {client_id} joined the chat"
    websocket_connector.sync_broadcast(announcement)

@websocket.on("message")
def on_message(websocket_connector, message):
    """Log an incoming message and echo it back to the client that sent it."""
    origin = websocket_connector.id
    print(f"Message from {origin}: {message}")
    # Reply goes only to the originating connection, not to everyone.
    reply = f"Echo: {message}"
    websocket_connector.sync_send_to(origin, reply)

@websocket.on("close")
def on_close(websocket_connector):
    """Log the disconnect and tell every connected client who left."""
    client_id = websocket_connector.id
    print(f"Client {client_id} disconnected")
    farewell = f"User {client_id} left the chat"
    websocket_connector.sync_broadcast(farewell)

# Regular HTTP routes
@app.get("/")
def index(request):
    """
    Serve the demo page: a message list, an input box and a small script
    that talks to the "/ws" endpoint.

    The socket URL is derived from the page's own location instead of a
    hard-coded ``localhost:8080``, so the page also works when it is opened
    from another machine or when the server runs on a different port.
    """
    return """
    <!DOCTYPE html>
    <html>
    <head><title>WebSocket Echo</title></head>
    <body>
        <div id="messages"></div>
        <input type="text" id="messageInput" placeholder="Type a message...">
        <button onclick="sendMessage()">Send</button>
        
        <script>
            // Connect back to whichever host/port served this page.
            const scheme = location.protocol === 'https:' ? 'wss' : 'ws';
            const ws = new WebSocket(`${scheme}://${location.host}/ws`);
            const messages = document.getElementById('messages');
            
            ws.onmessage = function(event) {
                const div = document.createElement('div');
                div.textContent = event.data;
                messages.appendChild(div);
            };
            
            function sendMessage() {
                const input = document.getElementById('messageInput');
                ws.send(input.value);
                input.value = '';
            }
        </script>
    </body>
    </html>
    """

# Start the server on all network interfaces (0.0.0.0), port 8080.
app.start(host="0.0.0.0", port=8080)

Chat Room with Broadcasting

from robyn import Robyn, WebSocket
import json
import time

# Application object; Robyn is given this module's path via __file__.
app = Robyn(__file__)

# In-memory store for connected users: connection id -> username.
# Filled by the "connect" handler and emptied by the "close" handler below.
connected_users = {}

# All chat handlers below are registered on the "/chat" endpoint.
websocket = WebSocket(app, "/chat")

@websocket.on("connect")
def on_connect(websocket_connector):
    """
    Register the new client, greet it privately, then tell everyone that it
    joined and broadcast the refreshed user list.
    """
    connector = websocket_connector
    uid = connector.id

    # Prefer the "username" query parameter; otherwise derive a name from
    # the first eight characters of the connection id.
    name = connector.query_params.get("username", f"User_{uid[:8]}")
    connected_users[uid] = name

    # 1) Private greeting for the newcomer.
    greeting = {
        "type": "welcome",
        "message": f"Welcome to the chat, {name}!",
        "user_id": uid,
    }
    connector.sync_send_to(uid, json.dumps(greeting))

    # 2) Public "user joined" announcement.
    joined = {
        "type": "user_joined",
        "username": name,
        "user_id": uid,
        "timestamp": time.time(),
    }
    connector.sync_broadcast(json.dumps(joined))

    # 3) Current roster for all clients.
    roster = {"type": "user_list", "users": [*connected_users.values()]}
    connector.sync_broadcast(json.dumps(roster))

@websocket.on("message")
def on_message(websocket_connector, message):
    """
    Route one incoming chat frame.

    The frame must be a JSON object. Its "type" field (default "message")
    selects the action:
      * "message"         - broadcast the text to every client
      * "private_message" - send the text only to "target_user_id"
    Other types are ignored. Anything that is not a JSON object is answered
    with an error frame sent back to the sender.
    """
    user_id = websocket_connector.id
    username = connected_users.get(user_id, "Unknown")

    # Keep the try body minimal: only the parse itself can raise here.
    try:
        data = json.loads(message)
    except json.JSONDecodeError:
        data = None

    # json.loads also accepts scalars and arrays (42, "hi", [1]); those have
    # no .get(), so treat them exactly like unparsable input instead of
    # letting an AttributeError escape the handler.
    if not isinstance(data, dict):
        error_msg = json.dumps({
            "type": "error",
            "message": "Invalid message format"
        })
        websocket_connector.sync_send_to(user_id, error_msg)
        return

    msg_type = data.get("type", "message")

    if msg_type == "message":
        # Broadcast chat message
        chat_msg = json.dumps({
            "type": "message",
            "username": username,
            "user_id": user_id,
            "message": data.get("message", ""),
            "timestamp": time.time()
        })
        websocket_connector.sync_broadcast(chat_msg)

    elif msg_type == "private_message":
        # Send private message to specific user; silently skipped when no
        # target id was supplied.
        target_user_id = data.get("target_user_id")
        if target_user_id:
            private_msg = json.dumps({
                "type": "private_message",
                "from_username": username,
                "from_user_id": user_id,
                "message": data.get("message", ""),
                "timestamp": time.time()
            })
            websocket_connector.sync_send_to(target_user_id, private_msg)

@websocket.on("close")
def on_close(websocket_connector):
    """
    Forget the departing client, announce that it left and broadcast the
    updated user list.
    """
    connector = websocket_connector
    uid = connector.id
    # "Unknown" covers a close for a connection that was never registered.
    name = connected_users.pop(uid, "Unknown")

    departure = {
        "type": "user_left",
        "username": name,
        "user_id": uid,
        "timestamp": time.time(),
    }
    connector.sync_broadcast(json.dumps(departure))

    # Roster is computed after the pop, so the leaver is no longer listed.
    roster = {"type": "user_list", "users": [*connected_users.values()]}
    connector.sync_broadcast(json.dumps(roster))

# Start the server; no host/port is passed, so the framework defaults apply.
app.start()

Async WebSocket Handlers

from robyn import Robyn, WebSocket
import asyncio
import json

# Application object; Robyn is given this module's path via __file__.
app = Robyn(__file__)

# The async handlers below are registered on the "/async_ws" endpoint.
websocket = WebSocket(app, "/async_ws")

# Strong references to running background tasks. The event loop keeps only
# weak references to tasks, so a task whose result is discarded can be
# garbage-collected before it finishes.
_background_tasks = set()


@websocket.on("connect")
async def on_connect(websocket_connector):
    """Log the connection and start the periodic-update task for it."""
    print(f"Client {websocket_connector.id} connected")

    # Send periodic updates in the background; keep the task referenced
    # until it completes, then drop it from the set.
    task = asyncio.create_task(send_periodic_updates(websocket_connector))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

@websocket.on("message")
async def on_message(websocket_connector, message):
    """
    Process an incoming message asynchronously and send the original text
    plus the processed result back to the sender as a JSON "response" frame.
    """
    print(f"Received: {message}")

    result = await process_message(message)

    payload = {
        "type": "response",
        "original": message,
        "processed": result,
    }
    encoded = json.dumps(payload)
    await websocket_connector.async_send_to(websocket_connector.id, encoded)

@websocket.on("close")
async def on_close(websocket_connector):
    """Log that the client's connection has closed."""
    client_id = websocket_connector.id
    print(f"Client {client_id} disconnected")

async def process_message(message):
    """Return *message* upper-cased after a short simulated async delay."""
    simulated_latency = 0.1  # seconds; stands in for real async work
    await asyncio.sleep(simulated_latency)
    shouted = message.upper()
    return shouted

async def send_periodic_updates(websocket_connector, interval=5):
    """
    Send a numbered "periodic_update" JSON frame to the client every
    *interval* seconds until a send fails.

    Args:
        websocket_connector: Connection to update; frames are addressed to
            its own id.
        interval: Seconds to wait before each update (default 5).
    """
    # Imported locally because this example's import block does not bring
    # in `time`; without it the first update raised NameError.
    import time

    count = 0
    while True:
        await asyncio.sleep(interval)
        update = json.dumps({
            "type": "periodic_update",
            "count": count,
            "timestamp": time.time()
        })
        try:
            await websocket_connector.async_send_to(websocket_connector.id, update)
        except Exception:
            # A failed send is taken to mean the connection is gone, so stop.
            # `Exception` (not a bare except) lets task cancellation and
            # interpreter shutdown propagate instead of being swallowed.
            break
        count += 1

# Start the server; no host/port is passed, so the framework defaults apply.
app.start()

WebSocket with Dependency Injection

import json
import time

from robyn import Robyn, WebSocket, DependencyMap

class MessageLogger:
    """
    In-memory, append-only log of chat messages.

    Each entry is a dict with "user_id", "message" and "timestamp" (seconds
    since the epoch from ``time.time()``). Nothing is ever evicted, so the
    list grows for as long as the process runs.
    """

    def __init__(self):
        self.messages = []

    def log_message(self, user_id, message):
        """Append one entry for *message*, stamped with the current time."""
        self.messages.append({
            "user_id": user_id,
            "message": message,
            "timestamp": time.time()
        })

    def get_recent_messages(self, limit=10):
        """
        Return up to *limit* of the newest entries, oldest first.

        A non-positive *limit* yields an empty list. A plain
        ``messages[-limit:]`` would return the whole log for 0 (``[-0:]`` is
        ``[0:]``) and drop the oldest entries for negative values.
        """
        if limit <= 0:
            return []
        return self.messages[-limit:]

class UserManager:
    """Registry of connected users, mapping user id to username."""

    def __init__(self):
        self.users = dict()

    def add_user(self, user_id, username):
        """Register *username* under *user_id*, replacing any earlier entry."""
        self.users.update({user_id: username})

    def remove_user(self, user_id):
        """Remove *user_id* and return its username, or None if it is unknown."""
        try:
            return self.users.pop(user_id)
        except KeyError:
            return None

    def get_all_users(self):
        """Return the usernames as a new list, in insertion order."""
        return [name for name in self.users.values()]

# Application object; Robyn is given this module's path via __file__.
app = Robyn(__file__)

# Create dependencies: one shared instance of each, used by all handlers.
message_logger = MessageLogger()
user_manager = UserManager()

# Create WebSocket with dependencies: an (initially empty) DependencyMap is
# handed to the "/chat" endpoint.
dependencies = DependencyMap()
websocket = WebSocket(app, "/chat", dependencies=dependencies)

# Inject dependencies under the names "logger" and "user_manager"; the
# handlers below declare parameters with these same names.
websocket.inject_global(
    logger=message_logger,
    user_manager=user_manager
)

@websocket.on("connect")
def on_connect(websocket_connector, logger, user_manager):
    """
    Register the new user, log a system "joined" entry and send the recent
    message history to that user only.
    """
    # NOTE(review): Robyn's published dependency-injection examples pass
    # `global_dependencies` / `router_dependencies` dicts to handlers;
    # confirm that injection by individual parameter name works as written.
    uid = websocket_connector.id
    name = websocket_connector.query_params.get("username", f"User_{uid[:8]}")

    user_manager.add_user(uid, name)
    logger.log_message("system", f"{name} joined the chat")

    # The history includes the "joined" entry that was just logged.
    history = {
        "type": "message_history",
        "messages": logger.get_recent_messages(),
    }
    encoded = json.dumps(history)
    websocket_connector.sync_send_to(uid, encoded)

@websocket.on("message")
def on_message(websocket_connector, message, logger, user_manager):
    """Record the message in the log and broadcast it to every client."""
    uid = websocket_connector.id
    # Falls back to "Unknown" if the sender was never registered.
    sender_name = user_manager.users.get(uid, "Unknown")

    logger.log_message(uid, message)

    outgoing = {
        "type": "message",
        "username": sender_name,
        "message": message,
        "timestamp": time.time(),
    }
    encoded = json.dumps(outgoing)
    websocket_connector.sync_broadcast(encoded)

@websocket.on("close")
def on_close(websocket_connector, logger, user_manager):
    """Unregister the user and, if it was known, log a system "left" entry."""
    departed = user_manager.remove_user(websocket_connector.id)

    # remove_user returns None for unknown ids; nothing to log in that case
    # (an empty username is skipped as well).
    if not departed:
        return
    logger.log_message("system", f"{departed} left the chat")

# Start the server; no host/port is passed, so the framework defaults apply.
app.start()

WebSocket with Query Parameters

from robyn import Robyn, WebSocket

# Application object; Robyn is given this module's path via __file__.
app = Robyn(__file__)

# The handlers below read `room_id` from the connection's query string, so
# the room is selected with e.g. ws://<host>/room?room_id=lobby. The
# endpoint itself therefore carries no path placeholder.
websocket = WebSocket(app, "/room")

# Store rooms and their connected users: room id -> set of connection ids.
rooms = {}

@websocket.on("connect")
def on_connect(websocket_connector):
    """Add the client to its room and announce the join."""
    # The room comes from the "room_id" query parameter; clients that do not
    # supply one land in "general".
    room = websocket_connector.query_params.get("room_id", "general")
    uid = websocket_connector.id

    # Create the room on first use, then record the member.
    rooms.setdefault(room, set()).add(uid)

    print(f"User {uid} joined room {room}")

    # The broadcast reaches every connected client, not only this room.
    notice = f"User {uid} joined room {room}"
    websocket_connector.sync_broadcast(notice)

@websocket.on("message")  
def on_message(websocket_connector, message):
    """Broadcast the message, prefixed with the sender's room and id."""
    room = websocket_connector.query_params.get("room_id", "general")
    uid = websocket_connector.id

    # sync_broadcast goes to all clients; real room isolation would need
    # room-specific delivery.
    tagged = f"[{room}] {uid}: {message}"
    websocket_connector.sync_broadcast(tagged)

@websocket.on("close")
def on_close(websocket_connector):
    """Remove the client from its room and delete the room once it is empty."""
    room = websocket_connector.query_params.get("room_id", "general")
    uid = websocket_connector.id

    members = rooms.get(room)
    if members is not None:
        members.discard(uid)
        # Drop rooms that have no members left.
        if not members:
            del rooms[room]

    print(f"User {uid} left room {room}")

# Start the server; no host/port is passed, so the framework defaults apply.
app.start()

Install with Tessl CLI

npx tessl i tessl/pypi-robyn

docs

authentication.md

core-app.md

exceptions.md

index.md

mcp.md

openapi.md

request-response.md

status-codes.md

templating.md

websocket.md

tile.json