A Python ASGI web framework with the same API as Flask
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Native async WebSocket connection handling with JSON message support, connection lifecycle management, and comprehensive WebSocket protocol features.
Represents WebSocket connections with async support for bidirectional communication and connection lifecycle management.
class Websocket:
    """Async WebSocket connection: accept, receive, send and close operations.

    Only the public signatures are listed; method bodies are omitted.
    """

    # Connection attributes
    path: str  # URL path of the WebSocket
    query_string: bytes  # Query string, unparsed
    args: ImmutableMultiDict  # Query string arguments after parsing
    headers: Headers  # Headers of the connection
    requested_subprotocols: list[str]  # Subprotocols the client asked for

    async def accept(
        self,
        headers: dict | Headers | None = None,
        subprotocol: str | None = None
    ):
        """
        Accept the WebSocket connection.

        Args:
            headers: Extra headers to send along with the acceptance
            subprotocol: Subprotocol chosen from the client's requested list
        """

    async def receive(self) -> str | bytes:
        """
        Wait for the next message from the client.

        Returns:
            The message payload, either text (str) or binary (bytes)

        Raises:
            ConnectionClosed: If connection is closed
        """

    async def send(self, data: str | bytes):
        """
        Deliver a message to the client.

        Args:
            data: Payload to send, either text (str) or binary (bytes)

        Raises:
            ConnectionClosed: If connection is closed
        """

    async def receive_json(self):
        """
        Wait for the next message from the client and parse it as JSON.

        Returns:
            The parsed JSON data

        Raises:
            ConnectionClosed: If connection is closed
            BadRequest: If data is not valid JSON
        """

    async def send_json(self, *args, **kwargs):
        """
        Serialize data as JSON and deliver it to the client.

        Args:
            *args: Data to serialize as JSON (single positional arg or multiple args)
            **kwargs: Additional arguments for json.dumps() or dict data if no args

        Raises:
            ConnectionClosed: If connection is closed
        """

    async def close(self, code: int, reason: str = ""):
        """
        Close the WebSocket connection.

        Args:
            code: WebSocket close code (required, e.g. 1000 = normal closure)
            reason: Human-readable close reason
        """


from quart import Quart, websocket
app = Quart(__name__)


@app.websocket('/ws')
async def ws():
    """Echo every message received on ``/ws`` back to the sender, prefixed with "Echo: "."""
    # NOTE(review): ConnectionClosed is not imported in this snippet; the
    # "robust" example imports it from quart.exceptions. Confirm the name
    # exists in the installed Quart version.
    await websocket.accept()
    try:
        while True:
            # Wait for the next client message, then send it straight back
            incoming = await websocket.receive()
            reply = f"Echo: {incoming}"
            await websocket.send(reply)
    except ConnectionClosed:
        print("WebSocket connection closed")


from quart import Quart, websocket
app = Quart(__name__)


@app.websocket('/api/ws')
async def api_websocket():
    """Answer each JSON message on ``/api/ws`` with a timestamped JSON envelope.

    Every received payload is sent back wrapped in a dict with the keys
    ``type`` ('response'), ``original`` (the payload) and ``timestamp``
    (``time.time()``). A ``BadRequest`` raised inside the loop ends it: an
    error message is sent and the connection is closed with code 1003.
    """
    # Imported locally because the snippet's import line does not bring in
    # ``time``; without it the first message would fail with NameError.
    import time

    # NOTE(review): ConnectionClosed and BadRequest are not imported in this
    # snippet; the "robust" example imports both from quart.exceptions.
    # Confirm against the installed Quart version.
    await websocket.accept()
    try:
        while True:
            # Receive JSON data
            data = await websocket.receive_json()
            # Wrap the payload in the response envelope
            response = {
                'type': 'response',
                'original': data,
                'timestamp': time.time(),
            }
            # Send JSON response
            await websocket.send_json(response)
    except ConnectionClosed:
        print("Client disconnected")
    except BadRequest:
        # Invalid JSON: tell the client, then close the connection
        await websocket.send_json({
            'type': 'error',
            'message': 'Invalid JSON format',
        })
        await websocket.close(code=1003, reason="Invalid data format")


from quart import Quart, websocket, abort
app = Quart(__name__)


@app.websocket('/secure/ws')
async def secure_websocket():
    """Serve ``/secure/ws`` only to clients presenting a valid ``token`` query arg.

    The token is checked before the connection is accepted. When it is
    missing or rejected by ``validate_token`` the connection is closed with
    code 1008. Otherwise each received JSON message is passed to
    ``process_authenticated_message`` and the result is sent back as JSON.
    """
    # NOTE(review): ConnectionClosed is not imported in this snippet; the
    # "robust" example imports it from quart.exceptions. Confirm against the
    # installed Quart version.

    # Check authentication before accepting
    auth_token = websocket.args.get('token')
    if not auth_token or not validate_token(auth_token):
        await websocket.close(code=1008, reason="Authentication required")
        return

    # Accept authenticated connection
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive_json()
            # Handle authenticated message
            response = await process_authenticated_message(message, auth_token)
            await websocket.send_json(response)
    except ConnectionClosed:
        # The token is a credential, so it is deliberately kept out of the
        # log output.
        print("Authenticated client disconnected")
def validate_token(token):
    """Return True only when *token* is the string ``"valid_token"``.

    Placeholder validation logic. The comparison is constant-time so the
    time taken does not reveal how many leading characters of a guess were
    correct.

    Args:
        token: Candidate token supplied by the client.

    Returns:
        True for the exact expected string; False for any other value,
        including non-string input.
    """
    import hmac

    if not isinstance(token, str):
        return False
    # compare_digest only accepts ASCII str or bytes, so compare encoded
    # bytes; surrogatepass keeps malformed input from raising.
    candidate = token.encode("utf-8", "surrogatepass")
    return hmac.compare_digest(candidate, b"valid_token")
async def process_authenticated_message(message, token):
    """Build the reply for *message* in the context of the authenticated *token*.

    Returns:
        A dict with ``status`` set to 'processed', ``data`` holding the
        incoming message and ``user`` holding the result of
        ``get_user_from_token(token)``.
    """
    # NOTE(review): get_user_from_token is not defined anywhere in this
    # snippet; it must be supplied by the application.
    user = get_user_from_token(token)
    return {'status': 'processed', 'data': message, 'user': user}


from quart import Quart, websocket
import asyncio
app = Quart(__name__)

# Store active connections
connections = set()


@app.websocket('/chat')
async def chat_websocket():
    """Chat endpoint: relay each client's JSON messages to all other clients.

    On connect the client receives a welcome message and the other clients
    get a ``user_joined`` notice. Each received message is re-broadcast to
    the others with ``user``, ``message`` and ``timestamp`` fields. On exit
    the connection is removed and a ``user_left`` notice is broadcast.
    """
    # Imported locally because the snippet's imports do not bring in ``time``.
    import time

    # ``websocket`` is a context-local proxy: stored in the shared set and
    # used from another handler, it would resolve to that handler's
    # connection. Resolve it once and store the concrete object instead.
    # NOTE(review): assumes the proxy exposes _get_current_object(), as
    # Werkzeug's LocalProxy does; confirm for the installed Quart version.
    ws = websocket._get_current_object()

    # NOTE(review): ConnectionClosed is not imported in this snippet; the
    # "robust" example imports it from quart.exceptions.
    await ws.accept()
    connections.add(ws)
    try:
        # Send welcome message
        await ws.send_json({
            'type': 'system',
            'message': f'Connected! {len(connections)} users online.',
        })
        # Broadcast new user joined
        await broadcast_message({
            'type': 'user_joined',
            'count': len(connections),
        }, exclude=ws)
        while True:
            # Receive message from this client
            data = await ws.receive_json()
            # Broadcast to all other clients
            broadcast_data = {
                'type': 'message',
                'user': data.get('user', 'Anonymous'),
                'message': data.get('message', ''),
                'timestamp': time.time(),
            }
            await broadcast_message(broadcast_data, exclude=ws)
    except ConnectionClosed:
        pass
    finally:
        # Remove connection and notify others
        connections.discard(ws)
        await broadcast_message({
            'type': 'user_left',
            'count': len(connections),
        })
async def broadcast_message(data, exclude=None):
    """Send *data* to every registered connection except *exclude*.

    The sends run concurrently, and an exception raised by one send is
    collected by ``gather`` and does not stop the others.
    """
    if not connections:
        return
    # Iterate over a snapshot: send_safe may discard entries from the live
    # set while the sends are in flight.
    pending = [
        send_safe(peer, data)
        for peer in connections.copy()
        if peer != exclude
    ]
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)
async def send_safe(ws, data):
    """Send *data* as JSON over *ws*, dropping *ws* from the registry if it is closed."""
    try:
        await ws.send_json(data)
    except ConnectionClosed:
        # The peer is gone, so forget it and later broadcasts skip it
        connections.discard(ws)


from quart import Quart, websocket
import asyncio
app = Quart(__name__)


@app.websocket('/ws/heartbeat')
async def heartbeat_websocket():
    """Serve ``/ws/heartbeat``: answer messages and probe idle clients.

    A background ``heartbeat_handler`` task runs for the lifetime of the
    connection. A received 'ping' is answered with 'pong'; any other message
    is acknowledged with "Received: <message>". When nothing arrives within
    30 seconds a 'ping' is sent to the client.
    """
    # NOTE(review): ConnectionClosed is not imported in this snippet; the
    # "robust" example imports it from quart.exceptions.
    await websocket.accept()
    # Background task, cancelled again in the finally block below
    beat_task = asyncio.create_task(heartbeat_handler())
    try:
        while True:
            try:
                # Wait at most 30 seconds for the next client message
                incoming = await asyncio.wait_for(
                    websocket.receive(),
                    timeout=30.0,
                )
                reply = 'pong' if incoming == 'ping' else f"Received: {incoming}"
                await websocket.send(reply)
            except asyncio.TimeoutError:
                # Nothing arrived in time: probe the client
                await websocket.send('ping')
    except ConnectionClosed:
        print("Client disconnected")
    finally:
        # Stop the background task and wait for it to finish unwinding
        beat_task.cancel()
        try:
            await beat_task
        except asyncio.CancelledError:
            pass
async def heartbeat_handler():
    """Send a JSON heartbeat message over the websocket every 20 seconds.

    Each message has ``type`` 'heartbeat' and a ``timestamp`` taken from
    ``time.time()``. The loop ends quietly when the connection is closed or
    the task is cancelled.
    """
    # Imported locally because the snippet's imports do not bring in
    # ``time``; without it the first heartbeat would fail with NameError.
    import time

    # NOTE(review): ConnectionClosed is not imported in this snippet; the
    # "robust" example imports it from quart.exceptions.
    try:
        while True:
            await asyncio.sleep(20)  # Send heartbeat every 20 seconds
            await websocket.send_json({
                'type': 'heartbeat',
                'timestamp': time.time(),
            })
    except (ConnectionClosed, asyncio.CancelledError):
        pass


from quart import Quart, websocket
from quart.exceptions import BadRequest, ConnectionClosed
app = Quart(__name__)
@app.websocket('/ws/robust')
async def robust_websocket():
try:
await websocket.accept()
while True:
try:
# Receive and validate message
data = await websocket.receive_json()
# Validate message structure
if not isinstance(data, dict) or 'type' not in data:
await websocket.send_json({
'type': 'error',
'code': 'INVALID_FORMAT',
'message': 'Message must be JSON object with "type" field'
})
continue
# Process different message types
if data['type'] == 'echo':
await websocket.send_json({
'type': 'echo_response',
'original': data.get('message', '')
})
elif data['type'] == 'time':
await websocket.send_json({
'type': 'time_response',
'timestamp': time.time()
})
else:
await websocket.send_json({
'type': 'error',
'code': 'UNKNOWN_TYPE',
'message': f'Unknown message type: {data["type"]}'
})
except BadRequest:
# Invalid JSON received
await websocket.send_json({
'type': 'error',
'code': 'INVALID_JSON',
'message': 'Invalid JSON format'
})
except Exception as e:
# Handle unexpected errors
await websocket.send_json({
'type': 'error',
'code': 'INTERNAL_ERROR',
'message': 'An internal error occurred'
})
print(f"WebSocket error: {e}")
except ConnectionClosed:
print("WebSocket connection closed normally")
except Exception as e:
print(f"WebSocket handler error: {e}")
try:
await websocket.close(code=1011, reason="Internal server error")
except:
pass # Connection might already be closedInstall with Tessl CLI
npx tessl i tessl/pypi-quart