Socket.IO server and client for Python providing real-time bidirectional communication
—
Client connection managers that handle message routing, room membership, and horizontal scaling through various pub/sub messaging systems. Managers enable Socket.IO applications to scale across multiple processes and servers while maintaining consistent real-time communication.
Basic client manager for single-process Socket.IO servers. Handles client connections, message routing, and room management within a single server instance.
class Manager:
"""
Basic client manager for single-process servers.
Inherits from: BaseManager
Attributes:
logger: Logger instance for manager events
"""
def __init__(self, logger=False):
"""
Initialize the manager.
Args:
logger (bool or Logger): Enable logging or provide custom logger
"""
def can_disconnect(self, sid, namespace):
"""
Check if a client can be disconnected.
Args:
sid (str): Client session ID
namespace (str): Target namespace
Returns:
bool: True if client can be disconnected
"""
def emit(self, event, data, namespace, room=None, skip_sid=None, callback=None, **kwargs):
"""
Emit an event to connected clients.
Args:
event (str): Event name
data: Event data to send
namespace (str): Target namespace
room (str or list): Target room name(s)
skip_sid (str): Skip this client session ID
callback (callable): Callback function for response
**kwargs: Additional parameters (to, etc.)
"""
def disconnect(self, sid, namespace=None):
"""
Disconnect a client.
Args:
sid (str): Client session ID
namespace (str): Target namespace
"""
def enter_room(self, sid, namespace, room, eio_sid=None):
"""
Add a client to a room.
Args:
sid (str): Client session ID
namespace (str): Target namespace
room (str): Room name
eio_sid (str): Engine.IO session ID
"""
def leave_room(self, sid, namespace, room):
"""
Remove a client from a room.
Args:
sid (str): Client session ID
namespace (str): Target namespace
room (str): Room name
"""
def close_room(self, room, namespace):
"""
Remove all clients from a room and delete the room.
Args:
room (str): Room name
namespace (str): Target namespace
"""
def get_participants(self, namespace, room):
"""
Get the list of clients in a room.
Args:
namespace (str): Target namespace
room (str): Room name
Returns:
list: Client session IDs in the room
"""
def trigger_callback(self, sid, id, data):
"""
Trigger a callback function for a client.
Args:
sid (str): Client session ID
id (int): Callback ID
data: Callback data
"""import socketio
# Create server with basic manager
manager = socketio.Manager(logger=True)
sio = socketio.Server(client_manager=manager)
@sio.event
def connect(sid, environ):
print(f'Client {sid} connected')
@sio.event
def join_room(sid, data):
room = data['room']
sio.enter_room(sid, room)
# Manager handles the room membership internally
app = socketio.WSGIApp(sio)Asynchronous version of the basic Manager for asyncio-based Socket.IO servers.
class AsyncManager:
"""
Asyncio client manager.
Inherits from: BaseManager
Attributes:
logger: Logger instance for manager events
"""
def __init__(self, logger=False):
"""
Initialize the async manager.
Args:
logger (bool or Logger): Enable logging or provide custom logger
"""
async def can_disconnect(self, sid, namespace):
"""
Check if a client can be disconnected.
Args:
sid (str): Client session ID
namespace (str): Target namespace
Returns:
bool: True if client can be disconnected
"""
async def emit(self, event, data, namespace, room=None, skip_sid=None, callback=None, **kwargs):
"""
Emit an event to connected clients.
Args:
event (str): Event name
data: Event data to send
namespace (str): Target namespace
room (str or list): Target room name(s)
skip_sid (str): Skip this client session ID
callback (coroutine): Async callback function for response
**kwargs: Additional parameters
"""
async def disconnect(self, sid, namespace=None):
"""
Disconnect a client.
Args:
sid (str): Client session ID
namespace (str): Target namespace
"""
async def enter_room(self, sid, namespace, room, eio_sid=None):
"""
Add a client to a room.
Args:
sid (str): Client session ID
namespace (str): Target namespace
room (str): Room name
eio_sid (str): Engine.IO session ID
"""
async def leave_room(self, sid, namespace, room):
"""
Remove a client from a room.
Args:
sid (str): Client session ID
namespace (str): Target namespace
room (str): Room name
"""
async def close_room(self, room, namespace):
"""
Remove all clients from a room and delete the room.
Args:
room (str): Room name
namespace (str): Target namespace
"""
async def get_participants(self, namespace, room):
"""
Get the list of clients in a room.
Args:
namespace (str): Target namespace
room (str): Room name
Returns:
list: Client session IDs in the room
"""
async def trigger_callback(self, sid, id, data):
"""
Trigger a callback function for a client.
Args:
sid (str): Client session ID
id (int): Callback ID
data: Callback data
"""Base class for pub/sub-based client managers that enable horizontal scaling across multiple server processes through external message brokers.
class PubSubManager(Manager):
"""
Base class for pub/sub-based client managers.
Inherits from: Manager
Attributes:
channel (str): Message channel name
write_only (bool): Write-only mode (no message listening)
logger: Logger instance
"""
def __init__(self, channel='socketio', write_only=False, logger=None):
"""
Initialize the pub/sub manager.
Args:
channel (str): Message channel name (default: 'socketio')
write_only (bool): Write-only mode - don't listen for messages (default: False)
logger (Logger): Logger instance
"""
def _publish(self, data):
"""
Publish a message to the message broker.
Args:
data: Message data to publish
Note:
This is an abstract method that must be implemented by subclasses.
"""
raise NotImplementedError
def _listen(self):
"""
Listen for messages from the message broker.
Note:
This is an abstract method that must be implemented by subclasses.
"""
raise NotImplementedErrorRedis-based client manager that uses Redis pub/sub for horizontal scaling across multiple server instances.
class RedisManager(PubSubManager):
"""
Redis-based client manager (synchronous).
Inherits from: PubSubManager
Attributes:
redis_url (str): Redis connection URL
channel (str): Redis channel name
redis: Redis client instance
"""
def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False,
logger=None, redis_options=None):
"""
Initialize the Redis manager.
Args:
url (str): Redis connection URL (default: 'redis://localhost:6379/0')
channel (str): Redis channel name (default: 'socketio')
write_only (bool): Write-only mode (default: False)
logger (Logger): Logger instance
redis_options (dict): Additional Redis client options
"""
def _publish(self, data):
"""
Publish a message to Redis.
Args:
data: Message data to publish
"""
def _listen(self):
"""
Listen for messages from Redis pub/sub.
"""import socketio
# Create Redis manager for scaling
redis_manager = socketio.RedisManager(
url='redis://localhost:6379/0',
channel='my-app-socketio',
logger=True
)
# Create server with Redis manager
sio = socketio.Server(client_manager=redis_manager)
@sio.event
def connect(sid, environ):
print(f'Client {sid} connected')
sio.enter_room(sid, 'global')
@sio.event
def broadcast_message(sid, data):
# This message will be broadcast across all server instances
sio.emit('message', data, room='global')
# Multiple server instances can use the same Redis manager
# and will share room memberships and message routing
app = socketio.WSGIApp(sio)Asynchronous Redis-based client manager for asyncio applications.
class AsyncRedisManager(AsyncPubSubManager):
"""
Async Redis-based client manager.
Inherits from: AsyncPubSubManager
Attributes:
redis_url (str): Redis connection URL
channel (str): Redis channel name
redis: Async Redis client instance
"""
def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False,
logger=None, redis_options=None):
"""
Initialize the async Redis manager.
Args:
url (str): Redis connection URL (default: 'redis://localhost:6379/0')
channel (str): Redis channel name (default: 'socketio')
write_only (bool): Write-only mode (default: False)
logger (Logger): Logger instance
redis_options (dict): Additional Redis client options
"""
async def _publish(self, data):
"""
Publish a message to Redis.
Args:
data: Message data to publish
"""
async def _listen(self):
"""
Listen for messages from Redis pub/sub.
"""import socketio
# Create async Redis manager
redis_manager = socketio.AsyncRedisManager(
url='redis://redis-cluster:6379/0',
channel='async-app-socketio',
logger=True,
redis_options={'socket_timeout': 5}
)
# Create async server with Redis manager
sio = socketio.AsyncServer(client_manager=redis_manager)
@sio.event
async def connect(sid, environ):
print(f'Client {sid} connected')
await sio.enter_room(sid, 'async-global')
@sio.event
async def broadcast_message(sid, data):
# Broadcast across all async server instances
await sio.emit('message', data, room='async-global')
app = socketio.ASGIApp(sio)Apache Kafka-based client manager for high-throughput, distributed message routing.
class KafkaManager(PubSubManager):
"""
Apache Kafka-based client manager.
Inherits from: PubSubManager
Attributes:
kafka_url (str): Kafka broker URL
topic (str): Kafka topic name
"""
def __init__(self, url='kafka://localhost:9092', channel='socketio', write_only=False, logger=None):
"""
Initialize the Kafka manager.
Args:
url (str): Kafka broker URL (default: 'kafka://localhost:9092')
channel (str): Kafka topic name (default: 'socketio')
write_only (bool): Write-only mode (default: False)
logger (Logger): Logger instance
"""
def _publish(self, data):
"""
Publish a message to Kafka topic.
Args:
data: Message data to publish
"""
def _listen(self):
"""
Listen for messages from Kafka topic.
"""import socketio
# Create Kafka manager for high-throughput scaling
kafka_manager = socketio.KafkaManager(
url='kafka://kafka-broker1:9092,kafka-broker2:9092',
channel='realtime-events',
logger=True
)
sio = socketio.Server(client_manager=kafka_manager)
@sio.event
def connect(sid, environ):
sio.enter_room(sid, 'high-volume')
@sio.event
def process_event(sid, data):
# High-throughput event processing across Kafka cluster
sio.emit('event_processed', data, room='high-volume')
app = socketio.WSGIApp(sio)Kombu-based client manager supporting multiple message brokers including RabbitMQ, Redis, and others through the Kombu library.
class KombuManager(PubSubManager):
"""
Kombu-based client manager for various message brokers.
Inherits from: PubSubManager
Attributes:
url (str): Message broker URL
channel (str): Message channel/queue name
"""
def __init__(self, url=None, channel='socketio', write_only=False, logger=None):
"""
Initialize the Kombu manager.
Args:
url (str): Message broker URL (format depends on broker type)
channel (str): Message channel/queue name (default: 'socketio')
write_only (bool): Write-only mode (default: False)
logger (Logger): Logger instance
Supported URLs:
- RabbitMQ: 'amqp://user:pass@host:port/vhost'
- Redis: 'redis://localhost:6379/0'
- SQS: 'sqs://access_key:secret_key@region'
"""
def _publish(self, data):
"""
Publish a message via Kombu.
Args:
data: Message data to publish
"""
def _listen(self):
"""
Listen for messages via Kombu.
"""import socketio
# RabbitMQ with Kombu
rabbitmq_manager = socketio.KombuManager(
url='amqp://user:password@rabbitmq-server:5672/myvhost',
channel='socketio-events',
logger=True
)
sio = socketio.Server(client_manager=rabbitmq_manager)
@sio.event
def connect(sid, environ):
sio.enter_room(sid, 'rabbitmq-room')
app = socketio.WSGIApp(sio)ZeroMQ-based client manager for experimental message routing (requires eventlet and external ZMQ message broker).
class ZmqManager(PubSubManager):
"""
ZeroMQ-based client manager (experimental).
Inherits from: PubSubManager
Attributes:
zmq_url (str): ZeroMQ connection URL
channel (str): Channel name
"""
def __init__(self, url='zmq+tcp://localhost:5555', channel='socketio', write_only=False, logger=None):
"""
Initialize the ZeroMQ manager.
Args:
url (str): ZeroMQ connection URL (default: 'zmq+tcp://localhost:5555')
channel (str): Channel name (default: 'socketio')
write_only (bool): Write-only mode (default: False)
logger (Logger): Logger instance
Requirements:
- eventlet must be used as the async mode
- External ZMQ message broker is required
"""
def _publish(self, data):
"""
Publish a message via ZeroMQ.
Args:
data: Message data to publish
"""
def _listen(self):
"""
Listen for messages via ZeroMQ.
"""Asynchronous RabbitMQ client manager using the aio_pika library for asyncio applications.
class AsyncAioPikaManager(AsyncPubSubManager):
"""
Async RabbitMQ client manager using aio_pika.
Inherits from: AsyncPubSubManager
Attributes:
rabbitmq_url (str): RabbitMQ connection URL
channel (str): Exchange/queue name
"""
def __init__(self, url='amqp://guest:guest@localhost:5672//', channel='socketio',
write_only=False, logger=None):
"""
Initialize the async RabbitMQ manager.
Args:
url (str): RabbitMQ connection URL (default: 'amqp://guest:guest@localhost:5672//')
channel (str): Exchange/queue name (default: 'socketio')
write_only (bool): Write-only mode (default: False)
logger (Logger): Logger instance
"""
async def _publish(self, data):
"""
Publish a message to RabbitMQ.
Args:
data: Message data to publish
"""
async def _listen(self):
"""
Listen for messages from RabbitMQ.
"""import socketio
# Create async RabbitMQ manager
rabbitmq_manager = socketio.AsyncAioPikaManager(
url='amqp://user:password@rabbitmq-cluster:5672/production',
channel='async-socketio-events',
logger=True
)
sio = socketio.AsyncServer(client_manager=rabbitmq_manager)
@sio.event
async def connect(sid, environ):
await sio.enter_room(sid, 'async-rabbitmq-room')
@sio.event
async def process_async_event(sid, data):
# Process with async RabbitMQ scaling
await sio.emit('async_event_processed', data, room='async-rabbitmq-room')
app = socketio.ASGIApp(sio)Choose the appropriate manager based on your scaling and infrastructure requirements:
All pub/sub managers support write-only mode for scenarios where you only need to send messages but not receive them from other server instances:
# Write-only Redis manager (doesn't listen for messages)
write_only_manager = socketio.RedisManager(
url='redis://localhost:6379/0',
write_only=True
)Install with Tessl CLI
npx tessl i tessl/pypi-python-socketio