# Manager Classes

Client connection managers handle message routing, room membership, and horizontal scaling through pub/sub messaging systems. They let Socket.IO applications scale across multiple processes and servers while keeping real-time communication consistent.

## Capabilities

### Manager

Basic client manager for single-process Socket.IO servers. It handles client connections, message routing, and room management within a single server instance.

```python { .api }
class Manager:
    """
    Basic client manager for single-process servers.

    Inherits from: BaseManager

    Attributes:
        logger: Logger instance for manager events
    """

    def __init__(self, logger=False):
        """
        Initialize the manager.

        Args:
            logger (bool or Logger): Enable logging or provide custom logger
        """

    def can_disconnect(self, sid, namespace):
        """
        Check if a client can be disconnected.

        Args:
            sid (str): Client session ID
            namespace (str): Target namespace

        Returns:
            bool: True if client can be disconnected
        """

    def emit(self, event, data, namespace, room=None, skip_sid=None, callback=None, **kwargs):
        """
        Emit an event to connected clients.

        Args:
            event (str): Event name
            data: Event data to send
            namespace (str): Target namespace
            room (str or list): Target room name(s)
            skip_sid (str): Skip this client session ID
            callback (callable): Callback function for response
            **kwargs: Additional parameters (to, etc.)
        """

    def disconnect(self, sid, namespace=None):
        """
        Disconnect a client.

        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
        """

    def enter_room(self, sid, namespace, room, eio_sid=None):
        """
        Add a client to a room.

        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
            room (str): Room name
            eio_sid (str): Engine.IO session ID
        """

    def leave_room(self, sid, namespace, room):
        """
        Remove a client from a room.

        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
            room (str): Room name
        """

    def close_room(self, room, namespace):
        """
        Remove all clients from a room and delete the room.

        Args:
            room (str): Room name
            namespace (str): Target namespace
        """

    def get_participants(self, namespace, room):
        """
        Get the list of clients in a room.

        Args:
            namespace (str): Target namespace
            room (str): Room name

        Returns:
            list: Client session IDs in the room
        """

    def trigger_callback(self, sid, id, data):
        """
        Trigger a callback function for a client.

        Args:
            sid (str): Client session ID
            id (int): Callback ID
            data: Callback data
        """
```

#### Usage Example

```python
import socketio

# Create server with basic manager; logging is enabled on the server
manager = socketio.Manager()
sio = socketio.Server(client_manager=manager, logger=True)

@sio.event
def connect(sid, environ):
    print(f'Client {sid} connected')

@sio.event
def join_room(sid, data):
    room = data['room']
    sio.enter_room(sid, room)
    # Manager handles the room membership internally

app = socketio.WSGIApp(sio)
```
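The room methods listed above (`enter_room`, `leave_room`, `close_room`, `get_participants`) can be pictured as operations on a per-namespace mapping from room names to session IDs. The sketch below is a toy, stdlib-only model of that bookkeeping. `ToyRoomRegistry` and its internals are hypothetical and are not python-socketio's implementation; the sketch only mirrors the argument order shown in the signatures.

```python
from collections import defaultdict


class ToyRoomRegistry:
    """Hypothetical in-memory model of room membership (not the library's code)."""

    def __init__(self):
        # rooms[namespace][room] -> set of client session IDs
        self.rooms = defaultdict(lambda: defaultdict(set))

    def enter_room(self, sid, namespace, room):
        self.rooms[namespace][room].add(sid)

    def leave_room(self, sid, namespace, room):
        members = self.rooms[namespace].get(room)
        if members is None:
            return
        members.discard(sid)
        if not members:
            # Drop rooms that have become empty
            del self.rooms[namespace][room]

    def close_room(self, room, namespace):
        # Note the (room, namespace) order, as in the signature above
        self.rooms[namespace].pop(room, None)

    def get_participants(self, namespace, room):
        return sorted(self.rooms[namespace].get(room, ()))


registry = ToyRoomRegistry()
registry.enter_room('sid-1', '/', 'lobby')
registry.enter_room('sid-2', '/', 'lobby')
registry.leave_room('sid-1', '/', 'lobby')
participants = registry.get_participants('/', 'lobby')

registry.close_room('lobby', '/')
after_close = registry.get_participants('/', 'lobby')
```

Because all of this state lives in one process, a second server process would see none of it, which is the limitation the pub/sub managers address.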

### AsyncManager

Asynchronous version of the basic Manager for asyncio-based Socket.IO servers.

```python { .api }
class AsyncManager:
    """
    Asyncio client manager.

    Inherits from: BaseManager

    Attributes:
        logger: Logger instance for manager events
    """

    def __init__(self, logger=False):
        """
        Initialize the async manager.

        Args:
            logger (bool or Logger): Enable logging or provide custom logger
        """

    async def can_disconnect(self, sid, namespace):
        """
        Check if a client can be disconnected.

        Args:
            sid (str): Client session ID
            namespace (str): Target namespace

        Returns:
            bool: True if client can be disconnected
        """

    async def emit(self, event, data, namespace, room=None, skip_sid=None, callback=None, **kwargs):
        """
        Emit an event to connected clients.

        Args:
            event (str): Event name
            data: Event data to send
            namespace (str): Target namespace
            room (str or list): Target room name(s)
            skip_sid (str): Skip this client session ID
            callback (coroutine): Async callback function for response
            **kwargs: Additional parameters
        """

    async def disconnect(self, sid, namespace=None):
        """
        Disconnect a client.

        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
        """

    async def enter_room(self, sid, namespace, room, eio_sid=None):
        """
        Add a client to a room.

        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
            room (str): Room name
            eio_sid (str): Engine.IO session ID
        """

    async def leave_room(self, sid, namespace, room):
        """
        Remove a client from a room.

        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
            room (str): Room name
        """

    async def close_room(self, room, namespace):
        """
        Remove all clients from a room and delete the room.

        Args:
            room (str): Room name
            namespace (str): Target namespace
        """

    async def get_participants(self, namespace, room):
        """
        Get the list of clients in a room.

        Args:
            namespace (str): Target namespace
            room (str): Room name

        Returns:
            list: Client session IDs in the room
        """

    async def trigger_callback(self, sid, id, data):
        """
        Trigger a callback function for a client.

        Args:
            sid (str): Client session ID
            id (int): Callback ID
            data: Callback data
        """
```

### PubSubManager

Base class for pub/sub-based client managers that enable horizontal scaling across multiple server processes through external message brokers.

```python { .api }
class PubSubManager(Manager):
    """
    Base class for pub/sub-based client managers.

    Inherits from: Manager

    Attributes:
        channel (str): Message channel name
        write_only (bool): Write-only mode (no message listening)
        logger: Logger instance
    """

    def __init__(self, channel='socketio', write_only=False, logger=None):
        """
        Initialize the pub/sub manager.

        Args:
            channel (str): Message channel name (default: 'socketio')
            write_only (bool): Write-only mode - don't listen for messages (default: False)
            logger (Logger): Logger instance
        """

    def _publish(self, data):
        """
        Publish a message to the message broker.

        Args:
            data: Message data to publish

        Note:
            This is an abstract method that must be implemented by subclasses.
        """
        raise NotImplementedError

    def _listen(self):
        """
        Listen for messages from the message broker.

        Note:
            This is an abstract method that must be implemented by subclasses.
        """
        raise NotImplementedError
```
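A concrete manager supplies the two hooks, `_publish` and `_listen`. The following is a minimal sketch of that contract, using an in-memory `queue.Queue` in place of a real broker. `ToyPubSubBase` and `QueueManager` are hypothetical stand-ins rather than library classes, and writing `_listen` as a generator that yields decoded messages is an assumption of this sketch.

```python
import json
import queue


class ToyPubSubBase:
    """Hypothetical stand-in for the base class: only the two hooks matter here."""

    def __init__(self, channel='socketio'):
        self.channel = channel

    def _publish(self, data):
        raise NotImplementedError

    def _listen(self):
        raise NotImplementedError


class QueueManager(ToyPubSubBase):
    """Toy subclass that uses a shared queue.Queue as the 'broker'."""

    def __init__(self, broker_queue, channel='socketio'):
        super().__init__(channel)
        self.broker_queue = broker_queue

    def _publish(self, data):
        # Serialize so the payload could cross a process boundary
        self.broker_queue.put(json.dumps(data))

    def _listen(self):
        # A real listener would block waiting for messages;
        # this toy version drains what is queued and then stops.
        while True:
            try:
                raw = self.broker_queue.get_nowait()
            except queue.Empty:
                return
            yield json.loads(raw)


shared = queue.Queue()
sender = QueueManager(shared)
receiver = QueueManager(shared)

sender._publish({'method': 'emit', 'event': 'message', 'data': 'hi'})
messages = list(receiver._listen())
```

The broker-specific managers in the following sections fill in the same two methods for Redis, Kafka, Kombu, ZeroMQ, and RabbitMQ.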

### RedisManager

Redis-based client manager that uses Redis pub/sub for horizontal scaling across multiple server instances.

```python { .api }
class RedisManager(PubSubManager):
    """
    Redis-based client manager (synchronous).

    Inherits from: PubSubManager

    Attributes:
        redis_url (str): Redis connection URL
        channel (str): Redis channel name
        redis: Redis client instance
    """

    def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False,
                 logger=None, redis_options=None):
        """
        Initialize the Redis manager.

        Args:
            url (str): Redis connection URL (default: 'redis://localhost:6379/0')
            channel (str): Redis channel name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance
            redis_options (dict): Additional Redis client options
        """

    def _publish(self, data):
        """
        Publish a message to Redis.

        Args:
            data: Message data to publish
        """

    def _listen(self):
        """
        Listen for messages from Redis pub/sub.
        """
```

#### Usage Example

```python
import logging

import socketio

# Create Redis manager for scaling
redis_manager = socketio.RedisManager(
    url='redis://localhost:6379/0',
    channel='my-app-socketio',
    logger=logging.getLogger('socketio.redis')  # a Logger instance, as documented for `logger`
)

# Create server with Redis manager
sio = socketio.Server(client_manager=redis_manager)

@sio.event
def connect(sid, environ):
    print(f'Client {sid} connected')
    sio.enter_room(sid, 'global')

@sio.event
def broadcast_message(sid, data):
    # This message will be broadcast across all server instances
    sio.emit('message', data, room='global')

# Each server instance creates its own RedisManager with the same URL and
# channel; emits are relayed through Redis so that every instance delivers
# them to its own connected clients
app = socketio.WSGIApp(sio)
```

### AsyncRedisManager

Asynchronous Redis-based client manager for asyncio applications.

```python { .api }
class AsyncRedisManager(AsyncPubSubManager):
    """
    Async Redis-based client manager.

    Inherits from: AsyncPubSubManager

    Attributes:
        redis_url (str): Redis connection URL
        channel (str): Redis channel name
        redis: Async Redis client instance
    """

    def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False,
                 logger=None, redis_options=None):
        """
        Initialize the async Redis manager.

        Args:
            url (str): Redis connection URL (default: 'redis://localhost:6379/0')
            channel (str): Redis channel name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance
            redis_options (dict): Additional Redis client options
        """

    async def _publish(self, data):
        """
        Publish a message to Redis.

        Args:
            data: Message data to publish
        """

    async def _listen(self):
        """
        Listen for messages from Redis pub/sub.
        """
```

#### Usage Example

```python
import logging

import socketio

# Create async Redis manager
redis_manager = socketio.AsyncRedisManager(
    url='redis://redis-cluster:6379/0',
    channel='async-app-socketio',
    logger=logging.getLogger('socketio.redis'),  # a Logger instance, as documented for `logger`
    redis_options={'socket_timeout': 5}
)

# Create async server with Redis manager
sio = socketio.AsyncServer(client_manager=redis_manager)

@sio.event
async def connect(sid, environ):
    print(f'Client {sid} connected')
    await sio.enter_room(sid, 'async-global')

@sio.event
async def broadcast_message(sid, data):
    # Broadcast across all async server instances
    await sio.emit('message', data, room='async-global')

app = socketio.ASGIApp(sio)
```

### KafkaManager

Apache Kafka-based client manager for high-throughput, distributed message routing.

```python { .api }
class KafkaManager(PubSubManager):
    """
    Apache Kafka-based client manager.

    Inherits from: PubSubManager

    Attributes:
        kafka_url (str): Kafka broker URL
        topic (str): Kafka topic name
    """

    def __init__(self, url='kafka://localhost:9092', channel='socketio', write_only=False, logger=None):
        """
        Initialize the Kafka manager.

        Args:
            url (str): Kafka broker URL (default: 'kafka://localhost:9092')
            channel (str): Kafka topic name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance
        """

    def _publish(self, data):
        """
        Publish a message to Kafka topic.

        Args:
            data: Message data to publish
        """

    def _listen(self):
        """
        Listen for messages from Kafka topic.
        """
```

#### Usage Example

```python
import socketio

# Create Kafka manager for high-throughput scaling
kafka_manager = socketio.KafkaManager(
    url='kafka://kafka-broker1:9092,kafka-broker2:9092',
    channel='realtime-events'
)

sio = socketio.Server(client_manager=kafka_manager)

@sio.event
def connect(sid, environ):
    sio.enter_room(sid, 'high-volume')

@sio.event
def process_event(sid, data):
    # Relay the processed event to every server instance through the Kafka topic
    sio.emit('event_processed', data, room='high-volume')

app = socketio.WSGIApp(sio)
```

### KombuManager

Kombu-based client manager supporting multiple message brokers including RabbitMQ, Redis, and others through the Kombu library.

```python { .api }
class KombuManager(PubSubManager):
    """
    Kombu-based client manager for various message brokers.

    Inherits from: PubSubManager

    Attributes:
        url (str): Message broker URL
        channel (str): Message channel/queue name
    """

    def __init__(self, url=None, channel='socketio', write_only=False, logger=None):
        """
        Initialize the Kombu manager.

        Args:
            url (str): Message broker URL (format depends on broker type)
            channel (str): Message channel/queue name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance

        Supported URLs:
            - RabbitMQ: 'amqp://user:pass@host:port/vhost'
            - Redis: 'redis://localhost:6379/0'
            - SQS: 'sqs://access_key:secret_key@region'
        """

    def _publish(self, data):
        """
        Publish a message via Kombu.

        Args:
            data: Message data to publish
        """

    def _listen(self):
        """
        Listen for messages via Kombu.
        """
```

#### Usage Example

```python
import logging

import socketio

# RabbitMQ with Kombu
rabbitmq_manager = socketio.KombuManager(
    url='amqp://user:password@rabbitmq-server:5672/myvhost',
    channel='socketio-events',
    logger=logging.getLogger('socketio.kombu')  # a Logger instance, as documented for `logger`
)

sio = socketio.Server(client_manager=rabbitmq_manager)

@sio.event
def connect(sid, environ):
    sio.enter_room(sid, 'rabbitmq-room')

app = socketio.WSGIApp(sio)
```
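Broker URLs such as the one above pack the transport scheme, credentials, host, port, and virtual host into a single string. As a quick sanity check before handing a URL to a manager, the standard library can split it apart. `describe_broker_url` is a hypothetical helper written for illustration; it does not check whether Kombu actually supports the scheme.

```python
from urllib.parse import urlsplit


def describe_broker_url(url):
    """Split a broker URL into its parts (illustrative helper, not a library API)."""
    parts = urlsplit(url)
    return {
        'scheme': parts.scheme,
        'username': parts.username,
        'password': parts.password,
        'host': parts.hostname,
        'port': parts.port,
        # For AMQP URLs the path component names the virtual host
        'vhost': parts.path.lstrip('/'),
    }


info = describe_broker_url('amqp://user:password@rabbitmq-server:5672/myvhost')
```

Printing `info` while leaving out the password is an easy way to confirm which broker a deployment is pointed at.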

### ZmqManager

ZeroMQ-based client manager for experimental message routing (requires eventlet and an external ZMQ message broker).

```python { .api }
class ZmqManager(PubSubManager):
    """
    ZeroMQ-based client manager (experimental).

    Inherits from: PubSubManager

    Attributes:
        zmq_url (str): ZeroMQ connection URL
        channel (str): Channel name
    """

    def __init__(self, url='zmq+tcp://localhost:5555', channel='socketio', write_only=False, logger=None):
        """
        Initialize the ZeroMQ manager.

        Args:
            url (str): ZeroMQ connection URL (default: 'zmq+tcp://localhost:5555')
            channel (str): Channel name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance

        Requirements:
            - eventlet must be used as the async mode
            - External ZMQ message broker is required
        """

    def _publish(self, data):
        """
        Publish a message via ZeroMQ.

        Args:
            data: Message data to publish
        """

    def _listen(self):
        """
        Listen for messages via ZeroMQ.
        """
```

### AsyncAioPikaManager

Asynchronous RabbitMQ client manager using the aio_pika library for asyncio applications.

```python { .api }
class AsyncAioPikaManager(AsyncPubSubManager):
    """
    Async RabbitMQ client manager using aio_pika.

    Inherits from: AsyncPubSubManager

    Attributes:
        rabbitmq_url (str): RabbitMQ connection URL
        channel (str): Exchange/queue name
    """

    def __init__(self, url='amqp://guest:guest@localhost:5672//', channel='socketio',
                 write_only=False, logger=None):
        """
        Initialize the async RabbitMQ manager.

        Args:
            url (str): RabbitMQ connection URL (default: 'amqp://guest:guest@localhost:5672//')
            channel (str): Exchange/queue name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance
        """

    async def _publish(self, data):
        """
        Publish a message to RabbitMQ.

        Args:
            data: Message data to publish
        """

    async def _listen(self):
        """
        Listen for messages from RabbitMQ.
        """
```

#### Usage Example

```python
import logging

import socketio

# Create async RabbitMQ manager
rabbitmq_manager = socketio.AsyncAioPikaManager(
    url='amqp://user:password@rabbitmq-cluster:5672/production',
    channel='async-socketio-events',
    logger=logging.getLogger('socketio.aiopika')  # a Logger instance, as documented for `logger`
)

sio = socketio.AsyncServer(client_manager=rabbitmq_manager)

@sio.event
async def connect(sid, environ):
    await sio.enter_room(sid, 'async-rabbitmq-room')

@sio.event
async def process_async_event(sid, data):
    # Relay the processed event to every server instance through RabbitMQ
    await sio.emit('async_event_processed', data, room='async-rabbitmq-room')

app = socketio.ASGIApp(sio)
```

## Manager Selection Guidelines

Choose the appropriate manager based on your scaling and infrastructure requirements:

- **Manager/AsyncManager**: Single-process applications, development, simple deployments
- **RedisManager/AsyncRedisManager**: Multi-process scaling, moderate traffic, simple setup
- **KafkaManager**: High-throughput applications, event streaming, complex distributed systems
- **KombuManager**: Flexible broker support, existing RabbitMQ/AMQP infrastructure
- **AsyncAioPikaManager**: RabbitMQ through aio_pika in asyncio applications
- **ZmqManager**: Experimental, specialized use cases requiring ZeroMQ
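
The guidelines above can be condensed into a small lookup, sketched below. `choose_manager`, its `broker` and `use_asyncio` parameters, and the broker keys are hypothetical names chosen for this illustration; the function returns class names as strings and only covers the managers listed in this document.

```python
# Manager class names taken from this document, keyed by broker type.
SYNC_MANAGERS = {
    None: 'Manager',
    'redis': 'RedisManager',
    'kafka': 'KafkaManager',
    'rabbitmq': 'KombuManager',
    'zeromq': 'ZmqManager',
}
ASYNC_MANAGERS = {
    None: 'AsyncManager',
    'redis': 'AsyncRedisManager',
    'rabbitmq': 'AsyncAioPikaManager',
}


def choose_manager(broker=None, use_asyncio=False):
    """Return the name of a manager class for the given broker and I/O model."""
    table = ASYNC_MANAGERS if use_asyncio else SYNC_MANAGERS
    if broker not in table:
        mode = 'asyncio' if use_asyncio else 'synchronous'
        raise ValueError(f'no {mode} manager listed for broker {broker!r}')
    return table[broker]


single_process = choose_manager()
async_redis = choose_manager('redis', use_asyncio=True)
```

Combinations with no listed manager, such as Kafka with asyncio, raise `ValueError` rather than guessing.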
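
## Write-Only Mode

All pub/sub managers support a write-only mode, in which the manager publishes messages to the broker but does not listen for messages from other server instances. This suits processes that only need to send, such as a background worker that emits events for the servers to deliver:
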
```python
import socketio

# Write-only Redis manager (doesn't listen for messages)
write_only_manager = socketio.RedisManager(
    url='redis://localhost:6379/0',
    write_only=True
)
```
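To make the difference concrete, here is a toy model of write-only behaviour that runs without a broker. `ToyChannel` and `ToyManager` are hypothetical classes invented for this sketch; the only point illustrated is that a write-only instance publishes to the shared channel without subscribing to it.

```python
class ToyChannel:
    """In-memory stand-in for a broker channel (hypothetical)."""

    def __init__(self):
        self.listeners = []

    def subscribe(self, callback):
        self.listeners.append(callback)

    def publish(self, message):
        for callback in self.listeners:
            callback(message)


class ToyManager:
    """Toy manager: always publishes, subscribes only when not write-only."""

    def __init__(self, channel, write_only=False):
        self.channel = channel
        self.inbox = []
        if not write_only:
            channel.subscribe(self.inbox.append)

    def emit(self, event, data):
        self.channel.publish({'event': event, 'data': data})


channel = ToyChannel()
server = ToyManager(channel)                   # listens and publishes
worker = ToyManager(channel, write_only=True)  # publishes only

worker.emit('job_done', {'id': 7})
server.emit('ping', None)
```

After these two calls the server's inbox holds both messages, while the worker's inbox stays empty.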