tessl/pypi-python-socketio

Socket.IO server and client for Python providing real-time bidirectional communication

Manager Classes

Client managers track connected clients, route messages, and maintain room membership. The pub/sub variants relay emits through an external message broker, which lets a Socket.IO application scale across multiple processes and servers while every client still receives the events addressed to it.

Capabilities

Manager

Basic client manager for single-process Socket.IO servers. Handles client connections, message routing, and room management within a single server instance.

class Manager:
    """
    Basic client manager for single-process servers.
    
    Inherits from: BaseManager
    
    Attributes:
        logger: Logger instance for manager events
    """
    
    def __init__(self, logger=False):
        """
        Initialize the manager.
        
        Args:
            logger (bool or Logger): Enable logging or provide custom logger
        """
    
    def can_disconnect(self, sid, namespace):
        """
        Check if a client can be disconnected.
        
        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
            
        Returns:
            bool: True if client can be disconnected
        """
    
    def emit(self, event, data, namespace, room=None, skip_sid=None, callback=None, **kwargs):
        """
        Emit an event to connected clients.
        
        Args:
            event (str): Event name
            data: Event data to send
            namespace (str): Target namespace
            room (str or list): Target room name(s)
            skip_sid (str): Skip this client session ID
            callback (callable): Callback function for response
            **kwargs: Additional parameters (to, etc.)
        """
    
    def disconnect(self, sid, namespace=None):
        """
        Disconnect a client.
        
        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
        """
    
    def enter_room(self, sid, namespace, room, eio_sid=None):
        """
        Add a client to a room.
        
        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
            room (str): Room name
            eio_sid (str): Engine.IO session ID
        """
    
    def leave_room(self, sid, namespace, room):
        """
        Remove a client from a room.
        
        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
            room (str): Room name
        """
    
    def close_room(self, room, namespace):
        """
        Remove all clients from a room and delete the room.
        
        Args:
            room (str): Room name
            namespace (str): Target namespace
        """
    
    def get_participants(self, namespace, room):
        """
        Get the list of clients in a room.
        
        Args:
            namespace (str): Target namespace
            room (str): Room name
            
        Returns:
            list: Client session IDs in the room
        """
    
    def trigger_callback(self, sid, id, data):
        """
        Trigger a callback function for a client.
        
        Args:
            sid (str): Client session ID
            id (int): Callback ID
            data: Callback data
        """

Usage Example

import socketio

# Create server with basic manager
manager = socketio.Manager(logger=True)
sio = socketio.Server(client_manager=manager)

@sio.event
def connect(sid, environ):
    print(f'Client {sid} connected')

@sio.event
def join_room(sid, data):
    room = data['room']
    sio.enter_room(sid, room)
    # Manager handles the room membership internally

app = socketio.WSGIApp(sio)
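
The room methods listed above amount to bookkeeping over a namespace/room/session mapping. The following is a minimal sketch of that idea, not the library's implementation: `ToyRoomRegistry` and its internal layout are hypothetical, and only the method names and argument order are taken from the signatures above.

```python
from collections import defaultdict


class ToyRoomRegistry:
    """Hypothetical, simplified model of per-namespace room membership."""

    def __init__(self):
        # namespace -> room -> set of session IDs
        self._rooms = defaultdict(lambda: defaultdict(set))

    def enter_room(self, sid, namespace, room):
        self._rooms[namespace][room].add(sid)

    def leave_room(self, sid, namespace, room):
        members = self._rooms[namespace].get(room)
        if members is None:
            return
        members.discard(sid)
        if not members:
            # Drop empty rooms so they do not accumulate.
            del self._rooms[namespace][room]

    def close_room(self, room, namespace):
        # Removes every member at once by deleting the room itself.
        self._rooms[namespace].pop(room, None)

    def get_participants(self, namespace, room):
        return sorted(self._rooms[namespace].get(room, ()))


registry = ToyRoomRegistry()
registry.enter_room("sid-1", "/", "lobby")
registry.enter_room("sid-2", "/", "lobby")
registry.leave_room("sid-1", "/", "lobby")
print(registry.get_participants("/", "lobby"))
```

Keeping membership keyed by namespace first mirrors the fact that every manager method takes a namespace argument, so the same room name can exist independently in two namespaces.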

AsyncManager

Asynchronous version of the basic Manager for asyncio-based Socket.IO servers.

class AsyncManager:
    """
    Asyncio client manager.
    
    Inherits from: BaseManager
    
    Attributes:
        logger: Logger instance for manager events
    """
    
    def __init__(self, logger=False):
        """
        Initialize the async manager.
        
        Args:
            logger (bool or Logger): Enable logging or provide custom logger
        """
    
    async def can_disconnect(self, sid, namespace):
        """
        Check if a client can be disconnected.
        
        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
            
        Returns:
            bool: True if client can be disconnected
        """
    
    async def emit(self, event, data, namespace, room=None, skip_sid=None, callback=None, **kwargs):
        """
        Emit an event to connected clients.
        
        Args:
            event (str): Event name
            data: Event data to send
            namespace (str): Target namespace
            room (str or list): Target room name(s)
            skip_sid (str): Skip this client session ID
            callback (coroutine): Async callback function for response
            **kwargs: Additional parameters
        """
    
    async def disconnect(self, sid, namespace=None):
        """
        Disconnect a client.
        
        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
        """
    
    async def enter_room(self, sid, namespace, room, eio_sid=None):
        """
        Add a client to a room.
        
        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
            room (str): Room name
            eio_sid (str): Engine.IO session ID
        """
    
    async def leave_room(self, sid, namespace, room):
        """
        Remove a client from a room.
        
        Args:
            sid (str): Client session ID
            namespace (str): Target namespace
            room (str): Room name
        """
    
    async def close_room(self, room, namespace):
        """
        Remove all clients from a room and delete the room.
        
        Args:
            room (str): Room name
            namespace (str): Target namespace
        """
    
    def get_participants(self, namespace, room):
        """
        Get the list of clients in a room.
        
        Args:
            namespace (str): Target namespace
            room (str): Room name
            
        Returns:
            list: Client session IDs in the room
        """
    
    async def trigger_callback(self, sid, id, data):
        """
        Trigger a callback function for a client.
        
        Args:
            sid (str): Client session ID
            id (int): Callback ID
            data: Callback data
        """

PubSubManager

Base class for pub/sub-based client managers that enable horizontal scaling across multiple server processes through external message brokers.

class PubSubManager(Manager):
    """
    Base class for pub/sub-based client managers.
    
    Inherits from: Manager
    
    Attributes:
        channel (str): Message channel name
        write_only (bool): Write-only mode (no message listening)
        logger: Logger instance
    """
    
    def __init__(self, channel='socketio', write_only=False, logger=None):
        """
        Initialize the pub/sub manager.
        
        Args:
            channel (str): Message channel name (default: 'socketio')
            write_only (bool): Write-only mode - don't listen for messages (default: False)
            logger (Logger): Logger instance
        """
    
    def _publish(self, data):
        """
        Publish a message to the message broker.
        
        Args:
            data: Message data to publish
            
        Note:
            This is an abstract method that must be implemented by subclasses.
        """
        raise NotImplementedError
    
    def _listen(self):
        """
        Listen for messages from the message broker.
        
        Note:
            This is an abstract method that must be implemented by subclasses.
        """
        raise NotImplementedError
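
The `_publish`/`_listen` contract can be illustrated without any broker. The sketch below is not the library's class: `ToyPubSubManager`, `InMemoryManager`, and the `queue.Queue` standing in for a broker are all hypothetical, and it assumes `_listen` is consumed as an iterator of messages.

```python
import queue


class ToyPubSubManager:
    """Hypothetical stand-in for the base class: subclasses supply the transport."""

    def __init__(self, channel="socketio", write_only=False):
        self.channel = channel
        self.write_only = write_only

    def _publish(self, data):
        raise NotImplementedError

    def _listen(self):
        raise NotImplementedError


class InMemoryManager(ToyPubSubManager):
    """Uses a shared queue.Queue in place of an external message broker."""

    def __init__(self, broker, **kwargs):
        super().__init__(**kwargs)
        self.broker = broker

    def _publish(self, data):
        # Tag each message with the channel so unrelated traffic can be skipped.
        self.broker.put((self.channel, data))

    def _listen(self):
        # Yield pending messages for this channel, then stop when the queue is empty.
        # A real transport would block and keep yielding instead of returning.
        while True:
            try:
                channel, data = self.broker.get_nowait()
            except queue.Empty:
                return
            if channel == self.channel:
                yield data


broker = queue.Queue()
manager = InMemoryManager(broker, channel="demo")
manager._publish({"event": "message", "data": "hello"})
print(list(manager._listen()))
```

A concrete manager only has to answer two questions: how a message gets onto the broker, and how messages come back off it. Everything else is inherited.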

RedisManager

Redis-based client manager that uses Redis pub/sub for horizontal scaling across multiple server instances.

class RedisManager(PubSubManager):
    """
    Redis-based client manager (synchronous).
    
    Inherits from: PubSubManager
    
    Attributes:
        redis_url (str): Redis connection URL
        channel (str): Redis channel name
        redis: Redis client instance
    """
    
    def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False, 
                 logger=None, redis_options=None):
        """
        Initialize the Redis manager.
        
        Args:
            url (str): Redis connection URL (default: 'redis://localhost:6379/0')
            channel (str): Redis channel name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance
            redis_options (dict): Additional Redis client options
        """
    
    def _publish(self, data):
        """
        Publish a message to Redis.
        
        Args:
            data: Message data to publish
        """
    
    def _listen(self):
        """
        Listen for messages from Redis pub/sub.
        """

Usage Example

import socketio

# Create Redis manager for scaling
redis_manager = socketio.RedisManager(
    url='redis://localhost:6379/0',
    channel='my-app-socketio',
    logger=True
)

# Create server with Redis manager
sio = socketio.Server(client_manager=redis_manager)

@sio.event
def connect(sid, environ):
    print(f'Client {sid} connected')
    sio.enter_room(sid, 'global')

@sio.event
def broadcast_message(sid, data):
    # This message will be broadcast across all server instances
    sio.emit('message', data, room='global')

# Server instances configured with the same Redis URL and channel
# relay emits to each other, so clients on any instance receive them
app = socketio.WSGIApp(sio)
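
To make the cross-instance behaviour concrete, here is a toy model of two server processes sharing one channel. It is a sketch under stated assumptions rather than a description of the Redis manager's internals: `ToyChannel` and `ToyServer` are hypothetical, each server tracks only its own clients, and whether the real manager delivers locally before or after publishing is not modelled.

```python
from collections import defaultdict


class ToyChannel:
    """Hypothetical stand-in for a pub/sub channel shared by all instances."""

    def __init__(self):
        self.subscribers = []

    def publish(self, message):
        # Every subscribed instance receives every message, including the sender.
        for handler in self.subscribers:
            handler(message)


class ToyServer:
    """One server process: it knows only the clients connected to it."""

    def __init__(self, name, channel):
        self.name = name
        self.channel = channel
        self.rooms = defaultdict(set)  # room -> local session IDs
        self.delivered = []            # (sid, event, data) sent to local clients
        channel.subscribers.append(self._on_message)

    def enter_room(self, sid, room):
        self.rooms[room].add(sid)

    def emit(self, event, data, room):
        # Publish rather than deliver directly, so peer instances see the emit too.
        self.channel.publish({"event": event, "data": data, "room": room})

    def _on_message(self, message):
        for sid in sorted(self.rooms[message["room"]]):
            self.delivered.append((sid, message["event"], message["data"]))


channel = ToyChannel()
server_a = ToyServer("a", channel)
server_b = ToyServer("b", channel)
server_a.enter_room("sid-1", "global")
server_b.enter_room("sid-2", "global")

server_a.emit("message", "hello", room="global")
print(server_a.delivered)
print(server_b.delivered)
```

The emit issued on `server_a` reaches `sid-2` even though only `server_b` knows that client, which is the property a shared channel provides.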

AsyncRedisManager

Asynchronous Redis-based client manager for asyncio applications.

class AsyncRedisManager(AsyncPubSubManager):
    """
    Async Redis-based client manager.
    
    Inherits from: AsyncPubSubManager
    
    Attributes:
        redis_url (str): Redis connection URL
        channel (str): Redis channel name
        redis: Async Redis client instance
    """
    
    def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False, 
                 logger=None, redis_options=None):
        """
        Initialize the async Redis manager.
        
        Args:
            url (str): Redis connection URL (default: 'redis://localhost:6379/0')
            channel (str): Redis channel name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance
            redis_options (dict): Additional Redis client options
        """
    
    async def _publish(self, data):
        """
        Publish a message to Redis.
        
        Args:
            data: Message data to publish
        """
    
    async def _listen(self):
        """
        Listen for messages from Redis pub/sub.
        """

Usage Example

import socketio

# Create async Redis manager
redis_manager = socketio.AsyncRedisManager(
    url='redis://redis-cluster:6379/0',
    channel='async-app-socketio',
    logger=True,
    redis_options={'socket_timeout': 5}
)

# Create async server with Redis manager
sio = socketio.AsyncServer(client_manager=redis_manager)

@sio.event
async def connect(sid, environ):
    print(f'Client {sid} connected')
    await sio.enter_room(sid, 'async-global')

@sio.event
async def broadcast_message(sid, data):
    # Broadcast across all async server instances
    await sio.emit('message', data, room='async-global')

app = socketio.ASGIApp(sio)

KafkaManager

Apache Kafka-based client manager for high-throughput, distributed message routing.

class KafkaManager(PubSubManager):
    """
    Apache Kafka-based client manager.
    
    Inherits from: PubSubManager
    
    Attributes:
        kafka_url (str): Kafka broker URL
        topic (str): Kafka topic name
    """
    
    def __init__(self, url='kafka://localhost:9092', channel='socketio', write_only=False, logger=None):
        """
        Initialize the Kafka manager.
        
        Args:
            url (str): Kafka broker URL (default: 'kafka://localhost:9092')
            channel (str): Kafka topic name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance
        """
    
    def _publish(self, data):
        """
        Publish a message to Kafka topic.
        
        Args:
            data: Message data to publish
        """
    
    def _listen(self):
        """
        Listen for messages from Kafka topic.
        """

Usage Example

import socketio

# Create Kafka manager for high-throughput scaling
kafka_manager = socketio.KafkaManager(
    url='kafka://kafka-broker1:9092,kafka-broker2:9092',
    channel='realtime-events',
    logger=True
)

sio = socketio.Server(client_manager=kafka_manager)

@sio.event
def connect(sid, environ):
    sio.enter_room(sid, 'high-volume')

@sio.event
def process_event(sid, data):
    # High-throughput event processing across Kafka cluster
    sio.emit('event_processed', data, room='high-volume')

app = socketio.WSGIApp(sio)

KombuManager

Kombu-based client manager supporting multiple message brokers including RabbitMQ, Redis, and others through the Kombu library.

class KombuManager(PubSubManager):
    """
    Kombu-based client manager for various message brokers.
    
    Inherits from: PubSubManager
    
    Attributes:
        url (str): Message broker URL
        channel (str): Message channel/queue name
    """
    
    def __init__(self, url='amqp://guest:guest@localhost:5672//', channel='socketio', write_only=False, logger=None):
        """
        Initialize the Kombu manager.
        
        Args:
            url (str): Message broker URL (format depends on broker type)
            channel (str): Message channel/queue name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance
            
        Supported URLs:
            - RabbitMQ: 'amqp://user:pass@host:port/vhost'
            - Redis: 'redis://localhost:6379/0'
            - SQS: 'sqs://access_key:secret_key@region'
        """
    
    def _publish(self, data):
        """
        Publish a message via Kombu.
        
        Args:
            data: Message data to publish
        """
    
    def _listen(self):
        """
        Listen for messages via Kombu.
        """

Usage Example

import socketio

# RabbitMQ with Kombu
rabbitmq_manager = socketio.KombuManager(
    url='amqp://user:password@rabbitmq-server:5672/myvhost',
    channel='socketio-events',
    logger=True
)

sio = socketio.Server(client_manager=rabbitmq_manager)

@sio.event
def connect(sid, environ):
    sio.enter_room(sid, 'rabbitmq-room')

app = socketio.WSGIApp(sio)

ZmqManager

Experimental ZeroMQ-based client manager. It requires eventlet as the async mode and an external ZeroMQ message broker.

class ZmqManager(PubSubManager):
    """
    ZeroMQ-based client manager (experimental).
    
    Inherits from: PubSubManager
    
    Attributes:
        zmq_url (str): ZeroMQ connection URL
        channel (str): Channel name
    """
    
    def __init__(self, url='zmq+tcp://localhost:5555+5556', channel='socketio', write_only=False, logger=None):
        """
        Initialize the ZeroMQ manager.
        
        Args:
            url (str): ZeroMQ connection URL with publish and subscribe ports (default: 'zmq+tcp://localhost:5555+5556')
            channel (str): Channel name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance
            
        Requirements:
            - eventlet must be used as the async mode
            - External ZMQ message broker is required
        """
    
    def _publish(self, data):
        """
        Publish a message via ZeroMQ.
        
        Args:
            data: Message data to publish
        """
    
    def _listen(self):
        """
        Listen for messages via ZeroMQ.
        """

AsyncAioPikaManager

Asynchronous RabbitMQ client manager using the aio_pika library for asyncio applications.

class AsyncAioPikaManager(AsyncPubSubManager):
    """
    Async RabbitMQ client manager using aio_pika.
    
    Inherits from: AsyncPubSubManager
    
    Attributes:
        rabbitmq_url (str): RabbitMQ connection URL
        channel (str): Exchange/queue name
    """
    
    def __init__(self, url='amqp://guest:guest@localhost:5672//', channel='socketio', 
                 write_only=False, logger=None):
        """
        Initialize the async RabbitMQ manager.
        
        Args:
            url (str): RabbitMQ connection URL (default: 'amqp://guest:guest@localhost:5672//')
            channel (str): Exchange/queue name (default: 'socketio')
            write_only (bool): Write-only mode (default: False)
            logger (Logger): Logger instance
        """
    
    async def _publish(self, data):
        """
        Publish a message to RabbitMQ.
        
        Args:
            data: Message data to publish
        """
    
    async def _listen(self):
        """
        Listen for messages from RabbitMQ.
        """

Usage Example

import socketio

# Create async RabbitMQ manager
rabbitmq_manager = socketio.AsyncAioPikaManager(
    url='amqp://user:password@rabbitmq-cluster:5672/production',
    channel='async-socketio-events',
    logger=True
)

sio = socketio.AsyncServer(client_manager=rabbitmq_manager)

@sio.event
async def connect(sid, environ):
    await sio.enter_room(sid, 'async-rabbitmq-room')

@sio.event
async def process_async_event(sid, data):
    # Process with async RabbitMQ scaling
    await sio.emit('async_event_processed', data, room='async-rabbitmq-room')

app = socketio.ASGIApp(sio)

Manager Selection Guidelines

Choose the appropriate manager based on your scaling and infrastructure requirements:

  • Manager/AsyncManager: Single-process applications, development, simple deployments
  • RedisManager/AsyncRedisManager: Multi-process scaling, moderate traffic, simple setup
  • KafkaManager: High-throughput applications, event streaming, complex distributed systems
  • KombuManager: Flexible broker support, existing RabbitMQ/AMQP infrastructure
  • AsyncAioPikaManager: Async RabbitMQ with advanced features, asyncio applications
  • ZmqManager: Experimental, specialized use cases requiring ZeroMQ
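
The guidelines above can be condensed into a lookup keyed on the broker URL scheme and on whether the application uses asyncio. This is only an illustrative sketch: `manager_name_for` and the mapping are hypothetical helpers, they return class names as strings rather than importing anything, and the scheme-to-class pairing simply follows the default URLs shown on this page.

```python
# (URL scheme, uses asyncio?) -> manager class name documented on this page.
_MANAGER_BY_SCHEME = {
    ("redis", False): "RedisManager",
    ("redis", True): "AsyncRedisManager",
    ("kafka", False): "KafkaManager",
    ("amqp", False): "KombuManager",
    ("amqp", True): "AsyncAioPikaManager",
    ("zmq+tcp", False): "ZmqManager",
}


def manager_name_for(url=None, use_asyncio=False):
    """Return the name of a manager class suited to the given broker URL."""
    if not url:
        # No broker: single-process deployment.
        return "AsyncManager" if use_asyncio else "Manager"
    scheme = url.partition("://")[0].lower()
    try:
        return _MANAGER_BY_SCHEME[(scheme, use_asyncio)]
    except KeyError:
        raise ValueError(
            f"no manager listed here for scheme {scheme!r} (asyncio={use_asyncio})"
        ) from None


print(manager_name_for())
print(manager_name_for("redis://localhost:6379/0", use_asyncio=True))
print(manager_name_for("amqp://guest:guest@localhost:5672//"))
```

Combinations that this page does not document, such as Kafka with asyncio, raise `ValueError` instead of silently falling back to another manager.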

Write-Only Mode

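All pub/sub managers accept write_only=True. A write-only manager publishes emits to the broker but does not listen for messages from other instances, which suits auxiliary processes, such as background workers, that only need to push events to clients connected to the main servers: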

# Write-only Redis manager (doesn't listen for messages)
write_only_manager = socketio.RedisManager(
    url='redis://localhost:6379/0',
    write_only=True
)
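
A write-only manager can also emit on its own, without being attached to a server. The helper below sketches that pattern for a background worker. The function name `emit_from_worker`, the `notification` event, and the `manager_cls` parameter are hypothetical; the parameter exists so the helper can be exercised without Redis, and the real import is deferred until it is needed.

```python
def emit_from_worker(room, payload, url="redis://localhost:6379/0", manager_cls=None):
    """Push one event to a room from a process that hosts no clients itself."""
    if manager_cls is None:
        # Deferred import: requires python-socketio and the redis package.
        import socketio
        manager_cls = socketio.RedisManager

    # write_only=True: publish to the broker, never start a listener.
    manager = manager_cls(url, write_only=True)
    manager.emit("notification", data=payload, room=room)
    return manager
```

Called as `emit_from_worker("global", {"text": "nightly job finished"})` from a worker, this would reach clients in the `global` room on any server instance that listens on the same URL and channel. Note that the worker and the servers must agree on the channel name, which is left at its default here.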
