
tessl/pypi-python-engineio

Engine.IO server and client for Python providing real-time bidirectional communication


ASGI Middleware

ASGI middleware that integrates an Engine.IO AsyncServer with FastAPI, Starlette, and other ASGI-compatible web frameworks. It handles Engine.IO traffic itself and forwards every other request to the wrapped application.

Capabilities

Middleware Initialization

Create ASGI middleware that wraps an Engine.IO AsyncServer with optional static file serving, fallback application support, and lifecycle management.

class ASGIApp:
    def __init__(
        self,
        engineio_server,
        other_asgi_app=None,
        static_files=None,
        engineio_path='engine.io',
        on_startup=None,
        on_shutdown=None
    ):
        """
        Initialize ASGI middleware for Engine.IO.

        Args:
            engineio_server (AsyncServer): The Engine.IO async server instance
            other_asgi_app (callable, optional): ASGI app for non-Engine.IO traffic
            static_files (dict, optional): Static file mapping rules
            engineio_path (str): Engine.IO endpoint path, default 'engine.io'
            on_startup (callable, optional): Startup callback function
            on_shutdown (callable, optional): Shutdown callback function
        """

ASGI Application Interface

Standard ASGI application callable that routes requests between Engine.IO and fallback applications.

async def __call__(self, scope, receive, send):
    """
    ASGI application callable.

    Args:
        scope (dict): ASGI scope dictionary
        receive (callable): ASGI receive callable
        send (callable): ASGI send callable
    """
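
The routing decision described above can be sketched with plain ASGI callables. This is an illustration and not the library's code: `make_router`, `make_stub`, and the two stub apps are hypothetical names, and the sketch only assumes that Engine.IO traffic lives under a single path prefix.

```python
import asyncio


def make_router(prefix, engine_app, fallback_app):
    """Return an ASGI callable that sends requests under `prefix` to
    engine_app and everything else to fallback_app (hypothetical helper)."""
    async def router(scope, receive, send):
        is_request = scope['type'] in ('http', 'websocket')
        if is_request and scope['path'].startswith(prefix):
            await engine_app(scope, receive, send)
        else:
            await fallback_app(scope, receive, send)
    return router


def make_stub(name, calls):
    """Build a stand-in ASGI app that records which path it handled."""
    async def stub(scope, receive, send):
        calls.append((name, scope['path']))
    return stub


async def demo():
    calls = []
    router = make_router(
        '/engine.io/',
        make_stub('engine', calls),
        make_stub('fallback', calls),
    )
    for path in ('/engine.io/', '/api/data'):
        await router({'type': 'http', 'path': path}, None, None)
    return calls


routed = asyncio.run(demo())
```

Here the first request lands on the Engine.IO stand-in and the second on the fallback stand-in, which is the same split the middleware makes between the server and `other_asgi_app`.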

Static File Serving

Serve a static file from disk. The content type comes from the static file mapping or is inferred from the file extension.

async def serve_static_file(self, static_file, receive, send):
    """
    Serve a static file.

    Args:
        static_file (dict): Static file descriptor with 'filename' and 'content_type' keys
        receive (callable): ASGI receive callable
        send (callable): ASGI send callable
    """

Lifespan Management

Handle ASGI lifespan events for application startup and shutdown.

async def lifespan(self, scope, receive, send):
    """
    Handle ASGI lifespan events.

    Args:
        scope (dict): ASGI lifespan scope
        receive (callable): ASGI receive callable
        send (callable): ASGI send callable
    """
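
For readers unfamiliar with the ASGI lifespan protocol, the exchange can be sketched without any server. The handler below is a minimal stand-in, not the middleware's implementation, and `drive_lifespan` is a hypothetical helper that plays the role of the ASGI server.

```python
import asyncio


async def lifespan_app(scope, receive, send):
    """Minimal ASGI lifespan handler (illustrative sketch)."""
    while True:
        event = await receive()
        if event['type'] == 'lifespan.startup':
            # A real app would run its startup callback here.
            await send({'type': 'lifespan.startup.complete'})
        elif event['type'] == 'lifespan.shutdown':
            # A real app would run its shutdown callback here.
            await send({'type': 'lifespan.shutdown.complete'})
            return


async def drive_lifespan(app):
    """Act as the ASGI server: deliver startup, then shutdown."""
    incoming = asyncio.Queue()
    sent = []

    async def send(message):
        sent.append(message['type'])

    await incoming.put({'type': 'lifespan.startup'})
    await incoming.put({'type': 'lifespan.shutdown'})
    await app({'type': 'lifespan'}, incoming.get, send)
    return sent


lifespan_messages = asyncio.run(drive_lifespan(lifespan_app))
```

The server sends `lifespan.startup` once before accepting traffic and `lifespan.shutdown` once when stopping, and the application acknowledges each with the matching `.complete` message.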

Error Handling

Built-in error handling for requests that don't match Engine.IO patterns.

async def not_found(self, receive, send):
    """
    Return a 404 Not Found response.

    Args:
        receive (callable): ASGI receive callable
        send (callable): ASGI send callable
    """
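
A 404 response in ASGI is a pair of messages: a response start carrying the status, then a body. The sketch below shows that shape and a small harness for calling any ASGI app directly, which can be handy in tests. `not_found_app` and `call_http` are hypothetical names, and the response body text is an assumption rather than the library's exact output.

```python
import asyncio


async def not_found_app(scope, receive, send):
    """Stand-in app that always answers 404 (for illustration only)."""
    await send({
        'type': 'http.response.start',
        'status': 404,
        'headers': [(b'content-type', b'text/plain')],
    })
    await send({'type': 'http.response.body', 'body': b'Not Found'})


async def call_http(app, path):
    """Send one GET request to an ASGI app and return (status, body)."""
    messages = []

    async def receive():
        return {'type': 'http.request', 'body': b'', 'more_body': False}

    async def send(message):
        messages.append(message)

    scope = {
        'type': 'http',
        'method': 'GET',
        'path': path,
        'query_string': b'',
        'headers': [],
    }
    await app(scope, receive, send)
    status = messages[0]['status']
    body = b''.join(m.get('body', b'') for m in messages[1:])
    return status, body


status, body = asyncio.run(call_http(not_found_app, '/missing'))
```

The same `call_http` helper should work against any ASGI callable, since it relies only on the standard `scope`, `receive`, and `send` interface.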

Integration Examples

FastAPI Integration

import engineio
from fastapi import FastAPI

# Create FastAPI app
fastapi_app = FastAPI()

# Create Engine.IO async server
eio = engineio.AsyncServer(async_mode='asgi')

@eio.on('connect')
async def on_connect(sid, environ):
    print(f'Client {sid} connected')

@eio.on('message')
async def on_message(sid, data):
    print(f'Message from {sid}: {data}')
    await eio.send(sid, f'Echo: {data}')

@eio.on('disconnect')
async def on_disconnect(sid):
    print(f'Client {sid} disconnected')

# FastAPI routes
@fastapi_app.get('/')
async def root():
    return {'message': 'Hello World'}

@fastapi_app.get('/api/data')
async def get_data():
    return {'message': 'Hello from FastAPI!'}

# Wrap FastAPI app with Engine.IO middleware
app = engineio.ASGIApp(eio, fastapi_app)

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='0.0.0.0', port=8000)

Starlette Integration

import engineio
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

# Create Engine.IO async server
eio = engineio.AsyncServer(async_mode='asgi')

@eio.on('connect')
async def on_connect(sid, environ):
    print(f'Client {sid} connected')

@eio.on('message')
async def on_message(sid, data):
    await eio.send(sid, f'Starlette says: {data}')

# Starlette routes
async def homepage(request):
    return JSONResponse({'message': 'Hello Starlette'})

async def api_endpoint(request):
    return JSONResponse({'data': 'API response'})

starlette_app = Starlette(routes=[
    Route('/', homepage),
    Route('/api', api_endpoint),
])

# Wrap with Engine.IO middleware
app = engineio.ASGIApp(eio, starlette_app)

Standalone ASGI Application

import engineio

# Create Engine.IO server only (no fallback app)
eio = engineio.AsyncServer(async_mode='asgi')

@eio.on('connect')
async def on_connect(sid, environ):
    print(f'Client {sid} connected')

@eio.on('message')
async def on_message(sid, data):
    await eio.send(sid, data.upper())

# Create standalone ASGI app
app = engineio.ASGIApp(eio)

# Deploy with any ASGI server
if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='localhost', port=8000)

Static File Serving

import engineio

eio = engineio.AsyncServer(async_mode='asgi')

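# Define static file mappings (URL path -> file or directory)
static_files = {
    '/': 'index.html',
    '/static': './static',
    '/assets': './public/assets',
    '/style.css': {'filename': 'style.css', 'content_type': 'text/css'}
}

# Create ASGI app with static file serving
app = engineio.ASGIApp(eio, static_files=static_files)

Static file features:

  • Exact URL-to-file mappings
  • Directory mappings, where a URL prefix maps to a folder (keys are plain paths, not regular expressions)
  • Content type inferred from the file extension, or set explicitly with a dict entry
  • Each file is read into memory and sent in a single response, so large downloads are better served by a dedicated web server or CDN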
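
Content type detection from a file extension can be sketched with the standard library. This is an assumption-laden illustration: it uses `mimetypes` as a stand-in, whereas the library relies on its own lookup, and `guess_content_type` is a hypothetical helper.

```python
import mimetypes


def guess_content_type(filename, default='application/octet-stream'):
    """Guess a content type from the file extension, falling back to
    a generic binary type when the extension is unknown."""
    content_type, _encoding = mimetypes.guess_type(filename)
    return content_type or default


html_type = guess_content_type('index.html')
css_type = guess_content_type('style.css')
unknown_type = guess_content_type('data.zzunknownext')
```

When the guess would be wrong for a particular file, an explicit content type in the mapping avoids relying on detection at all.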

Lifespan Management

import engineio
import asyncio

eio = engineio.AsyncServer(async_mode='asgi')

# Background task for periodic cleanup
cleanup_task = None

async def startup():
    """Application startup tasks"""
    global cleanup_task
    print('Starting up...')
    
    async def periodic_cleanup():
        while True:
            await asyncio.sleep(300)  # 5 minutes
            print('Running periodic cleanup...')
    
    cleanup_task = asyncio.create_task(periodic_cleanup())

async def shutdown():
    """Application shutdown tasks"""
    global cleanup_task
    print('Shutting down...')
    
    if cleanup_task:
        cleanup_task.cancel()
        try:
            await cleanup_task
        except asyncio.CancelledError:
            pass

# Create ASGI app with lifespan management
app = engineio.ASGIApp(
    eio,
    on_startup=startup,
    on_shutdown=shutdown
)
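
The start-then-cancel pattern used by these callbacks can be checked outside a server. The sketch below reuses the same idea with a much shorter interval. The `state` dict and `main` coroutine are hypothetical scaffolding for the demonstration.

```python
import asyncio

# Shared state for the demonstration (hypothetical scaffolding).
state = {'task': None, 'ticks': 0}


async def startup():
    """Start a background task, as an on_startup callback might."""
    async def periodic():
        while True:
            await asyncio.sleep(0.01)
            state['ticks'] += 1

    state['task'] = asyncio.create_task(periodic())


async def shutdown():
    """Cancel the background task and wait for it to finish."""
    task = state['task']
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


async def main():
    await startup()
    await asyncio.sleep(0.05)  # let the task run a few iterations
    await shutdown()
    return state['task'].cancelled()


task_cancelled = asyncio.run(main())
```

Awaiting the cancelled task in `shutdown` matters: it guarantees the task has actually stopped before the process exits.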

Custom Endpoint Path

import engineio

eio = engineio.AsyncServer(async_mode='asgi')

# Use custom Engine.IO endpoint
app = engineio.ASGIApp(eio, engineio_path='realtime/socket')

# Clients should connect to: ws://server/realtime/socket/
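
The relationship between `engineio_path` and the URL a client uses can be sketched as follows, assuming the middleware adds any missing leading and trailing slash. Both helpers are hypothetical and exist only to illustrate the mapping.

```python
def normalize_engineio_path(path):
    """Add a leading and trailing slash if missing (assumed behavior)."""
    if not path.startswith('/'):
        path = '/' + path
    if not path.endswith('/'):
        path = path + '/'
    return path


def client_urls(host, path):
    """Build the polling and WebSocket URLs a client would target."""
    normalized = normalize_engineio_path(path)
    return {
        'polling': f'http://{host}{normalized}',
        'websocket': f'ws://{host}{normalized}',
    }


normalized = normalize_engineio_path('realtime/socket')
urls = client_urls('server', 'realtime/socket')
```

Clients need to be configured with the same path, since a client using the default path would not reach a server mounted elsewhere.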

Multiple Engine.IO Servers

import engineio

# Create multiple servers
chat_server = engineio.AsyncServer(async_mode='asgi')
notifications_server = engineio.AsyncServer(async_mode='asgi')

@chat_server.on('message')
async def on_chat_message(sid, data):
    await chat_server.send(sid, f'Chat: {data}')

@notifications_server.on('message')
async def on_notification(sid, data):
    await notifications_server.send(sid, f'Notification: {data}')

# Each ASGIApp only claims requests under its own engineio_path and passes
# everything else to other_asgi_app, so the two apps can be chained.
# This works for both HTTP long-polling and WebSocket requests.
notifications_app = engineio.ASGIApp(notifications_server, engineio_path='notifications')
main_app = engineio.ASGIApp(
    chat_server,
    other_asgi_app=notifications_app,
    engineio_path='chat'
)

# Requests under /chat/ reach chat_server, requests under /notifications/
# reach notifications_server, and anything else gets a 404 response.

Advanced Integration Patterns

FastAPI with Dependency Injection

import engineio
from fastapi import FastAPI, Depends

# Create FastAPI app
fastapi_app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')

# Dependency for accessing Engine.IO server
def get_engineio_server():
    return eio

@fastapi_app.post('/broadcast')
async def broadcast_message(
    message: str,
    eio_server: engineio.AsyncServer = Depends(get_engineio_server)
):
    """Broadcast message to all connected clients"""
    # eio_server.sockets maps each connected session id to its socket.
    # Iterate over a copy because clients may disconnect while sending.
    for sid in list(eio_server.sockets):
        await eio_server.send(sid, message)
    
    return {'status': 'broadcasted', 'message': message}

@eio.on('connect')
async def on_connect(sid, environ):
    print(f'Client {sid} connected')

# Wrap with Engine.IO
app = engineio.ASGIApp(eio, fastapi_app)

Authentication Integration

import engineio
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

fastapi_app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')
security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify JWT token"""
    token = credentials.credentials
    # Add your token verification logic here
    if token != 'valid-token':
        raise HTTPException(status_code=401, detail='Invalid token')
    return token

@eio.on('connect')
async def on_connect(sid, environ):
    # The ASGI scope is stored in the environ under the 'asgi.scope' key
    headers = dict(environ.get('asgi.scope', {}).get('headers', []))
    auth_header = headers.get(b'authorization', b'').decode()
    
    if not auth_header.startswith('Bearer '):
        # Returning False from the connect handler rejects the connection
        return False
    
    # Store authenticated session
    token = auth_header[7:]  # Remove 'Bearer ' prefix
    await eio.save_session(sid, {'token': token, 'authenticated': True})
    print(f'Authenticated client {sid} connected')

@fastapi_app.get('/protected')
async def protected_route(token: str = Depends(verify_token)):
    return {'message': 'This is a protected route', 'token': token}

app = engineio.ASGIApp(eio, fastapi_app)
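
Extracting a bearer token from raw ASGI headers is easy to get subtly wrong, because header names and values are bytes. The helper below is a hypothetical sketch that could be shared by a connect handler; it only parses the header and performs no token validation.

```python
def bearer_token(scope):
    """Return the bearer token from an ASGI scope, or None if absent."""
    for name, value in scope.get('headers', []):
        if name.lower() == b'authorization':
            scheme, _, token = value.decode('latin-1').partition(' ')
            if scheme.lower() == 'bearer' and token.strip():
                return token.strip()
    return None


with_token = bearer_token({'headers': [(b'authorization', b'Bearer abc123')]})
wrong_scheme = bearer_token({'headers': [(b'authorization', b'Basic xyz')]})
no_header = bearer_token({})
```

A connect handler could reject the client when this returns `None` and otherwise pass the token to whatever verification logic the application uses.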

WebSocket Coexistence

import engineio
from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect

fastapi_app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')

@eio.on('connect')
async def on_engineio_connect(sid, environ):
    print(f'Engine.IO client {sid} connected')

@fastapi_app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    """Native FastAPI WebSocket endpoint"""
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f'FastAPI WebSocket echo: {data}')
    except WebSocketDisconnect:
        print('FastAPI WebSocket client disconnected')

# Both Engine.IO and native WebSockets work together
app = engineio.ASGIApp(eio, fastapi_app)

ASGI Scope Access

Engine.IO event handlers can access the ASGI scope for request information:

@eio.on('connect')
async def on_connect(sid, environ):
    # Access ASGI scope (stored under the 'asgi.scope' key)
    scope = environ.get('asgi.scope', {})
    
    # Get request details ('client' may be missing or None)
    client = scope.get('client') or ('unknown', 0)
    headers = dict(scope.get('headers', []))
    query_string = scope.get('query_string', b'').decode()
    
    print(f'Client {sid} connected from {client[0]}:{client[1]}')
    print(f'User-Agent: {headers.get(b"user-agent", b"unknown").decode()}')
    
    # Parse query parameters
    from urllib.parse import parse_qs
    params = parse_qs(query_string)
    
    # Store client info in session
    await eio.save_session(sid, {
        'client_ip': client[0],
        'client_port': client[1],
        'headers': headers,
        'params': params
    })
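
The scope parsing can be tried on a hand-built scope dict, without a running server. This is a sketch: `describe_scope` is a hypothetical helper, and the address, user agent, and query string are made-up sample values.

```python
from urllib.parse import parse_qs


def describe_scope(scope):
    """Extract client address, user agent, and query parameters
    from an ASGI scope dict."""
    client = scope.get('client') or ('unknown', 0)
    headers = {
        name.decode('latin-1'): value.decode('latin-1')
        for name, value in scope.get('headers', [])
    }
    params = parse_qs(scope.get('query_string', b'').decode())
    return {
        'client_ip': client[0],
        'client_port': client[1],
        'user_agent': headers.get('user-agent', 'unknown'),
        'params': params,
    }


# Sample scope with made-up values for illustration.
example_scope = {
    'type': 'http',
    'client': ('203.0.113.7', 51234),
    'headers': [(b'user-agent', b'demo-client/1.0')],
    'query_string': b'EIO=4&transport=polling',
}
info = describe_scope(example_scope)
```

Decoding header names and values to `str` up front keeps the stored session data simple to serialize and compare.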

Deployment Considerations

Production Deployment

# For production with Uvicorn
import engineio
from fastapi import FastAPI

fastapi_app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')

# Configure server and handlers...

# 'app' is the object Uvicorn should load
app = engineio.ASGIApp(eio, fastapi_app)

# Run with:
# uvicorn app:app --host 0.0.0.0 --port 8000 --workers 1

Multiple Workers Consideration

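Engine.IO keeps each client's session in the memory of the worker process that accepted it, so every request from a given client must reach the same worker. Consider using:

  • Single worker deployment
  • Load balancer with session affinity (sticky sessions) in front of several single-worker processes

python-engineio itself has no external session storage. The client_manager option and AsyncRedisManager belong to python-socketio, which is built on top of Engine.IO, and sticky sessions are still required there:

# Example with python-socketio (not python-engineio) and a Redis message queue
import socketio

sio = socketio.AsyncServer(
    async_mode='asgi',
    client_manager=socketio.AsyncRedisManager('redis://localhost:6379')
)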
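
Session affinity means the same client is always routed to the same worker. The idea can be sketched as a stable hash of a client key. `pick_worker` is a hypothetical helper for illustration only; real load balancers provide this as a configuration option, so code like this would not normally be written by hand.

```python
import hashlib


def pick_worker(client_key, worker_count):
    """Map a client key (for example an IP address) to a worker index.

    The same key always maps to the same worker as long as
    worker_count does not change."""
    digest = hashlib.sha256(client_key.encode('utf-8')).digest()
    return int.from_bytes(digest[:8], 'big') % worker_count


first = pick_worker('203.0.113.7', 4)
second = pick_worker('203.0.113.7', 4)
```

A cryptographic hash is used here only because Python's built-in `hash()` for strings is randomized per process, which would break affinity across restarts.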

Error Handling

The ASGI middleware handles various error conditions:

  • Invalid Engine.IO requests: Returns appropriate HTTP error responses
  • Non-Engine.IO requests: Routes to fallback ASGI app or returns 404
  • Static file errors: Returns 404 for missing files
  • Server errors: Logs exceptions and returns 500 responses

Custom error handling can be implemented in the fallback ASGI application or using FastAPI exception handlers:

import engineio
from fastapi import FastAPI, HTTPException
from fastapi.exception_handlers import http_exception_handler

fastapi_app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')

@fastapi_app.exception_handler(HTTPException)
async def custom_http_exception_handler(request, exc):
    """Custom HTTP exception handler"""
    print(f'HTTP exception: {exc.detail}')
    return await http_exception_handler(request, exc)

app = engineio.ASGIApp(eio, fastapi_app)

Install with Tessl CLI

npx tessl i tessl/pypi-python-engineio
