Engine.IO server and client for Python providing real-time bidirectional communication
—
Asyncio-based Engine.IO client for connecting to servers from async Python applications. Provides the same functionality as the synchronous client but with full async/await support for modern async applications.
Create and configure an asynchronous Engine.IO client with the same configuration options as the synchronous client.
class AsyncClient:
def __init__(
self,
logger=False,
json=None,
request_timeout=5,
http_session=None,
ssl_verify=True,
handle_sigint=True,
websocket_extra_options=None,
timestamp_requests=True
):
"""
Initialize asynchronous Engine.IO client.
Args:
logger (bool|Logger): Logging configuration, default False
json (module): Alternative JSON module
request_timeout (int): Request timeout in seconds, default 5
http_session (aiohttp.ClientSession): HTTP session object for requests
ssl_verify (bool): Verify SSL certificates, default True
handle_sigint (bool): Handle SIGINT automatically, default True
websocket_extra_options (dict): Extra WebSocket client options
timestamp_requests (bool): Add timestamps to requests, default True
"""

Establish connections to Engine.IO servers asynchronously with coroutine-based methods.
async def connect(self, url, headers=None, transports=None, engineio_path='engine.io'):
"""
Connect to an Engine.IO server (coroutine).
Args:
url (str): Server URL (e.g., 'http://localhost:5000')
headers (dict, optional): Additional HTTP headers for connection
transports (list, optional): Allowed transports ['polling', 'websocket']
engineio_path (str): Engine.IO endpoint path, default 'engine.io'
Raises:
ConnectionError: If connection fails
ValueError: If URL is invalid or transports unsupported
"""
async def wait(self):
"""
Wait until the connection ends (coroutine).
This method suspends the current coroutine until the client disconnects
or the connection is lost.
"""
async def disconnect(self, abort=False, reason=None):
"""
Disconnect from the server (coroutine).
Args:
abort (bool): Abort connection immediately without graceful shutdown
reason (str, optional): Reason for disconnection
"""

Send messages to the server asynchronously and receive responses through the event system.
async def send(self, data):
"""
Send a message to the server (coroutine).
Args:
data (any): Message data to send (will be JSON-serialized)
Raises:
SocketIsClosedError: If not connected to server
"""

Register event handlers that can be either synchronous or asynchronous functions.
def on(self, event, handler=None):
"""
Register an event handler (synchronous method).
Args:
event (str): Event name ('connect', 'message', 'disconnect')
handler (callable, optional): Event handler function (sync or async)
Returns:
callable: Decorator function if handler not provided
"""

Usage examples:
# Async event handlers
@client.on('connect')
async def on_connect():
    """Async 'connect' handler: log the connection and send a greeting."""
    print('Connected to server')
    await client.send('Hello Server!')
@client.on('message')
async def on_message(data):
    """Async 'message' handler: log the payload, then echo it back.

    Args:
        data: Message payload passed in by the client.
    """
    print(f'Received from server: {data}')
    # Can perform async operations in handlers
    await asyncio.sleep(0.1)
    await client.send(f'Echo: {data}')
@client.on('disconnect')
async def on_disconnect():
    """Async 'disconnect' handler: log that the connection ended."""
    print('Disconnected from server')
# Synchronous handlers also work
@client.on('connect')
def on_connect_sync():
print('Connected (sync handler)')

Manage background tasks using asyncio.
def start_background_task(self, target, *args, **kwargs):
"""
Start a background task using asyncio.
Args:
target (callable): Async task function to run
*args: Arguments for the task function
**kwargs: Keyword arguments for the task function
Returns:
asyncio.Task: Asyncio task object
"""
async def sleep(self, seconds=0):
"""
Sleep using asyncio (coroutine).
Args:
seconds (float): Sleep duration in seconds
"""

Inherited synchronous methods from the base client class.
def transport(self):
"""
Return the current transport name.
Returns:
str: Transport name ('polling' or 'websocket')
Raises:
ValueError: If not connected
"""
def create_queue(self, *args, **kwargs):
"""
Create an asyncio queue object.
Returns:
asyncio.Queue: Asyncio queue object
"""
def get_queue_empty_exception(self):
"""
Return the asyncio queue empty exception.
Returns:
asyncio.QueueEmpty: Asyncio queue empty exception
"""
def create_event(self, *args, **kwargs):
"""
Create an asyncio event object.
Returns:
asyncio.Event: Asyncio event object
"""

Methods specific to the async client implementation.
def is_asyncio_based(self):
"""
Return True to identify as asyncio-based client.
Returns:
bool: Always True for AsyncClient
"""

The async client supports the same events as the synchronous client, but handlers can be async:

connect: Fired when connection to server is established
    async () -> None or () -> None
message: Fired when a message is received from the server
    async (data: any) -> None or (data: any) -> None
disconnect: Fired when disconnected from the server
    async () -> None or () -> None

import asyncio
import engineio
async def main():
    """Basic example: register handlers, connect, and run until disconnected."""
    client = engineio.AsyncClient()

    # Handlers are registered before connecting.
    @client.on('connect')
    async def on_connect():
        print('Connected!')
        await client.send('Hello from async client')

    @client.on('message')
    async def on_message(data):
        print(f'Server says: {data}')

    @client.on('disconnect')
    async def on_disconnect():
        print('Disconnected')

    await client.connect('http://localhost:5000')
    # Suspend here until the connection ends.
    await client.wait()
# Run the async main function
asyncio.run(main())

import asyncio
import aiohttp
import engineio
async def main():
    """Connect using a caller-supplied aiohttp session and a 10 second request timeout."""
    # Create custom aiohttp session
    async with aiohttp.ClientSession() as session:
        client = engineio.AsyncClient(
            http_session=session,
            request_timeout=10
        )
        # Kept inside the session context so the session stays open while
        # the client is using it.
        await client.connect('http://localhost:5000')
        await client.wait()
asyncio.run(main())

import asyncio
import engineio
async def heartbeat_task(client):
    """Send periodic heartbeat messages.

    Sends ``{'type': 'heartbeat', 'timestamp': <time.time()>}`` and then
    sleeps 30 seconds, repeating until the send or the sleep raises. The
    failure is printed and the loop stops; nothing is re-raised.

    Args:
        client: Client object exposing an awaitable ``send(data)``.
    """
    # Imported locally because this example's import block only brings in
    # asyncio and engineio. Without it, ``time.time()`` raised NameError,
    # which the broad handler below reported as a heartbeat failure before
    # anything was ever sent.
    import time

    while True:
        try:
            await client.send({'type': 'heartbeat', 'timestamp': time.time()})
            await asyncio.sleep(30)
        except Exception as e:
            # Best-effort task: report the failure and stop the loop.
            print(f'Heartbeat failed: {e}')
            break
async def main():
    """Connect and start the heartbeat task once the connection is up."""
    client = engineio.AsyncClient()

    @client.on('connect')
    async def on_connect():
        print('Connected, starting heartbeat')
        # Start background heartbeat task
        client.start_background_task(heartbeat_task, client)

    await client.connect('http://localhost:5000')
    await client.wait()
asyncio.run(main())

import asyncio
import engineio
async def connect_with_retry(client, url, max_retries=5):
    """Connect with automatic retry logic.

    Calls ``client.connect(url)`` up to ``max_retries`` times, sleeping
    ``2 ** attempt`` seconds between failed attempts (no sleep after the
    last one).

    Args:
        client: Client object exposing an awaitable ``connect(url)``.
        url (str): Server URL passed to ``client.connect``.
        max_retries (int): Maximum number of connection attempts, default 5.

    Raises:
        ConnectionError: If every attempt fails. The exception from the
            last attempt is attached as ``__cause__`` so the underlying
            reason is not lost.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            await client.connect(url)
        except Exception as e:
            # ``e`` is unbound once the except block ends, so keep a
            # reference for chaining after the loop.
            last_error = e
            print(f'Connection attempt {attempt + 1} failed: {e}')
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
        else:
            print('Connected successfully')
            return
    raise ConnectionError('Max connection attempts exceeded') from last_error
async def main():
    """Connect with retries and try to reconnect after each disconnect."""
    client = engineio.AsyncClient()

    @client.on('connect')
    async def on_connect():
        print('Connected to server')

    @client.on('disconnect')
    async def on_disconnect():
        print('Disconnected, attempting to reconnect...')
        # Wait 5 seconds before the first reconnection attempt.
        await asyncio.sleep(5)
        try:
            await connect_with_retry(client, 'http://localhost:5000')
        except Exception as e:
            # Report the failure instead of raising out of the handler.
            print(f'Reconnection failed permanently: {e}')

    # Initial connection; an exception here propagates out of main().
    await connect_with_retry(client, 'http://localhost:5000')
    await client.wait()
asyncio.run(main())

# FastAPI integration example
from fastapi import FastAPI
import engineio
app = FastAPI()
client = engineio.AsyncClient()
@app.on_event("startup")
async def startup():
    """Connect the shared Engine.IO client on the app's "startup" event."""
    await client.connect('http://external-service:5000')
@app.on_event("shutdown")
async def shutdown():
    """Disconnect the shared Engine.IO client on the app's "shutdown" event."""
    await client.disconnect()
@app.post("/send-message")
async def send_message(message: str):
await client.send(message)
return {"status": "sent"}

import asyncio
import engineio
async def handle_client(client_id, url):
    """Handle a single client connection.

    Creates a dedicated AsyncClient, connects it to ``url`` and waits until
    that connection ends.

    Args:
        client_id: Identifier used only in the log output.
        url (str): Server URL to connect to.
    """
    client = engineio.AsyncClient()

    @client.on('connect')
    async def on_connect():
        print(f'Client {client_id} connected')

    @client.on('message')
    async def on_message(data):
        print(f'Client {client_id} received: {data}')

    await client.connect(url)
    await client.wait()
async def main():
    """Run five client connections concurrently against the same server."""
    # Connect multiple clients concurrently
    tasks = [
        handle_client(i, 'http://localhost:5000')
        for i in range(5)
    ]
    # gather() runs the coroutines concurrently and waits for all of them.
    await asyncio.gather(*tasks)
asyncio.run(main())

The async client may raise the same exceptions as the synchronous client:

ConnectionError: When connection to server fails or is lost
SocketIsClosedError: When attempting to send while not connected
ValueError: When invalid parameters are provided
asyncio.TimeoutError: When requests exceed the configured timeout

import asyncio
import engineio
class AsyncEngineIOClient:
    """Async context manager that connects on entry and disconnects on exit."""

    def __init__(self, url, **kwargs):
        """Store the URL and build the client.

        Args:
            url (str): Server URL used when the context is entered.
            **kwargs: Forwarded unchanged to ``engineio.AsyncClient``.
        """
        self.url = url
        self.client = engineio.AsyncClient(**kwargs)

    async def __aenter__(self):
        """Connect and return the wrapped client (not this wrapper)."""
        await self.client.connect(self.url)
        return self.client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disconnect on exit; returns None, so exceptions from the body propagate."""
        await self.client.disconnect()
# Usage
async def main():
    """Use the context manager: connect, send one message, wait, disconnect."""
    async with AsyncEngineIOClient('http://localhost:5000') as client:
        @client.on('message')
        async def on_message(data):
            print(f'Received: {data}')

        await client.send('Hello')
        # Keep the connection open for 10 seconds before the context exits.
        await asyncio.sleep(10)
asyncio.run(main())

Install with Tessl CLI
npx tessl i tessl/pypi-python-engineio