# ASGI Middleware

ASGI middleware for integrating Engine.IO servers with FastAPI, Starlette, and other ASGI-compatible web frameworks. It handles Engine.IO traffic and can forward all other requests to a wrapped ASGI application.

## Capabilities

### Middleware Initialization

Create ASGI middleware that wraps an Engine.IO `AsyncServer`, with optional static file serving, a fallback application, and startup and shutdown callbacks.

```python { .api }
class ASGIApp:
    def __init__(
        self,
        engineio_server,
        other_asgi_app=None,
        static_files=None,
        engineio_path='engine.io',
        on_startup=None,
        on_shutdown=None
    ):
        """
        Initialize ASGI middleware for Engine.IO.

        Args:
            engineio_server (AsyncServer): The Engine.IO async server instance
            other_asgi_app (callable, optional): ASGI app for non-Engine.IO traffic
            static_files (dict, optional): Static file mapping rules
            engineio_path (str): Engine.IO endpoint path, default 'engine.io'
            on_startup (callable, optional): Startup callback function
            on_shutdown (callable, optional): Shutdown callback function
        """
```

### ASGI Application Interface

Standard ASGI application callable that routes requests between Engine.IO and fallback applications.

```python { .api }
async def __call__(self, scope, receive, send):
    """
    ASGI application callable.

    Args:
        scope (dict): ASGI scope dictionary
        receive (callable): ASGI receive callable
        send (callable): ASGI send callable
    """
```
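
The routing decision described above can be pictured with a small sketch. This is not the library's implementation: `pick_target` and its return labels are hypothetical names, static file lookup is left out, and the matching rule (a request belongs to Engine.IO when its path sits under the configured endpoint path) is an assumption made for illustration.

```python
def pick_target(scope, engineio_path='engine.io', has_fallback=False):
    """Return a label naming the handler that would receive this ASGI scope.

    Hypothetical helper for illustration only; not part of python-engineio.
    """
    if scope['type'] == 'lifespan':
        return 'lifespan'
    # Normalize the endpoint so 'engine.io' and '/engine.io/' behave alike
    prefix = '/' + engineio_path.strip('/') + '/'
    path = scope.get('path', '')
    if scope['type'] in ('http', 'websocket') and (path + '/').startswith(prefix):
        return 'engineio'
    # Anything else goes to the wrapped app, or to a 404 when there is none
    return 'fallback' if has_fallback else 'not_found'
```

Calling `pick_target({'type': 'http', 'path': '/api/data'}, has_fallback=True)` yields `'fallback'`, while the same scope without a fallback application yields `'not_found'`.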

### Static File Serving

Serve static files asynchronously with content type detection and error handling.

```python { .api }
async def serve_static_file(self, static_file, receive, send):
    """
    Serve a static file.

    Args:
        static_file (dict): Static file descriptor with 'filename' and 'content_type' keys
        receive (callable): ASGI receive callable
        send (callable): ASGI send callable
    """
```

### Lifespan Management

Handle ASGI lifespan events for application startup and shutdown.

```python { .api }
async def lifespan(self, scope, receive, send):
    """
    Handle ASGI lifespan events.

    Args:
        scope (dict): ASGI lifespan scope
        receive (callable): ASGI receive callable
        send (callable): ASGI send callable
    """
```
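
To make the lifespan exchange concrete, here is a minimal sketch of a lifespan loop that uses the message types defined by the ASGI lifespan protocol. It is an illustration rather than the library's code: `run_lifespan` and `demo` are hypothetical names, and the sketch assumes both callbacks are coroutine functions.

```python
import asyncio


async def run_lifespan(receive, send, on_startup=None, on_shutdown=None):
    """Answer lifespan events, running the callbacks before acknowledging."""
    while True:
        event = await receive()
        if event['type'] == 'lifespan.startup':
            if on_startup is not None:
                await on_startup()
            await send({'type': 'lifespan.startup.complete'})
        elif event['type'] == 'lifespan.shutdown':
            if on_shutdown is not None:
                await on_shutdown()
            await send({'type': 'lifespan.shutdown.complete'})
            return


async def demo():
    """Drive the loop with scripted events and record what happens."""
    events = [{'type': 'lifespan.startup'}, {'type': 'lifespan.shutdown'}]
    calls, sent = [], []

    async def receive():
        return events.pop(0)

    async def send(message):
        sent.append(message['type'])

    async def startup():
        calls.append('startup')

    async def shutdown():
        calls.append('shutdown')

    await run_lifespan(receive, send, startup, shutdown)
    return calls, sent
```

Running `asyncio.run(demo())` shows the ordering: each callback runs before the matching `complete` message is sent back to the server.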

### Error Handling

Built-in error handling for requests that don't match Engine.IO patterns.

```python { .api }
async def not_found(self, receive, send):
    """
    Return a 404 Not Found response.

    Args:
        receive (callable): ASGI receive callable
        send (callable): ASGI send callable
    """
```
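
For readers unfamiliar with raw ASGI responses, the sketch below shows what sending a 404 looks like at the protocol level. It uses the standard `http.response.start` and `http.response.body` messages; the function names, the header, and the body text are assumptions for illustration and may differ from what the library sends.

```python
import asyncio


async def send_not_found(receive, send):
    """Send a plain-text 404 using the standard ASGI HTTP response messages."""
    await send({
        'type': 'http.response.start',
        'status': 404,
        'headers': [(b'content-type', b'text/plain')],
    })
    await send({'type': 'http.response.body', 'body': b'Not Found'})


async def collect():
    """Call the handler with stub callables and return the messages it sent."""
    sent = []

    async def receive():
        return {'type': 'http.request', 'body': b'', 'more_body': False}

    async def send(message):
        sent.append(message)

    await send_not_found(receive, send)
    return sent
```

The same stub `receive` and `send` pair is a convenient way to exercise any ASGI callable in a unit test without starting a server.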

## Integration Examples

### FastAPI Integration

```python
import engineio
from fastapi import FastAPI

# Create FastAPI app
fastapi_app = FastAPI()

# Create Engine.IO async server
eio = engineio.AsyncServer(async_mode='asgi')

@eio.on('connect')
async def on_connect(sid, environ):
    print(f'Client {sid} connected')

@eio.on('message')
async def on_message(sid, data):
    print(f'Message from {sid}: {data}')
    await eio.send(sid, f'Echo: {data}')

@eio.on('disconnect')
async def on_disconnect(sid):
    print(f'Client {sid} disconnected')

# FastAPI routes
@fastapi_app.get('/')
async def root():
    return {'message': 'Hello World'}

@fastapi_app.get('/api/data')
async def get_data():
    return {'message': 'Hello from FastAPI!'}

# Wrap FastAPI app with Engine.IO middleware
app = engineio.ASGIApp(eio, fastapi_app)

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='0.0.0.0', port=8000)
```

### Starlette Integration

```python
import engineio
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

# Create Engine.IO async server
eio = engineio.AsyncServer(async_mode='asgi')

@eio.on('connect')
async def on_connect(sid, environ):
    print(f'Client {sid} connected')

@eio.on('message')
async def on_message(sid, data):
    await eio.send(sid, f'Starlette says: {data}')

# Starlette routes
async def homepage(request):
    return JSONResponse({'message': 'Hello Starlette'})

async def api_endpoint(request):
    return JSONResponse({'data': 'API response'})

starlette_app = Starlette(routes=[
    Route('/', homepage),
    Route('/api', api_endpoint),
])

# Wrap with Engine.IO middleware
app = engineio.ASGIApp(eio, starlette_app)
```

### Standalone ASGI Application

```python
import engineio

# Create Engine.IO server only (no fallback app)
eio = engineio.AsyncServer(async_mode='asgi')

@eio.on('connect')
async def on_connect(sid, environ):
    print(f'Client {sid} connected')

@eio.on('message')
async def on_message(sid, data):
    await eio.send(sid, data.upper())

# Create standalone ASGI app
app = engineio.ASGIApp(eio)

# Deploy with any ASGI server
if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='localhost', port=8000)
```

### Static File Serving

```python
import engineio

eio = engineio.AsyncServer(async_mode='asgi')

# Define static file mappings: each key is a URL path and each value is
# the file or directory served for it (regex patterns are not interpreted)
static_files = {
    '/': 'index.html',
    '/static': './static',
    '/assets': './public/assets'
}

# Create ASGI app with static file serving
app = engineio.ASGIApp(eio, static_files=static_files)
```

Static file features:
- Async file serving through the ASGI interface
- Single-file and directory mappings for flexible URL routing
- Automatic content type detection based on the file extension
- Best suited to small assets during development; put large files behind a dedicated static file server
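
As an illustration of extension-based content type detection, the sketch below uses the standard library's `mimetypes` module. This is an assumption about the general technique, not a description of the library's own lookup table; `guess_content_type` is a hypothetical helper.

```python
import mimetypes


def guess_content_type(filename, default='application/octet-stream'):
    """Guess a Content-Type from the file extension, with a safe fallback."""
    content_type, _encoding = mimetypes.guess_type(filename)
    return content_type or default
```

A file without a recognizable extension falls back to `application/octet-stream`, which tells browsers to treat the payload as opaque binary data.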

### Lifespan Management

```python
import engineio
import asyncio

eio = engineio.AsyncServer(async_mode='asgi')

# Background task for periodic cleanup
cleanup_task = None

async def startup():
    """Application startup tasks"""
    global cleanup_task
    print('Starting up...')

    async def periodic_cleanup():
        while True:
            await asyncio.sleep(300)  # 5 minutes
            print('Running periodic cleanup...')

    cleanup_task = asyncio.create_task(periodic_cleanup())

async def shutdown():
    """Application shutdown tasks"""
    global cleanup_task
    print('Shutting down...')

    if cleanup_task:
        cleanup_task.cancel()
        try:
            await cleanup_task
        except asyncio.CancelledError:
            pass

# Create ASGI app with lifespan management
app = engineio.ASGIApp(
    eio,
    on_startup=startup,
    on_shutdown=shutdown
)
```

### Custom Endpoint Path

```python
import engineio

eio = engineio.AsyncServer(async_mode='asgi')

# Use custom Engine.IO endpoint
app = engineio.ASGIApp(eio, engineio_path='realtime/socket')

# Clients should connect to: ws://server/realtime/socket/
```

### Multiple Engine.IO Servers

```python
import engineio
from starlette.applications import Starlette
from starlette.middleware import Middleware

# Create multiple servers
chat_server = engineio.AsyncServer(async_mode='asgi')
notifications_server = engineio.AsyncServer(async_mode='asgi')

@chat_server.on('message')
async def on_chat_message(sid, data):
    await chat_server.send(sid, f'Chat: {data}')

@notifications_server.on('message')
async def on_notification(sid, data):
    await notifications_server.send(sid, f'Notification: {data}')

# Create separate ASGI apps
chat_app = engineio.ASGIApp(chat_server, engineio_path='chat')
notifications_app = engineio.ASGIApp(notifications_server, engineio_path='notifications')

# Custom routing middleware, written as plain ASGI middleware so that it
# sees WebSocket connections too (BaseHTTPMiddleware only handles HTTP
# requests and expects a Response object back from dispatch)
class MultiEngineIOMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # Lifespan scopes carry no path, so they fall through to the app
        path = scope.get('path', '')
        if path.startswith('/chat/'):
            # Route to chat server
            await chat_app(scope, receive, send)
        elif path.startswith('/notifications/'):
            # Route to notifications server
            await notifications_app(scope, receive, send)
        else:
            await self.app(scope, receive, send)

main_app = Starlette(middleware=[
    Middleware(MultiEngineIOMiddleware)
])
```

## Advanced Integration Patterns

### FastAPI with Dependency Injection

```python
import engineio
from fastapi import FastAPI, Depends

# Create FastAPI app
fastapi_app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')

# Session IDs of connected clients, maintained by the handlers below
connected_sids = set()

# Dependency for accessing Engine.IO server
def get_engineio_server():
    return eio

@fastapi_app.post('/broadcast')
async def broadcast_message(
    message: str,
    eio_server: engineio.AsyncServer = Depends(get_engineio_server)
):
    """Broadcast message to all connected clients"""
    # Engine.IO sends to one client at a time, so loop over the tracked
    # session IDs (copied to a list in case the set changes mid-loop)
    for sid in list(connected_sids):
        await eio_server.send(sid, message)

    return {'status': 'broadcasted', 'message': message}

@eio.on('connect')
async def on_connect(sid, environ):
    connected_sids.add(sid)
    print(f'Client {sid} connected')

@eio.on('disconnect')
async def on_disconnect(sid):
    connected_sids.discard(sid)

# Wrap with Engine.IO
app = engineio.ASGIApp(eio, fastapi_app)
```

### Authentication Integration

```python
import engineio
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

fastapi_app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')
security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify JWT token"""
    token = credentials.credentials
    # Add your token verification logic here
    if token != 'valid-token':
        raise HTTPException(status_code=401, detail='Invalid token')
    return token

@eio.on('connect')
async def on_connect(sid, environ):
    # Access authentication info from the ASGI scope, which the server
    # stores in the environ under the 'asgi.scope' key
    headers = dict(environ.get('asgi.scope', {}).get('headers', []))
    auth_header = headers.get(b'authorization', b'').decode()

    if not auth_header.startswith('Bearer '):
        # Returning False from the connect handler rejects the connection
        return False

    # Store authenticated session
    token = auth_header[7:]  # Remove 'Bearer ' prefix
    await eio.save_session(sid, {'token': token, 'authenticated': True})
    print(f'Authenticated client {sid} connected')

@fastapi_app.get('/protected')
async def protected_route(token: str = Depends(verify_token)):
    return {'message': 'This is a protected route', 'token': token}

app = engineio.ASGIApp(eio, fastapi_app)
```

### WebSocket Coexistence

```python
import engineio
from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect

fastapi_app = FastAPI()
eio = engineio.AsyncServer(async_mode='asgi')

@eio.on('connect')
async def on_engineio_connect(sid, environ):
    print(f'Engine.IO client {sid} connected')

@fastapi_app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    """Native FastAPI WebSocket endpoint"""
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f'FastAPI WebSocket echo: {data}')
    except WebSocketDisconnect:
        print('FastAPI WebSocket client disconnected')

# Both Engine.IO and native WebSockets work together
app = engineio.ASGIApp(eio, fastapi_app)
```

## ASGI Scope Access

Engine.IO event handlers can access the ASGI scope for request information:

```python
from urllib.parse import parse_qs

import engineio

eio = engineio.AsyncServer(async_mode='asgi')

@eio.on('connect')
async def on_connect(sid, environ):
    # Access ASGI scope (stored under the 'asgi.scope' key of the environ)
    scope = environ.get('asgi.scope', {})

    # Get request details; 'client' may be missing or None
    client = scope.get('client') or ('unknown', 0)
    headers = dict(scope.get('headers', []))
    query_string = scope.get('query_string', b'').decode()

    print(f'Client {sid} connected from {client[0]}:{client[1]}')
    print(f'User-Agent: {headers.get(b"user-agent", b"unknown").decode()}')

    # Parse query parameters
    params = parse_qs(query_string)

    # Store client info in session
    await eio.save_session(sid, {
        'client_ip': client[0],
        'client_port': client[1],
        'headers': headers,
        'params': params
    })
```
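
One detail worth knowing when reading `params` back from the session: `parse_qs` maps every key to a list of values, because a key may repeat in the query string. The sketch below shows a small helper that collapses the result to one value per key; `first_values` is a hypothetical name, and the query string is made-up sample data.

```python
from urllib.parse import parse_qs


def first_values(query_string):
    """Collapse parse_qs output to one value per key (first occurrence wins)."""
    return {key: values[0] for key, values in parse_qs(query_string).items()}


# parse_qs keeps every occurrence of a repeated key in a list
raw = parse_qs('token=abc&room=1&room=2')
flat = first_values('token=abc&room=1&room=2')
```

Here `raw` is `{'token': ['abc'], 'room': ['1', '2']}` and `flat` is `{'token': 'abc', 'room': '1'}`.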
461
462
## Deployment Considerations
463
464
### Production Deployment
465
466
```python
467
# For production with Uvicorn
468
import engineio
469
from fastapi import FastAPI
470
471
app = FastAPI()
472
eio = engineio.AsyncServer(async_mode='asgi')
473
474
# Configure server and handlers...
475
476
app = engineio.ASGIApp(eio, app)
477
478
# Run with:
479
# uvicorn app:app --host 0.0.0.0 --port 8000 --workers 1
480
```
481
482
### Multiple Workers Consideration
483
484
Engine.IO requires sticky sessions when using multiple workers. Consider using:
485
486
- Single worker deployment
487
- Load balancer with session affinity
488
- Redis or other external session storage
489
490
```python
491
# Example with Redis session storage
492
import engineio
493
494
eio = engineio.AsyncServer(
495
async_mode='asgi',
496
client_manager=engineio.AsyncRedisManager('redis://localhost:6379')
497
)
498
```
499
500
## Error Handling
501
502
The ASGI middleware handles various error conditions:
503
504
- **Invalid Engine.IO requests**: Returns appropriate HTTP error responses
505
- **Non-Engine.IO requests**: Routes to fallback ASGI app or returns 404
506
- **Static file errors**: Returns 404 for missing files
507
- **Server errors**: Logs exceptions and returns 500 responses
508
509
Custom error handling can be implemented in the fallback ASGI application or using FastAPI exception handlers:
510
511
```python
512
from fastapi import FastAPI, HTTPException
513
from fastapi.exception_handlers import http_exception_handler
514
515
fastapi_app = FastAPI()
516
517
@fastapi_app.exception_handler(HTTPException)
518
async def custom_http_exception_handler(request, exc):
519
"""Custom HTTP exception handler"""
520
print(f'HTTP exception: {exc.detail}')
521
return await http_exception_handler(request, exc)
522
523
app = engineio.ASGIApp(eio, fastapi_app)
524
```