Engine.IO server and client for Python providing real-time bidirectional communication
—
ASGI application middleware for integrating Engine.IO servers with FastAPI, Starlette, and other ASGI-compatible web frameworks. Provides seamless integration with modern async web applications while handling Engine.IO traffic.
Create ASGI middleware that wraps an Engine.IO AsyncServer with optional static file serving, fallback application support, and lifecycle management.
class ASGIApp:
def __init__(
self,
engineio_server,
other_asgi_app=None,
static_files=None,
engineio_path='engine.io',
on_startup=None,
on_shutdown=None
):
"""
Initialize ASGI middleware for Engine.IO.
Args:
engineio_server (AsyncServer): The Engine.IO async server instance
other_asgi_app (callable, optional): ASGI app for non-Engine.IO traffic
static_files (dict, optional): Static file mapping rules
engineio_path (str): Engine.IO endpoint path, default 'engine.io'
on_startup (callable, optional): Startup callback function
on_shutdown (callable, optional): Shutdown callback function
"""Standard ASGI application callable that routes requests between Engine.IO and fallback applications.
async def __call__(self, scope, receive, send):
"""
ASGI application callable.
Args:
scope (dict): ASGI scope dictionary
receive (callable): ASGI receive callable
send (callable): ASGI send callable
"""Serve static files asynchronously with proper content type detection and error handling.
async def serve_static_file(self, static_file, receive, send):
"""
Serve a static file.
Args:
static_file (str): Path to the static file
receive (callable): ASGI receive callable
send (callable): ASGI send callable
"""Handle ASGI lifespan events for application startup and shutdown.
async def lifespan(self, scope, receive, send):
"""
Handle ASGI lifespan events.
Args:
scope (dict): ASGI lifespan scope
receive (callable): ASGI receive callable
send (callable): ASGI send callable
"""Built-in error handling for requests that don't match Engine.IO patterns.
async def not_found(self, receive, send):
"""
Return a 404 Not Found response.
Args:
receive (callable): ASGI receive callable
send (callable): ASGI send callable
"""import engineio
from fastapi import FastAPI
# Create FastAPI app
fastapi_app = FastAPI()
# Create Engine.IO async server
eio = engineio.AsyncServer(async_mode='asgi')
@eio.on('connect')
async def on_connect(sid, environ):
print(f'Client {sid} connected')
@eio.on('message')
async def on_message(sid, data):
print(f'Message from {sid}: {data}')
await eio.send(sid, f'Echo: {data}')
@eio.on('disconnect')
async def on_disconnect(sid):
print(f'Client {sid} disconnected')
# FastAPI routes
@fastapi_app.get('/')
async def root():
return {'message': 'Hello World'}
@fastapi_app.get('/api/data')
async def get_data():
return {'message': 'Hello from FastAPI!'}
# Wrap FastAPI app with Engine.IO middleware
app = engineio.ASGIApp(eio, fastapi_app)
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=8000)import engineio
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
# Create Engine.IO async server
eio = engineio.AsyncServer(async_mode='asgi')
@eio.on('connect')
async def on_connect(sid, environ):
print(f'Client {sid} connected')
@eio.on('message')
async def on_message(sid, data):
await eio.send(sid, f'Starlette says: {data}')
# Starlette routes
async def homepage(request):
return JSONResponse({'message': 'Hello Starlette'})
async def api_endpoint(request):
return JSONResponse({'data': 'API response'})
starlette_app = Starlette(routes=[
Route('/', homepage),
Route('/api', api_endpoint),
])
# Wrap with Engine.IO middleware
app = engineio.ASGIApp(eio, starlette_app)import engineio
# Create Engine.IO server only (no fallback app)
eio = engineio.AsyncServer(async_mode='asgi')
@eio.on('connect')
async def on_connect(sid, environ):
print(f'Client {sid} connected')
@eio.on('message')
async def on_message(sid, data):
await eio.send(sid, data.upper())
# Create standalone ASGI app
app = engineio.ASGIApp(eio)
# Deploy with any ASGI server
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='localhost', port=8000)import engineio
eio = engineio.AsyncServer(async_mode='asgi')
# Define static file mappings
static_files = {
'/': 'index.html',
'/static/(.*)': r'static/\1',
'/assets/(.*)': r'public/assets/\1'
}
# Create ASGI app with static file serving
app = engineio.ASGIApp(eio, static_files=static_files)Static file features:
import engineio
import asyncio
eio = engineio.AsyncServer(async_mode='asgi')
# Background task for periodic cleanup
cleanup_task = None
async def startup():
"""Application startup tasks"""
global cleanup_task
print('Starting up...')
async def periodic_cleanup():
while True:
await asyncio.sleep(300) # 5 minutes
print('Running periodic cleanup...')
cleanup_task = asyncio.create_task(periodic_cleanup())
async def shutdown():
"""Application shutdown tasks"""
global cleanup_task
print('Shutting down...')
if cleanup_task:
cleanup_task.cancel()
try:
await cleanup_task
except asyncio.CancelledError:
pass
# Create ASGI app with lifespan management
app = engineio.ASGIApp(
eio,
on_startup=startup,
on_shutdown=shutdown
)import engineio
eio = engineio.AsyncServer(async_mode='asgi')
# Use custom Engine.IO endpoint
app = engineio.ASGIApp(eio, engineio_path='realtime/socket')
# Clients should connect to: ws://server/realtime/socket/import engineio
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
# Create multiple servers
chat_server = engineio.AsyncServer(async_mode='asgi')
notifications_server = engineio.AsyncServer(async_mode='asgi')
@chat_server.on('message')
async def on_chat_message(sid, data):
await chat_server.send(sid, f'Chat: {data}')
@notifications_server.on('message')
async def on_notification(sid, data):
await notifications_server.send(sid, f'Notification: {data}')
# Create separate ASGI apps
chat_app = engineio.ASGIApp(chat_server, engineio_path='chat')
notifications_app = engineio.ASGIApp(notifications_server, engineio_path='notifications')
# Custom routing middleware
class MultiEngineIOMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
if request.url.path.startswith('/chat/'):
# Route to chat server
return await chat_app(request.scope, request.receive, request._send)
elif request.url.path.startswith('/notifications/'):
# Route to notifications server
return await notifications_app(request.scope, request.receive, request._send)
else:
return await call_next(request)
main_app = Starlette(middleware=[
Middleware(MultiEngineIOMiddleware)
])import engineio
from fastapi import FastAPI, Depends
from typing import Optional
# Create FastAPI app
fastapi_app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')
# Dependency for accessing Engine.IO server
def get_engineio_server():
return eio
@fastapi_app.post('/broadcast')
async def broadcast_message(
message: str,
eio_server: engineio.AsyncServer = Depends(get_engineio_server)
):
"""Broadcast message to all connected clients"""
# Get all connected clients
# Note: This is a simplified example - actual implementation
# would need to track connected clients
for sid in eio_server.manager.get_participants('/'):
await eio_server.send(sid, message)
return {'status': 'broadcasted', 'message': message}
@eio.on('connect')
async def on_connect(sid, environ):
print(f'Client {sid} connected')
# Wrap with Engine.IO
app = engineio.ASGIApp(eio, fastapi_app)import engineio
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
fastapi_app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')
security = HTTPBearer()
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify JWT token"""
token = credentials.credentials
# Add your token verification logic here
if token != 'valid-token':
raise HTTPException(status_code=401, detail='Invalid token')
return token
@eio.on('connect')
async def on_connect(sid, environ):
# Access authentication info from ASGI scope
headers = dict(environ.get('asgi', {}).get('scope', {}).get('headers', []))
auth_header = headers.get(b'authorization', b'').decode()
if not auth_header.startswith('Bearer '):
await eio.disconnect(sid)
return
# Store authenticated session
token = auth_header[7:] # Remove 'Bearer ' prefix
await eio.save_session(sid, {'token': token, 'authenticated': True})
print(f'Authenticated client {sid} connected')
@fastapi_app.get('/protected')
async def protected_route(token: str = Depends(verify_token)):
return {'message': 'This is a protected route', 'token': token}
app = engineio.ASGIApp(eio, fastapi_app)import engineio
from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect
fastapi_app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')
@eio.on('connect')
async def on_engineio_connect(sid, environ):
print(f'Engine.IO client {sid} connected')
@fastapi_app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
"""Native FastAPI WebSocket endpoint"""
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f'FastAPI WebSocket echo: {data}')
except WebSocketDisconnect:
print('FastAPI WebSocket client disconnected')
# Both Engine.IO and native WebSockets work together
app = engineio.ASGIApp(eio, fastapi_app)Engine.IO event handlers can access the ASGI scope for request information:
@eio.on('connect')
async def on_connect(sid, environ):
# Access ASGI scope
scope = environ.get('asgi', {}).get('scope', {})
# Get request details
client = scope.get('client', ['unknown', 0])
headers = dict(scope.get('headers', []))
query_string = scope.get('query_string', b'').decode()
print(f'Client {sid} connected from {client[0]}:{client[1]}')
print(f'User-Agent: {headers.get(b"user-agent", b"unknown").decode()}')
# Parse query parameters
from urllib.parse import parse_qs
params = parse_qs(query_string)
# Store client info in session
await eio.save_session(sid, {
'client_ip': client[0],
'client_port': client[1],
'headers': headers,
'params': params
})# For production with Uvicorn
import engineio
from fastapi import FastAPI
app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')
# Configure server and handlers...
app = engineio.ASGIApp(eio, app)
# Run with:
# uvicorn app:app --host 0.0.0.0 --port 8000 --workers 1Engine.IO requires sticky sessions when using multiple workers. Consider using:
# Example with Redis session storage
import engineio
eio = engineio.AsyncServer(
async_mode='asgi',
client_manager=engineio.AsyncRedisManager('redis://localhost:6379')
)The ASGI middleware handles various error conditions:
Custom error handling can be implemented in the fallback ASGI application or using FastAPI exception handlers:
from fastapi import FastAPI, HTTPException
from fastapi.exception_handlers import http_exception_handler
fastapi_app = FastAPI()
@fastapi_app.exception_handler(HTTPException)
async def custom_http_exception_handler(request, exc):
"""Custom HTTP exception handler"""
print(f'HTTP exception: {exc.detail}')
return await http_exception_handler(request, exc)
app = engineio.ASGIApp(eio, fastapi_app)Install with Tessl CLI
npx tessl i tessl/pypi-python-engineio