0
# WebSocket
1
2
Real-time communication support with WebSocket connections, event handling for connect/message/close events, broadcasting capabilities, and direct messaging between clients.
3
4
## Capabilities
5
6
### WebSocket Handler
7
8
Create WebSocket endpoints with event-driven handling and dependency injection.
9
10
```python { .api }
11
class WebSocket:
12
def __init__(
13
self,
14
robyn_object: "Robyn",
15
endpoint: str,
16
config: Config = Config(),
17
dependencies: DependencyMap = DependencyMap()
18
):
19
"""
20
Create a WebSocket handler for an endpoint.
21
22
Args:
23
robyn_object: Robyn application instance
24
endpoint: WebSocket endpoint URL pattern
25
config: WebSocket configuration
26
dependencies: Dependency injection container
27
"""
28
29
def on(self, type: str):
30
"""
31
Decorator for WebSocket event handlers.
32
33
Args:
34
type: Event type ("connect", "message", or "close")
35
36
Usage:
37
@websocket.on("connect")
38
def handle_connect(websocket_connector):
39
pass
40
"""
41
42
def inject(self, **kwargs):
43
"""
44
Inject dependencies into WebSocket handlers.
45
46
Args:
47
**kwargs: Dependencies to inject
48
"""
49
50
def inject_global(self, **kwargs):
51
"""
52
Inject global dependencies into WebSocket handlers.
53
54
Args:
55
**kwargs: Global dependencies to inject
56
"""
57
```
58
59
### WebSocket Connector
60
61
The WebSocketConnector object represents an active WebSocket connection and provides methods for communication.
62
63
```python { .api }
64
class WebSocketConnector:
65
id: str
66
query_params: QueryParams
67
68
def async_broadcast(self, message: str):
69
"""
70
Broadcast message to all connected clients (async).
71
72
Args:
73
message: Message to broadcast
74
75
Note:
76
Use in async event handlers
77
"""
78
79
def async_send_to(self, sender_id: str, message: str):
80
"""
81
Send message to specific client (async).
82
83
Args:
84
sender_id: Client ID to send message to
85
message: Message to send
86
87
Note:
88
Use in async event handlers
89
"""
90
91
def sync_broadcast(self, message: str):
92
"""
93
Broadcast message to all connected clients (sync).
94
95
Args:
96
message: Message to broadcast
97
98
Note:
99
Use in sync event handlers
100
"""
101
102
def sync_send_to(self, sender_id: str, message: str):
103
"""
104
Send message to specific client (sync).
105
106
Args:
107
sender_id: Client ID to send message to
108
message: Message to send
109
110
Note:
111
Use in sync event handlers
112
"""
113
114
def close(self):
115
"""
116
Close the WebSocket connection.
117
"""
118
```
119
120
## Usage Examples
121
122
### Basic WebSocket Echo Server
123
124
```python
125
from robyn import Robyn, WebSocket
126
127
app = Robyn(__file__)
128
129
# Create WebSocket endpoint
130
websocket = WebSocket(app, "/ws")
131
132
@websocket.on("connect")
133
def on_connect(websocket_connector):
134
print(f"Client {websocket_connector.id} connected")
135
websocket_connector.sync_broadcast(f"User {websocket_connector.id} joined the chat")
136
137
@websocket.on("message")
138
def on_message(websocket_connector, message):
139
print(f"Message from {websocket_connector.id}: {message}")
140
# Echo message back to sender
141
websocket_connector.sync_send_to(websocket_connector.id, f"Echo: {message}")
142
143
@websocket.on("close")
144
def on_close(websocket_connector):
145
print(f"Client {websocket_connector.id} disconnected")
146
websocket_connector.sync_broadcast(f"User {websocket_connector.id} left the chat")
147
148
# Regular HTTP routes
149
@app.get("/")
150
def index(request):
151
return """
152
<!DOCTYPE html>
153
<html>
154
<head><title>WebSocket Echo</title></head>
155
<body>
156
<div id="messages"></div>
157
<input type="text" id="messageInput" placeholder="Type a message...">
158
<button onclick="sendMessage()">Send</button>
159
160
<script>
161
const ws = new WebSocket('ws://localhost:8080/ws');
162
const messages = document.getElementById('messages');
163
164
ws.onmessage = function(event) {
165
const div = document.createElement('div');
166
div.textContent = event.data;
167
messages.appendChild(div);
168
};
169
170
function sendMessage() {
171
const input = document.getElementById('messageInput');
172
ws.send(input.value);
173
input.value = '';
174
}
175
</script>
176
</body>
177
</html>
178
"""
179
180
app.start(host="0.0.0.0", port=8080)
181
```
182
183
### Chat Room with Broadcasting
184
185
```python
186
from robyn import Robyn, WebSocket
187
import json
188
import time
189
190
app = Robyn(__file__)
191
192
# In-memory store for connected users
193
connected_users = {}
194
195
websocket = WebSocket(app, "/chat")
196
197
@websocket.on("connect")
198
def on_connect(websocket_connector):
199
user_id = websocket_connector.id
200
201
# Get username from query parameters
202
username = websocket_connector.query_params.get("username", f"User_{user_id[:8]}")
203
connected_users[user_id] = username
204
205
# Welcome message to the user
206
welcome_msg = json.dumps({
207
"type": "welcome",
208
"message": f"Welcome to the chat, {username}!",
209
"user_id": user_id
210
})
211
websocket_connector.sync_send_to(user_id, welcome_msg)
212
213
# Broadcast user joined message
214
join_msg = json.dumps({
215
"type": "user_joined",
216
"username": username,
217
"user_id": user_id,
218
"timestamp": time.time()
219
})
220
websocket_connector.sync_broadcast(join_msg)
221
222
# Send current user list
223
user_list_msg = json.dumps({
224
"type": "user_list",
225
"users": list(connected_users.values())
226
})
227
websocket_connector.sync_broadcast(user_list_msg)
228
229
@websocket.on("message")
230
def on_message(websocket_connector, message):
231
user_id = websocket_connector.id
232
username = connected_users.get(user_id, "Unknown")
233
234
try:
235
data = json.loads(message)
236
msg_type = data.get("type", "message")
237
238
if msg_type == "message":
239
# Broadcast chat message
240
chat_msg = json.dumps({
241
"type": "message",
242
"username": username,
243
"user_id": user_id,
244
"message": data.get("message", ""),
245
"timestamp": time.time()
246
})
247
websocket_connector.sync_broadcast(chat_msg)
248
249
elif msg_type == "private_message":
250
# Send private message to specific user
251
target_user_id = data.get("target_user_id")
252
if target_user_id:
253
private_msg = json.dumps({
254
"type": "private_message",
255
"from_username": username,
256
"from_user_id": user_id,
257
"message": data.get("message", ""),
258
"timestamp": time.time()
259
})
260
websocket_connector.sync_send_to(target_user_id, private_msg)
261
262
except json.JSONDecodeError:
263
error_msg = json.dumps({
264
"type": "error",
265
"message": "Invalid message format"
266
})
267
websocket_connector.sync_send_to(user_id, error_msg)
268
269
@websocket.on("close")
270
def on_close(websocket_connector):
271
user_id = websocket_connector.id
272
username = connected_users.pop(user_id, "Unknown")
273
274
# Broadcast user left message
275
leave_msg = json.dumps({
276
"type": "user_left",
277
"username": username,
278
"user_id": user_id,
279
"timestamp": time.time()
280
})
281
websocket_connector.sync_broadcast(leave_msg)
282
283
# Update user list
284
user_list_msg = json.dumps({
285
"type": "user_list",
286
"users": list(connected_users.values())
287
})
288
websocket_connector.sync_broadcast(user_list_msg)
289
290
app.start()
291
```
292
293
### Async WebSocket Handlers
294
295
```python
296
from robyn import Robyn, WebSocket
297
import asyncio
298
import json
299
300
app = Robyn(__file__)
301
302
websocket = WebSocket(app, "/async_ws")
303
304
@websocket.on("connect")
305
async def on_connect(websocket_connector):
306
print(f"Client {websocket_connector.id} connected")
307
308
# Send periodic updates using async broadcast
309
asyncio.create_task(send_periodic_updates(websocket_connector))
310
311
@websocket.on("message")
312
async def on_message(websocket_connector, message):
313
print(f"Received: {message}")
314
315
# Process message asynchronously
316
processed_message = await process_message(message)
317
318
# Send response back
319
response = json.dumps({
320
"type": "response",
321
"original": message,
322
"processed": processed_message
323
})
324
await websocket_connector.async_send_to(websocket_connector.id, response)
325
326
@websocket.on("close")
327
async def on_close(websocket_connector):
328
print(f"Client {websocket_connector.id} disconnected")
329
330
async def process_message(message):
331
# Simulate async processing
332
await asyncio.sleep(0.1)
333
return message.upper()
334
335
async def send_periodic_updates(websocket_connector):
336
"""Send periodic updates to the client"""
337
count = 0
338
while True:
339
await asyncio.sleep(5) # Wait 5 seconds
340
update = json.dumps({
341
"type": "periodic_update",
342
"count": count,
343
"timestamp": time.time()
344
})
345
try:
346
await websocket_connector.async_send_to(websocket_connector.id, update)
347
count += 1
348
except:
349
# Connection closed, stop sending updates
350
break
351
352
app.start()
353
```
354
355
### WebSocket with Dependency Injection
356
357
```python
358
from robyn import Robyn, WebSocket, DependencyMap
359
import json
360
361
class MessageLogger:
362
def __init__(self):
363
self.messages = []
364
365
def log_message(self, user_id, message):
366
self.messages.append({
367
"user_id": user_id,
368
"message": message,
369
"timestamp": time.time()
370
})
371
372
def get_recent_messages(self, limit=10):
373
return self.messages[-limit:]
374
375
class UserManager:
376
def __init__(self):
377
self.users = {}
378
379
def add_user(self, user_id, username):
380
self.users[user_id] = username
381
382
def remove_user(self, user_id):
383
return self.users.pop(user_id, None)
384
385
def get_all_users(self):
386
return list(self.users.values())
387
388
app = Robyn(__file__)
389
390
# Create dependencies
391
message_logger = MessageLogger()
392
user_manager = UserManager()
393
394
# Create WebSocket with dependencies
395
dependencies = DependencyMap()
396
websocket = WebSocket(app, "/chat", dependencies=dependencies)
397
398
# Inject dependencies
399
websocket.inject_global(
400
logger=message_logger,
401
user_manager=user_manager
402
)
403
404
@websocket.on("connect")
405
def on_connect(websocket_connector, logger, user_manager):
406
user_id = websocket_connector.id
407
username = websocket_connector.query_params.get("username", f"User_{user_id[:8]}")
408
409
user_manager.add_user(user_id, username)
410
logger.log_message("system", f"{username} joined the chat")
411
412
# Send recent messages to new user
413
recent_messages = logger.get_recent_messages()
414
history_msg = json.dumps({
415
"type": "message_history",
416
"messages": recent_messages
417
})
418
websocket_connector.sync_send_to(user_id, history_msg)
419
420
@websocket.on("message")
421
def on_message(websocket_connector, message, logger, user_manager):
422
user_id = websocket_connector.id
423
username = user_manager.users.get(user_id, "Unknown")
424
425
# Log the message
426
logger.log_message(user_id, message)
427
428
# Broadcast to all users
429
chat_msg = json.dumps({
430
"type": "message",
431
"username": username,
432
"message": message,
433
"timestamp": time.time()
434
})
435
websocket_connector.sync_broadcast(chat_msg)
436
437
@websocket.on("close")
438
def on_close(websocket_connector, logger, user_manager):
439
user_id = websocket_connector.id
440
username = user_manager.remove_user(user_id)
441
442
if username:
443
logger.log_message("system", f"{username} left the chat")
444
445
app.start()
446
```
447
448
### WebSocket with Query Parameters
449
450
```python
451
from robyn import Robyn, WebSocket
452
453
app = Robyn(__file__)
454
455
websocket = WebSocket(app, "/room/<room_id>")
456
457
# Store rooms and their connected users
458
rooms = {}
459
460
@websocket.on("connect")
461
def on_connect(websocket_connector):
462
# Get room ID from the URL (would need to be implemented in the actual framework)
463
room_id = websocket_connector.query_params.get("room_id", "general")
464
user_id = websocket_connector.id
465
466
# Initialize room if it doesn't exist
467
if room_id not in rooms:
468
rooms[room_id] = set()
469
470
# Add user to room
471
rooms[room_id].add(user_id)
472
473
print(f"User {user_id} joined room {room_id}")
474
475
# Notify room members
476
room_msg = f"User {user_id} joined room {room_id}"
477
websocket_connector.sync_broadcast(room_msg)
478
479
@websocket.on("message")
480
def on_message(websocket_connector, message):
481
room_id = websocket_connector.query_params.get("room_id", "general")
482
user_id = websocket_connector.id
483
484
# Broadcast message to room (in a real implementation, you'd need room-specific broadcasting)
485
room_message = f"[{room_id}] {user_id}: {message}"
486
websocket_connector.sync_broadcast(room_message)
487
488
@websocket.on("close")
489
def on_close(websocket_connector):
490
room_id = websocket_connector.query_params.get("room_id", "general")
491
user_id = websocket_connector.id
492
493
# Remove user from room
494
if room_id in rooms:
495
rooms[room_id].discard(user_id)
496
if not rooms[room_id]: # Remove empty rooms
497
del rooms[room_id]
498
499
print(f"User {user_id} left room {room_id}")
500
501
app.start()
502
```