Engine.IO server and client for Python providing real-time bidirectional communication
—
Engine.IO server optimized for asyncio-based applications with full async/await support. Provides the same functionality as the synchronous server but with coroutine-based methods for seamless integration with modern async web frameworks.
Create and configure an asynchronous Engine.IO server with the same configuration options as the synchronous server.
class AsyncServer:
def __init__(
self,
async_mode=None,
ping_interval=25,
ping_timeout=20,
max_http_buffer_size=1000000,
allow_upgrades=True,
http_compression=True,
compression_threshold=1024,
cookie=None,
cors_allowed_origins=None,
cors_credentials=True,
logger=False,
json=None,
async_handlers=True,
monitor_clients=None,
transports=None,
**kwargs
):
"""
Initialize asynchronous Engine.IO server.
Args:
async_mode (str, optional): Async mode for asyncio ('aiohttp', 'sanic', 'tornado', 'asgi')
ping_interval (int|tuple): Ping interval in seconds, default 25
ping_timeout (int): Ping timeout in seconds, default 20
max_http_buffer_size (int): Max message size in bytes, default 1,000,000
allow_upgrades (bool): Allow transport upgrades, default True
http_compression (bool): Enable HTTP compression, default True
compression_threshold (int): Compression threshold in bytes, default 1024
cookie (str|dict|None): Cookie configuration
cors_allowed_origins (str|list|callable): CORS allowed origins
cors_credentials (bool): Allow credentials in CORS, default True
logger (bool|Logger): Logging configuration, default False
json (module): Alternative JSON module
async_handlers (bool): Run handlers asynchronously, default True
monitor_clients (bool, optional): Monitor client connections; when left as None (the signature default), monitoring is enabled (effective default True)
transports (list): Allowed transports, default ['polling', 'websocket']
"""

Send messages and packets to connected clients using async/await syntax.
async def send(self, sid, data):
"""
Send a message to a client (coroutine).
Args:
sid (str): Session ID of the client
data (any): Message data to send
Raises:
SocketIsClosedError: If client socket is closed
"""
async def send_packet(self, sid, pkt):
"""
Send a raw packet to a client (coroutine).
Args:
sid (str): Session ID of the client
pkt (Packet): Raw packet object to send
Raises:
SocketIsClosedError: If client socket is closed
"""

Manage client sessions asynchronously with coroutine-based methods.
async def get_session(self, sid):
"""
Return the user session for a client (coroutine).
Args:
sid (str): Session ID of the client
Returns:
dict: User session data
Raises:
KeyError: If session does not exist
"""
async def save_session(self, sid, session):
"""
Store the user session for a client (coroutine).
Args:
sid (str): Session ID of the client
session (dict): Session data to store
"""
def session(self, sid):
"""
Return user session with async context manager syntax.
Args:
sid (str): Session ID of the client
Returns:
AsyncContextManager: Async session context manager for automatic save
"""

Usage example:
# Direct async session management
session_data = await eio.get_session(sid)
session_data['user_id'] = 123
await eio.save_session(sid, session_data)
# Async context manager approach
async with eio.session(sid) as session:
    session['user_id'] = 123
    session['preferences'] = {'theme': 'dark'}

Handle client connections and disconnections asynchronously.
async def disconnect(self, sid=None):
"""
Disconnect a client or all clients (coroutine).
Args:
sid (str, optional): Session ID to disconnect. If None, disconnects all clients
"""

Register event handlers that can be either synchronous or asynchronous functions.
def on(self, event, handler=None):
"""
Register an event handler (synchronous method).
Args:
event (str): Event name ('connect', 'message', 'disconnect')
handler (callable, optional): Event handler function (sync or async)
Returns:
callable: Decorator function if handler not provided
"""

Usage examples:
# Async event handlers
@eio.on('connect')
async def on_connect(sid, environ):
    print(f'Client {sid} connected')

@eio.on('message')
async def on_message(sid, data):
    print(f'Received: {data}')
    await eio.send(sid, f'Echo: {data}')

@eio.on('disconnect')
async def on_disconnect(sid):
    print(f'Client {sid} disconnected')

# Synchronous handlers also work
@eio.on('connect')
def on_connect_sync(sid, environ):
    print(f'Client {sid} connected (sync)')

Handle HTTP requests asynchronously for integration with ASGI applications.
async def handle_request(self, *args, **kwargs):
"""
Handle an HTTP request (coroutine).
Args:
*args: Request arguments (varies by framework)
**kwargs: Request keyword arguments (varies by framework)
Returns:
Response object appropriate for the framework
"""

Attach the server to various async web frameworks.
def attach(self, app, engineio_path='engine.io'):
"""
Attach the server to an application.
Args:
app: Application object (aiohttp, Sanic, etc.)
engineio_path (str): Engine.IO endpoint path, default 'engine.io'
"""

Manage background tasks using asyncio.
def start_background_task(self, target, *args, **kwargs):
"""
Start a background task using asyncio.
Args:
target (callable): Async task function to run
*args: Arguments for the task function
**kwargs: Keyword arguments for the task function
Returns:
asyncio.Task: Asyncio task object
"""
async def sleep(self, seconds=0):
"""
Sleep using asyncio (coroutine).
Args:
seconds (float): Sleep duration in seconds
"""
async def shutdown(self):
"""
Stop all background tasks and clean up resources (coroutine).
"""

Inherited synchronous methods from the base server class.
def transport(self, sid):
"""
Return the transport name for a client.
Args:
sid (str): Session ID of the client
Returns:
str: Transport name ('polling' or 'websocket')
"""
def create_queue(self, *args, **kwargs):
"""
Create an asyncio queue object.
Returns:
asyncio.Queue: Asyncio queue object
"""
def get_queue_empty_exception(self):
"""
Return the asyncio queue empty exception.
Returns:
asyncio.QueueEmpty: Asyncio queue empty exception
"""
def create_event(self, *args, **kwargs):
"""
Create an asyncio event object.
Returns:
asyncio.Event: Asyncio event object
"""
def generate_id(self):
"""
Generate a unique session ID.
Returns:
str: Unique session identifier
"""

Methods specific to the async server implementation.
def is_asyncio_based(self):
"""
Return True to identify as asyncio-based server.
Returns:
bool: Always True for AsyncServer
"""
def async_modes(self):
"""
Return supported async modes for asyncio.
Returns:
list: ['aiohttp', 'sanic', 'tornado', 'asgi']
"""

The async server supports the same events as the synchronous server, but handlers can be async:
connect: Fired when a client establishes a connection
async (sid: str, environ: dict) -> None or (sid: str, environ: dict) -> None
message: Fired when a message is received from a client
async (sid: str, data: any) -> None or (sid: str, data: any) -> None
disconnect: Fired when a client disconnects
async (sid: str) -> None or (sid: str) -> None

import engineio
from fastapi import FastAPI
# Create async server
eio = engineio.AsyncServer(async_mode='asgi')
@eio.on('connect')
async def connect(sid, environ):
    print(f'Client {sid} connected')
# Create ASGI app
app = engineio.ASGIApp(eio)
# Wrap with FastAPI
fastapi_app = FastAPI()
fastapi_app.mount("/", app)

import engineio
from aiohttp import web
# Create async server
eio = engineio.AsyncServer(async_mode='aiohttp')
@eio.on('connect')
async def connect(sid, environ):
    print(f'Client {sid} connected')
# Create aiohttp app
app = web.Application()
eio.attach(app)

The async server may raise the same exceptions as the synchronous server:
EngineIOError: Base exception for all Engine.IO errors
SocketIsClosedError: When attempting to send to a closed socket
ContentTooLongError: When message size exceeds buffer limits
ConnectionError: When connection fails or is lost unexpectedly

Install with Tessl CLI
npx tessl i tessl/pypi-python-engineio