The ultra-reliable, fast ASGI+WSGI framework for building data plane APIs at scale.
—
Modern asynchronous application support with WebSocket handling, Server-Sent Events, and full ASGI 3.0 compatibility for building real-time web applications.
Asynchronous application support for modern web development with WebSocket capabilities.
class falcon.asgi.App:
def __init__(
self,
media_type: str = 'application/json',
request_type: type = None,
response_type: type = None,
middleware: list = None,
router: object = None,
cors_enable: bool = False,
req_options: RequestOptions = None,
resp_options: ResponseOptions = None,
secure_cookies_by_default: bool = None
):
"""
Create an ASGI application.
Args: Same as WSGI App constructor
"""
async def __call__(self, scope: dict, receive: callable, send: callable):
"""
ASGI 3.0 callable interface.
Args:
scope: ASGI scope dictionary
receive: ASGI receive callable
send: ASGI send callable
"""
# Same methods as WSGI App:
# add_route, add_static_route, add_sink, add_middleware,
# add_error_handler, set_error_serializerimport falcon.asgi
import asyncio
class AsyncUserResource:
async def on_get(self, req, resp, user_id=None):
if user_id:
# Async database call
user = await fetch_user_async(user_id)
resp.media = user
else:
users = await fetch_all_users_async()
resp.media = {'users': users}
async def on_post(self, req, resp):
user_data = req.media
new_user = await create_user_async(user_data)
resp.status = falcon.HTTP_201
resp.media = new_user
# Create ASGI app
app = falcon.asgi.App()
app.add_route('/users', AsyncUserResource())
app.add_route('/users/{user_id}', AsyncUserResource())
# Run with ASGI server
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=8000)Full-featured WebSocket handling for real-time communication.
class WebSocket:
def __init__(self, scope: dict, receive: callable, send: callable):
"""
WebSocket connection handler.
Args:
scope: ASGI WebSocket scope
receive: ASGI receive callable
send: ASGI send callable
"""
# Connection state
ready_state: int # WebSocket ready state
client_address: tuple # Client IP and port
context: object # User data storage
path: str # WebSocket path
query_string: str # Query string
headers: dict # WebSocket headers
subprotocols: list # Supported subprotocols
# Connection management
async def accept(self, subprotocol: str = None, headers: dict = None):
"""
Accept WebSocket connection.
Args:
subprotocol: Selected subprotocol
headers: Additional response headers
"""
async def close(self, code: int = 1000, reason: str = None):
"""
Close WebSocket connection.
Args:
code: Close status code
reason: Close reason
"""
# Message receiving
async def receive_text(self) -> str:
"""
Receive text message.
Returns:
Text message content
Raises:
WebSocketDisconnected: If connection closed
"""
async def receive_data(self) -> bytes:
"""
Receive binary message.
Returns:
Binary message content
Raises:
WebSocketDisconnected: If connection closed
"""
async def receive_json(self) -> object:
"""
Receive JSON message.
Returns:
Parsed JSON data
Raises:
WebSocketDisconnected: If connection closed
ValueError: If JSON parsing fails
"""
async def receive_msgpack(self) -> object:
"""
Receive MessagePack message.
Returns:
Unpacked MessagePack data
Raises:
WebSocketDisconnected: If connection closed
"""
# Message sending
async def send_text(self, payload: str):
"""
Send text message.
Args:
payload: Text message to send
Raises:
WebSocketDisconnected: If connection closed
"""
async def send_data(self, payload: bytes):
"""
Send binary message.
Args:
payload: Binary data to send
Raises:
WebSocketDisconnected: If connection closed
"""
async def send_json(self, obj: object):
"""
Send JSON message.
Args:
obj: Object to serialize as JSON
Raises:
WebSocketDisconnected: If connection closed
"""
async def send_msgpack(self, obj: object):
"""
Send MessagePack message.
Args:
obj: Object to serialize as MessagePack
Raises:
WebSocketDisconnected: If connection closed
"""
class WebSocketOptions:
def __init__(
self,
media_handlers: object = None,
max_receive_queue: int = 32
):
"""
WebSocket configuration options.
Args:
media_handlers: Media handler registry
max_receive_queue: Maximum receive queue size
"""import falcon.asgi
import asyncio
import json
class ChatWebSocket:
def __init__(self):
self.connections = set()
async def on_websocket(self, req, ws):
await ws.accept()
self.connections.add(ws)
try:
while True:
message = await ws.receive_text()
data = json.loads(message)
# Broadcast to all connections
await self.broadcast({
'user': data.get('user', 'Anonymous'),
'message': data.get('message', ''),
'timestamp': time.time()
})
except falcon.WebSocketDisconnected:
pass
finally:
self.connections.discard(ws)
async def broadcast(self, data):
"""Broadcast message to all connected clients"""
if self.connections:
message = json.dumps(data)
await asyncio.gather(
*[ws.send_text(message) for ws in self.connections],
return_exceptions=True
)
class EchoWebSocket:
async def on_websocket(self, req, ws):
await ws.accept()
try:
while True:
# Handle different message types
try:
# Try to receive as JSON first
data = await ws.receive_json()
await ws.send_json({
'echo': data,
'type': 'json'
})
except ValueError:
# Fall back to text
message = await ws.receive_text()
await ws.send_text(f"Echo: {message}")
except falcon.WebSocketDisconnected:
print("Client disconnected")
# Register WebSocket routes
app = falcon.asgi.App()
app.add_route('/chat', ChatWebSocket())
app.add_route('/echo', EchoWebSocket())Server-Sent Events support for real-time data streaming.
class SSEvent:
def __init__(
self,
data: str,
event_type: str = None,
event_id: str = None,
retry: int = None
):
"""
Server-Sent Event.
Args:
data: Event data
event_type: Event type name
event_id: Event ID for client tracking
retry: Retry interval in milliseconds
"""
def serialize(self) -> str:
"""
Serialize event to SSE format.
Returns:
SSE-formatted string
"""
# Properties
data: str # Event data
event_type: str # Event type
event_id: str # Event ID
retry: int # Retry intervalimport falcon.asgi
import asyncio
from falcon.asgi import SSEvent
class EventStreamResource:
async def on_get(self, req, resp):
# Set SSE headers
resp.content_type = 'text/event-stream'
resp.set_header('Cache-Control', 'no-cache')
resp.set_header('Connection', 'keep-alive')
# Create event stream
async def event_stream():
counter = 0
while True:
# Create event
event = SSEvent(
data=f'{{"count": {counter}, "timestamp": "{time.time()}"}}',
event_type='counter',
event_id=str(counter)
)
yield event.serialize().encode('utf-8')
counter += 1
await asyncio.sleep(1)
resp.stream = event_stream()
class NotificationStream:
def __init__(self):
self.subscribers = set()
async def on_get(self, req, resp):
resp.content_type = 'text/event-stream'
resp.set_header('Cache-Control', 'no-cache')
# Create subscriber queue
queue = asyncio.Queue()
self.subscribers.add(queue)
try:
async def stream():
while True:
event_data = await queue.get()
event = SSEvent(
data=json.dumps(event_data),
event_type='notification'
)
yield event.serialize().encode('utf-8')
resp.stream = stream()
finally:
self.subscribers.discard(queue)
async def broadcast_notification(self, data):
"""Send notification to all subscribers"""
for queue in self.subscribers:
try:
queue.put_nowait(data)
except asyncio.QueueFull:
pass # Skip if queue is full
# Register SSE routes
app.add_route('/events', EventStreamResource())
app.add_route('/notifications', NotificationStream())Middleware support for ASGI applications with async processing.
# ASGI middleware class structure
class ASGIMiddleware:
def __init__(self, other_args):
"""Initialize middleware with configuration"""
async def process_request(self, req: Request, resp: Response):
"""
Process request before routing.
Args:
req: Request object
resp: Response object
"""
async def process_response(self, req: Request, resp: Response, resource: object, req_succeeded: bool):
"""
Process response after routing.
Args:
req: Request object
resp: Response object
resource: Resource that handled request
req_succeeded: Whether request succeeded
"""
async def process_resource(self, req: Request, resp: Response, resource: object, params: dict):
"""
Process resource before calling responder.
Args:
req: Request object
resp: Response object
resource: Target resource
params: Route parameters
"""class AsyncLoggingMiddleware:
async def process_request(self, req, resp):
req.context.start_time = time.time()
print(f"Request: {req.method} {req.path}")
async def process_response(self, req, resp, resource, req_succeeded):
duration = time.time() - req.context.start_time
print(f"Response: {resp.status} ({duration:.3f}s)")
class AsyncAuthMiddleware:
async def process_request(self, req, resp):
token = req.get_header('Authorization')
if not token:
raise falcon.HTTPUnauthorized(title='Authentication required')
# Async token validation
user = await validate_token_async(token)
if not user:
raise falcon.HTTPUnauthorized(title='Invalid token')
req.context.user = user
# Add middleware to ASGI app
app = falcon.asgi.App(middleware=[
AsyncLoggingMiddleware(),
AsyncAuthMiddleware()
])# ASGI application type
falcon.asgi.App: type # ASGI application class
# WebSocket types
WebSocket: type # WebSocket connection handler
WebSocketOptions: type # WebSocket configuration
# Server-Sent Events types
SSEvent: type # Server-sent event
# WebSocket payload type constants
class WebSocketPayloadType:
TEXT: int # Text payload type
BINARY: int # Binary payload type
# WebSocket exceptions
WebSocketDisconnected: type # Connection disconnected
WebSocketPathNotFound: type # WebSocket path not found
WebSocketHandlerNotFound: type # Handler not found
WebSocketServerError: type # Server errorInstall with Tessl CLI
npx tessl i tessl/pypi-falcon