0
# ASGI and WebSocket Support
1
2
Modern asynchronous application support with WebSocket handling, Server-Sent Events, and full ASGI 3.0 compatibility for building real-time web applications.
3
4
## Capabilities
5
6
### ASGI Application
7
8
Asynchronous application support for modern web development with WebSocket capabilities.
9
10
```python { .api }
11
class falcon.asgi.App:
12
def __init__(
13
self,
14
media_type: str = 'application/json',
15
request_type: type = None,
16
response_type: type = None,
17
middleware: list = None,
18
router: object = None,
19
cors_enable: bool = False,
20
req_options: RequestOptions = None,
21
resp_options: ResponseOptions = None,
22
secure_cookies_by_default: bool = None
23
):
24
"""
25
Create an ASGI application.
26
27
Args: Same as WSGI App constructor
28
"""
29
30
async def __call__(self, scope: dict, receive: callable, send: callable):
31
"""
32
ASGI 3.0 callable interface.
33
34
Args:
35
scope: ASGI scope dictionary
36
receive: ASGI receive callable
37
send: ASGI send callable
38
"""
39
40
# Same methods as WSGI App:
41
# add_route, add_static_route, add_sink, add_middleware,
42
# add_error_handler, set_error_serializer
43
```
44
45
#### ASGI App Usage
46
47
```python
48
import falcon.asgi
49
import asyncio
50
51
class AsyncUserResource:
52
async def on_get(self, req, resp, user_id=None):
53
if user_id:
54
# Async database call
55
user = await fetch_user_async(user_id)
56
resp.media = user
57
else:
58
users = await fetch_all_users_async()
59
resp.media = {'users': users}
60
61
async def on_post(self, req, resp):
62
user_data = req.media
63
new_user = await create_user_async(user_data)
64
resp.status = falcon.HTTP_201
65
resp.media = new_user
66
67
# Create ASGI app
68
app = falcon.asgi.App()
69
app.add_route('/users', AsyncUserResource())
70
app.add_route('/users/{user_id}', AsyncUserResource())
71
72
# Run with ASGI server
73
if __name__ == '__main__':
74
import uvicorn
75
uvicorn.run(app, host='0.0.0.0', port=8000)
76
```
77
78
### WebSocket Support
79
80
Full-featured WebSocket handling for real-time communication.
81
82
```python { .api }
83
class WebSocket:
84
def __init__(self, scope: dict, receive: callable, send: callable):
85
"""
86
WebSocket connection handler.
87
88
Args:
89
scope: ASGI WebSocket scope
90
receive: ASGI receive callable
91
send: ASGI send callable
92
"""
93
94
# Connection state
95
ready_state: int # WebSocket ready state
96
client_address: tuple # Client IP and port
97
context: object # User data storage
98
path: str # WebSocket path
99
query_string: str # Query string
100
headers: dict # WebSocket headers
101
subprotocols: list # Supported subprotocols
102
103
# Connection management
104
async def accept(self, subprotocol: str = None, headers: dict = None):
105
"""
106
Accept WebSocket connection.
107
108
Args:
109
subprotocol: Selected subprotocol
110
headers: Additional response headers
111
"""
112
113
async def close(self, code: int = 1000, reason: str = None):
114
"""
115
Close WebSocket connection.
116
117
Args:
118
code: Close status code
119
reason: Close reason
120
"""
121
122
# Message receiving
123
async def receive_text(self) -> str:
124
"""
125
Receive text message.
126
127
Returns:
128
Text message content
129
130
Raises:
131
WebSocketDisconnected: If connection closed
132
"""
133
134
async def receive_data(self) -> bytes:
135
"""
136
Receive binary message.
137
138
Returns:
139
Binary message content
140
141
Raises:
142
WebSocketDisconnected: If connection closed
143
"""
144
145
async def receive_json(self) -> object:
146
"""
147
Receive JSON message.
148
149
Returns:
150
Parsed JSON data
151
152
Raises:
153
WebSocketDisconnected: If connection closed
154
ValueError: If JSON parsing fails
155
"""
156
157
async def receive_msgpack(self) -> object:
158
"""
159
Receive MessagePack message.
160
161
Returns:
162
Unpacked MessagePack data
163
164
Raises:
165
WebSocketDisconnected: If connection closed
166
"""
167
168
# Message sending
169
async def send_text(self, payload: str):
170
"""
171
Send text message.
172
173
Args:
174
payload: Text message to send
175
176
Raises:
177
WebSocketDisconnected: If connection closed
178
"""
179
180
async def send_data(self, payload: bytes):
181
"""
182
Send binary message.
183
184
Args:
185
payload: Binary data to send
186
187
Raises:
188
WebSocketDisconnected: If connection closed
189
"""
190
191
async def send_json(self, obj: object):
192
"""
193
Send JSON message.
194
195
Args:
196
obj: Object to serialize as JSON
197
198
Raises:
199
WebSocketDisconnected: If connection closed
200
"""
201
202
async def send_msgpack(self, obj: object):
203
"""
204
Send MessagePack message.
205
206
Args:
207
obj: Object to serialize as MessagePack
208
209
Raises:
210
WebSocketDisconnected: If connection closed
211
"""
212
213
class WebSocketOptions:
214
def __init__(
215
self,
216
media_handlers: object = None,
217
max_receive_queue: int = 32
218
):
219
"""
220
WebSocket configuration options.
221
222
Args:
223
media_handlers: Media handler registry
224
max_receive_queue: Maximum receive queue size
225
"""
226
```
227
228
#### WebSocket Usage Examples
229
230
```python
231
import falcon.asgi
232
import asyncio
233
import json
234
235
class ChatWebSocket:
236
def __init__(self):
237
self.connections = set()
238
239
async def on_websocket(self, req, ws):
240
await ws.accept()
241
self.connections.add(ws)
242
243
try:
244
while True:
245
message = await ws.receive_text()
246
data = json.loads(message)
247
248
# Broadcast to all connections
249
await self.broadcast({
250
'user': data.get('user', 'Anonymous'),
251
'message': data.get('message', ''),
252
'timestamp': time.time()
253
})
254
except falcon.WebSocketDisconnected:
255
pass
256
finally:
257
self.connections.discard(ws)
258
259
async def broadcast(self, data):
260
"""Broadcast message to all connected clients"""
261
if self.connections:
262
message = json.dumps(data)
263
await asyncio.gather(
264
*[ws.send_text(message) for ws in self.connections],
265
return_exceptions=True
266
)
267
268
class EchoWebSocket:
269
async def on_websocket(self, req, ws):
270
await ws.accept()
271
272
try:
273
while True:
274
# Handle different message types
275
try:
276
# Try to receive as JSON first
277
data = await ws.receive_json()
278
await ws.send_json({
279
'echo': data,
280
'type': 'json'
281
})
282
except ValueError:
283
# Fall back to text
284
message = await ws.receive_text()
285
await ws.send_text(f"Echo: {message}")
286
except falcon.WebSocketDisconnected:
287
print("Client disconnected")
288
289
# Register WebSocket routes
290
app = falcon.asgi.App()
291
app.add_route('/chat', ChatWebSocket())
292
app.add_route('/echo', EchoWebSocket())
293
```
294
295
### Server-Sent Events
296
297
Server-Sent Events support for real-time data streaming.
298
299
```python { .api }
300
class SSEvent:
301
def __init__(
302
self,
303
data: str,
304
event_type: str = None,
305
event_id: str = None,
306
retry: int = None
307
):
308
"""
309
Server-Sent Event.
310
311
Args:
312
data: Event data
313
event_type: Event type name
314
event_id: Event ID for client tracking
315
retry: Retry interval in milliseconds
316
"""
317
318
def serialize(self) -> str:
319
"""
320
Serialize event to SSE format.
321
322
Returns:
323
SSE-formatted string
324
"""
325
326
# Properties
327
data: str # Event data
328
event_type: str # Event type
329
event_id: str # Event ID
330
retry: int # Retry interval
331
```
332
333
#### Server-Sent Events Example
334
335
```python
336
import falcon.asgi
337
import asyncio
338
from falcon.asgi import SSEvent
339
340
class EventStreamResource:
341
async def on_get(self, req, resp):
342
# Set SSE headers
343
resp.content_type = 'text/event-stream'
344
resp.set_header('Cache-Control', 'no-cache')
345
resp.set_header('Connection', 'keep-alive')
346
347
# Create event stream
348
async def event_stream():
349
counter = 0
350
while True:
351
# Create event
352
event = SSEvent(
353
data=f'{{"count": {counter}, "timestamp": "{time.time()}"}}',
354
event_type='counter',
355
event_id=str(counter)
356
)
357
358
yield event.serialize().encode('utf-8')
359
counter += 1
360
await asyncio.sleep(1)
361
362
resp.stream = event_stream()
363
364
class NotificationStream:
365
def __init__(self):
366
self.subscribers = set()
367
368
async def on_get(self, req, resp):
369
resp.content_type = 'text/event-stream'
370
resp.set_header('Cache-Control', 'no-cache')
371
372
# Create subscriber queue
373
queue = asyncio.Queue()
374
self.subscribers.add(queue)
375
376
try:
377
async def stream():
378
while True:
379
event_data = await queue.get()
380
event = SSEvent(
381
data=json.dumps(event_data),
382
event_type='notification'
383
)
384
yield event.serialize().encode('utf-8')
385
386
resp.stream = stream()
387
finally:
388
self.subscribers.discard(queue)
389
390
async def broadcast_notification(self, data):
391
"""Send notification to all subscribers"""
392
for queue in self.subscribers:
393
try:
394
queue.put_nowait(data)
395
except asyncio.QueueFull:
396
pass # Skip if queue is full
397
398
# Register SSE routes
399
app.add_route('/events', EventStreamResource())
400
app.add_route('/notifications', NotificationStream())
401
```
402
403
### ASGI Middleware
404
405
Middleware support for ASGI applications with async processing.
406
407
```python { .api }
408
# ASGI middleware class structure
409
class ASGIMiddleware:
410
def __init__(self, other_args):
411
"""Initialize middleware with configuration"""
412
413
async def process_request(self, req: Request, resp: Response):
414
"""
415
Process request before routing.
416
417
Args:
418
req: Request object
419
resp: Response object
420
"""
421
422
async def process_response(self, req: Request, resp: Response, resource: object, req_succeeded: bool):
423
"""
424
Process response after routing.
425
426
Args:
427
req: Request object
428
resp: Response object
429
resource: Resource that handled request
430
req_succeeded: Whether request succeeded
431
"""
432
433
async def process_resource(self, req: Request, resp: Response, resource: object, params: dict):
434
"""
435
Process resource before calling responder.
436
437
Args:
438
req: Request object
439
resp: Response object
440
resource: Target resource
441
params: Route parameters
442
"""
443
```
444
445
#### ASGI Middleware Example
446
447
```python
448
class AsyncLoggingMiddleware:
449
async def process_request(self, req, resp):
450
req.context.start_time = time.time()
451
print(f"Request: {req.method} {req.path}")
452
453
async def process_response(self, req, resp, resource, req_succeeded):
454
duration = time.time() - req.context.start_time
455
print(f"Response: {resp.status} ({duration:.3f}s)")
456
457
class AsyncAuthMiddleware:
458
async def process_request(self, req, resp):
459
token = req.get_header('Authorization')
460
if not token:
461
raise falcon.HTTPUnauthorized(title='Authentication required')
462
463
# Async token validation
464
user = await validate_token_async(token)
465
if not user:
466
raise falcon.HTTPUnauthorized(title='Invalid token')
467
468
req.context.user = user
469
470
# Add middleware to ASGI app
471
app = falcon.asgi.App(middleware=[
472
AsyncLoggingMiddleware(),
473
AsyncAuthMiddleware()
474
])
475
```
476
477
## Types
478
479
```python { .api }
480
# ASGI application type
481
falcon.asgi.App: type # ASGI application class
482
483
# WebSocket types
484
WebSocket: type # WebSocket connection handler
485
WebSocketOptions: type # WebSocket configuration
486
487
# Server-Sent Events types
488
SSEvent: type # Server-sent event
489
490
# WebSocket payload type constants
491
class WebSocketPayloadType:
492
TEXT: int # Text payload type
493
BINARY: int # Binary payload type
494
495
# WebSocket exceptions
496
WebSocketDisconnected: type # Connection disconnected
497
WebSocketPathNotFound: type # WebSocket path not found
498
WebSocketHandlerNotFound: type # Handler not found
499
WebSocketServerError: type # Server error
500
```