A Super Fast Async Python Web Framework with a Rust runtime.
—
Real-time communication support with WebSocket connections, event handling for connect/message/close events, broadcasting capabilities, and direct messaging between clients.
Create WebSocket endpoints with event-driven handling and dependency injection.
class WebSocket:
def __init__(
self,
robyn_object: "Robyn",
endpoint: str,
config: Config = Config(),
dependencies: DependencyMap = DependencyMap()
):
"""
Create a WebSocket handler for an endpoint.
Args:
robyn_object: Robyn application instance
endpoint: WebSocket endpoint URL pattern
config: WebSocket configuration
dependencies: Dependency injection container
"""
def on(self, type: str):
"""
Decorator for WebSocket event handlers.
Args:
type: Event type ("connect", "message", or "close")
Usage:
@websocket.on("connect")
def handle_connect(websocket_connector):
pass
"""
def inject(self, **kwargs):
"""
Inject dependencies into WebSocket handlers.
Args:
**kwargs: Dependencies to inject
"""
def inject_global(self, **kwargs):
"""
Inject global dependencies into WebSocket handlers.
Args:
**kwargs: Global dependencies to inject
"""The WebSocketConnector object represents an active WebSocket connection and provides methods for communication.
class WebSocketConnector:
    """An active WebSocket connection, with methods to message clients or close it."""

    # Identifier of this connection's client; the send_to methods take such an ID as target.
    id: str
    # Query parameters of the connection; the examples read them with .get(key, default).
    query_params: QueryParams

    async def async_broadcast(self, message: str):
        """
        Broadcast message to all connected clients (async).

        Declared as a coroutine because async handlers must ``await`` it.

        Args:
            message: Message to broadcast

        Note:
            Use in async event handlers
        """

    async def async_send_to(self, sender_id: str, message: str):
        """
        Send message to specific client (async).

        Declared as a coroutine because async handlers must ``await`` it.

        Args:
            sender_id: Client ID to send message to
            message: Message to send

        Note:
            Use in async event handlers
        """

    def sync_broadcast(self, message: str):
        """
        Broadcast message to all connected clients (sync).

        Args:
            message: Message to broadcast

        Note:
            Use in sync event handlers
        """

    def sync_send_to(self, sender_id: str, message: str):
        """
        Send message to specific client (sync).

        Args:
            sender_id: Client ID to send message to
            message: Message to send

        Note:
            Use in sync event handlers
        """

    def close(self):
        """
        Close the WebSocket connection.
        """


from robyn import Robyn, WebSocket
app = Robyn(__file__)

# WebSocket endpoint served at /ws
websocket = WebSocket(app, "/ws")

# Page returned by the "/" route. Its script opens the socket, appends every
# incoming message to the page, and sends the text box content on click.
INDEX_PAGE = """
<!DOCTYPE html>
<html>
<head><title>WebSocket Echo</title></head>
<body>
<div id="messages"></div>
<input type="text" id="messageInput" placeholder="Type a message...">
<button onclick="sendMessage()">Send</button>
<script>
const ws = new WebSocket('ws://localhost:8080/ws');
const messages = document.getElementById('messages');
ws.onmessage = function(event) {
const div = document.createElement('div');
div.textContent = event.data;
messages.appendChild(div);
};
function sendMessage() {
const input = document.getElementById('messageInput');
ws.send(input.value);
input.value = '';
}
</script>
</body>
</html>
"""


@websocket.on("connect")
def on_connect(websocket_connector):
    """Log the new connection and announce it to every connected client."""
    client_id = websocket_connector.id
    print(f"Client {client_id} connected")
    websocket_connector.sync_broadcast(f"User {client_id} joined the chat")


@websocket.on("message")
def on_message(websocket_connector, message):
    """Log the incoming message and echo it back to its sender only."""
    client_id = websocket_connector.id
    print(f"Message from {client_id}: {message}")
    echo_text = f"Echo: {message}"
    websocket_connector.sync_send_to(client_id, echo_text)


@websocket.on("close")
def on_close(websocket_connector):
    """Log the disconnect and announce it to every connected client."""
    client_id = websocket_connector.id
    print(f"Client {client_id} disconnected")
    websocket_connector.sync_broadcast(f"User {client_id} left the chat")


# Plain HTTP route that serves the demo page
@app.get("/")
def index(request):
    """Return the HTML page that talks to the /ws endpoint."""
    return INDEX_PAGE


app.start(host="0.0.0.0", port=8080)


from robyn import Robyn, WebSocket
import json
import time
app = Robyn(__file__)

# In-memory store for connected users: connection id -> username
connected_users = {}

websocket = WebSocket(app, "/chat")


def _broadcast_user_list(websocket_connector):
    """Broadcast the usernames of everyone currently in connected_users."""
    user_list_msg = json.dumps({
        "type": "user_list",
        "users": list(connected_users.values())
    })
    websocket_connector.sync_broadcast(user_list_msg)


def _send_error(websocket_connector, text):
    """Send an "error" event carrying text back to this connection only."""
    error_msg = json.dumps({
        "type": "error",
        "message": text
    })
    websocket_connector.sync_send_to(websocket_connector.id, error_msg)


@websocket.on("connect")
def on_connect(websocket_connector):
    """Register the user, welcome them, and tell everyone about the join."""
    user_id = websocket_connector.id
    # Get username from query parameters, falling back to a name derived from the id
    username = websocket_connector.query_params.get("username", f"User_{user_id[:8]}")
    connected_users[user_id] = username

    # Welcome message to the user
    welcome_msg = json.dumps({
        "type": "welcome",
        "message": f"Welcome to the chat, {username}!",
        "user_id": user_id
    })
    websocket_connector.sync_send_to(user_id, welcome_msg)

    # Broadcast user joined message
    join_msg = json.dumps({
        "type": "user_joined",
        "username": username,
        "user_id": user_id,
        "timestamp": time.time()
    })
    websocket_connector.sync_broadcast(join_msg)

    # Send current user list
    _broadcast_user_list(websocket_connector)


@websocket.on("message")
def on_message(websocket_connector, message):
    """Parse a JSON message and relay it as a public or private chat message.

    Messages that are not valid JSON, or whose top-level value is not a JSON
    object, are answered with an "error" event instead of being relayed.
    """
    user_id = websocket_connector.id
    username = connected_users.get(user_id, "Unknown")

    try:
        data = json.loads(message)
    except json.JSONDecodeError:
        _send_error(websocket_connector, "Invalid message format")
        return

    # Valid JSON such as `123` or `[]` has no .get(); reject it like malformed
    # input rather than letting an AttributeError escape the handler.
    if not isinstance(data, dict):
        _send_error(websocket_connector, "Invalid message format")
        return

    msg_type = data.get("type", "message")
    if msg_type == "message":
        # Broadcast chat message
        chat_msg = json.dumps({
            "type": "message",
            "username": username,
            "user_id": user_id,
            "message": data.get("message", ""),
            "timestamp": time.time()
        })
        websocket_connector.sync_broadcast(chat_msg)
    elif msg_type == "private_message":
        # Send private message to specific user; ignored when no target is given
        target_user_id = data.get("target_user_id")
        if target_user_id:
            private_msg = json.dumps({
                "type": "private_message",
                "from_username": username,
                "from_user_id": user_id,
                "message": data.get("message", ""),
                "timestamp": time.time()
            })
            websocket_connector.sync_send_to(target_user_id, private_msg)


@websocket.on("close")
def on_close(websocket_connector):
    """Forget the user and tell everyone about the departure."""
    user_id = websocket_connector.id
    username = connected_users.pop(user_id, "Unknown")

    # Broadcast user left message
    leave_msg = json.dumps({
        "type": "user_left",
        "username": username,
        "user_id": user_id,
        "timestamp": time.time()
    })
    websocket_connector.sync_broadcast(leave_msg)

    # Update user list
    _broadcast_user_list(websocket_connector)


app.start()


from robyn import Robyn, WebSocket
import asyncio
import json
import time

app = Robyn(__file__)
websocket = WebSocket(app, "/async_ws")

# Periodic-update tasks keyed by connection id. Holding a reference keeps the
# event loop from dropping a running task and lets on_close cancel it.
update_tasks = {}


@websocket.on("connect")
async def on_connect(websocket_connector):
    """Log the connection and start its periodic-update task."""
    print(f"Client {websocket_connector.id} connected")
    # Send periodic updates to this client in the background
    update_tasks[websocket_connector.id] = asyncio.create_task(
        send_periodic_updates(websocket_connector)
    )


@websocket.on("message")
async def on_message(websocket_connector, message):
    """Process the message asynchronously and send the result to the sender."""
    print(f"Received: {message}")
    # Process message asynchronously
    processed_message = await process_message(message)
    # Send response back
    response = json.dumps({
        "type": "response",
        "original": message,
        "processed": processed_message
    })
    await websocket_connector.async_send_to(websocket_connector.id, response)


@websocket.on("close")
async def on_close(websocket_connector):
    """Log the disconnect and stop the connection's periodic-update task."""
    print(f"Client {websocket_connector.id} disconnected")
    task = update_tasks.pop(websocket_connector.id, None)
    if task is not None:
        task.cancel()


async def process_message(message):
    """Return the message upper-cased after a short simulated delay."""
    # Simulate async processing
    await asyncio.sleep(0.1)
    return message.upper()


async def send_periodic_updates(websocket_connector):
    """Send a counted update to the client every 5 seconds until a send fails."""
    count = 0
    while True:
        await asyncio.sleep(5)  # Wait 5 seconds
        update = json.dumps({
            "type": "periodic_update",
            "count": count,
            "timestamp": time.time()
        })
        try:
            await websocket_connector.async_send_to(websocket_connector.id, update)
        except Exception:
            # Connection closed, stop sending updates. Cancellation is a
            # BaseException and deliberately propagates past this handler.
            break
        count += 1


app.start()


from robyn import Robyn, WebSocket, DependencyMap
import json
import time
class MessageLogger:
    """In-memory, append-only log of chat messages."""

    def __init__(self):
        # Entries in insertion order; each is a dict with the keys
        # "user_id", "message" and "timestamp".
        self.messages = []

    def log_message(self, user_id, message):
        """Append an entry for message, stamped with the current time.time()."""
        self.messages.append({
            "user_id": user_id,
            "message": message,
            "timestamp": time.time()
        })

    def get_recent_messages(self, limit=10):
        """Return at most `limit` of the newest entries, oldest first.

        A non-positive limit yields an empty list; without the guard,
        ``self.messages[-0:]`` would return the entire log.
        """
        if limit <= 0:
            return []
        return self.messages[-limit:]
class UserManager:
    """Keeps a mapping of user id to username for the connected users."""

    def __init__(self):
        # user id -> username
        self.users = dict()

    def add_user(self, user_id, username):
        """Store username under user_id, replacing any previous entry."""
        self.users.update({user_id: username})

    def remove_user(self, user_id):
        """Drop user_id and return its username, or None if it was not stored."""
        try:
            return self.users.pop(user_id)
        except KeyError:
            return None

    def get_all_users(self):
        """Return the stored usernames as a new list."""
        return [*self.users.values()]
app = Robyn(__file__)

# Shared service objects handed to the handlers
message_logger = MessageLogger()
user_manager = UserManager()

# WebSocket endpoint created with its own dependency container
dependencies = DependencyMap()
websocket = WebSocket(app, "/chat", dependencies=dependencies)

# Register the services as global dependencies.
# NOTE(review): the handlers below take the injected objects as parameters
# named after the keys used here; Robyn's dependency-injection docs describe a
# `global_dependencies` parameter instead. Confirm which form the installed
# version supports.
websocket.inject_global(
    logger=message_logger,
    user_manager=user_manager
)


@websocket.on("connect")
def on_connect(websocket_connector, logger, user_manager):
    """Register the user, log the join, and send them the recent history."""
    user_id = websocket_connector.id
    fallback_name = f"User_{user_id[:8]}"
    username = websocket_connector.query_params.get("username", fallback_name)

    user_manager.add_user(user_id, username)
    logger.log_message("system", f"{username} joined the chat")

    # The newcomer receives the most recent log entries
    history_payload = {
        "type": "message_history",
        "messages": logger.get_recent_messages()
    }
    websocket_connector.sync_send_to(user_id, json.dumps(history_payload))


@websocket.on("message")
def on_message(websocket_connector, message, logger, user_manager):
    """Log the message and broadcast it to every connected client."""
    user_id = websocket_connector.id
    username = user_manager.users.get(user_id, "Unknown")

    # Record the message before relaying it
    logger.log_message(user_id, message)

    # Relay to all users
    chat_payload = {
        "type": "message",
        "username": username,
        "message": message,
        "timestamp": time.time()
    }
    websocket_connector.sync_broadcast(json.dumps(chat_payload))


@websocket.on("close")
def on_close(websocket_connector, logger, user_manager):
    """Forget the user and, if they were known, log their departure."""
    username = user_manager.remove_user(websocket_connector.id)
    if not username:
        return
    logger.log_message("system", f"{username} left the chat")


app.start()


from robyn import Robyn, WebSocket
app = Robyn(__file__)
websocket = WebSocket(app, "/room/<room_id>")
# Store rooms and their connected users
rooms = {}
@websocket.on("connect")
def on_connect(websocket_connector):
# Get room ID from the URL (would need to be implemented in the actual framework)
room_id = websocket_connector.query_params.get("room_id", "general")
user_id = websocket_connector.id
# Initialize room if it doesn't exist
if room_id not in rooms:
rooms[room_id] = set()
# Add user to room
rooms[room_id].add(user_id)
print(f"User {user_id} joined room {room_id}")
# Notify room members
room_msg = f"User {user_id} joined room {room_id}"
websocket_connector.sync_broadcast(room_msg)
@websocket.on("message")
def on_message(websocket_connector, message):
room_id = websocket_connector.query_params.get("room_id", "general")
user_id = websocket_connector.id
# Broadcast message to room (in a real implementation, you'd need room-specific broadcasting)
room_message = f"[{room_id}] {user_id}: {message}"
websocket_connector.sync_broadcast(room_message)
@websocket.on("close")
def on_close(websocket_connector):
room_id = websocket_connector.query_params.get("room_id", "general")
user_id = websocket_connector.id
# Remove user from room
if room_id in rooms:
rooms[room_id].discard(user_id)
if not rooms[room_id]: # Remove empty rooms
del rooms[room_id]
print(f"User {user_id} left room {room_id}")
app.start()Install with Tessl CLI
npx tessl i tessl/pypi-robyn