Async http client/server framework (asyncio)
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive WebSocket implementation for both client and server applications, enabling real-time bidirectional communication. Supports message handling, connection management, and the complete WebSocket protocol with proper connection lifecycle management.
WebSocket client implementation for connecting to WebSocket servers and handling real-time communication.
class ClientWebSocketResponse:
    """Client-side WebSocket connection (API outline; method bodies omitted)."""

    @property
    def closed(self):
        """Check if WebSocket connection is closed."""

    @property
    def close_code(self):
        """WebSocket close code."""

    @property
    def protocol(self):
        """WebSocket protocol."""

    def exception(self):
        """
        Return the exception recorded for this connection, if any.

        Typically consulted after receiving a message of type
        WSMsgType.ERROR.
        """

    async def send_str(self, data, compress=None):
        """
        Send text message.

        Parameters:
        - data (str): Text message to send
        - compress (int): Compression level
        """

    async def send_bytes(self, data, compress=None):
        """
        Send binary message.

        Parameters:
        - data (bytes): Binary message to send
        - compress (int): Compression level
        """

    async def send_json(self, data, *, dumps=None, compress=None):
        """
        Send JSON message.

        Parameters:
        - data: Object to serialize as JSON
        - dumps: JSON serialization function
        - compress (int): Compression level
        """

    async def ping(self, message=b''):
        """
        Send ping frame.

        Parameters:
        - message (bytes): Ping payload
        """

    async def pong(self, message=b''):
        """
        Send pong frame.

        Parameters:
        - message (bytes): Pong payload
        """

    async def close(self, code=None, message=b''):
        """
        Close WebSocket connection.

        Parameters:
        - code (int): Close code
        - message (bytes): Close message
        """

    async def receive(self):
        """
        Receive next message.

        Returns:
        WSMessage: WebSocket message
        """

    async def receive_str(self):
        """
        Receive text message.

        Returns:
        str: Text message content
        """

    async def receive_bytes(self):
        """
        Receive binary message.

        Returns:
        bytes: Binary message content
        """

    async def receive_json(self, *, loads=None):
        """
        Receive JSON message.

        Parameters:
        - loads: JSON deserialization function

        Returns:
        Object parsed from JSON
        """

    def __aiter__(self):
        """Async iterator over messages."""

    async def __anext__(self):
        """Get next message in async iteration."""


# WebSocket server implementation for handling WebSocket connections in web applications.
class WebSocketResponse:
    """Server-side WebSocket response (API outline; method bodies omitted)."""

    def __init__(
        self,
        *,
        timeout=10.0,
        receive_timeout=None,
        autoclose=True,
        autoping=True,
        heartbeat=None,
        protocols=None,
        compress=True,
        max_msg_size=4 * 1024 * 1024,
        **kwargs
    ):
        """
        Create WebSocket response.

        Parameters:
        - timeout (float): WebSocket timeout
        - receive_timeout (float): Message receive timeout
        - autoclose (bool): Auto-close on connection errors
        - autoping (bool): Auto-send ping frames
        - heartbeat (float): Heartbeat interval
        - protocols: Supported WebSocket protocols
        - compress (bool): Enable compression
        - max_msg_size (int): Maximum message size
        """

    async def prepare(self, request):
        """
        Prepare WebSocket response.

        Parameters:
        - request: HTTP request for WebSocket upgrade

        Returns:
        WebSocketReady: Preparation result
        """

    @property
    def closed(self):
        """Check if WebSocket is closed."""

    @property
    def close_code(self):
        """WebSocket close code."""

    @property
    def protocol(self):
        """Selected WebSocket protocol."""

    def exception(self):
        """
        Return the exception recorded for this connection, if any.

        Typically consulted after receiving a message of type
        WSMsgType.ERROR.
        """

    async def send_str(self, data, compress=None):
        """
        Send text message.

        Parameters:
        - data (str): Text message
        - compress (int): Compression level
        """

    async def send_bytes(self, data, compress=None):
        """
        Send binary message.

        Parameters:
        - data (bytes): Binary message
        - compress (int): Compression level
        """

    async def send_json(self, data, *, dumps=None, compress=None):
        """
        Send JSON message.

        Parameters:
        - data: Object to serialize as JSON
        - dumps: JSON serialization function
        - compress (int): Compression level
        """

    async def ping(self, message=b''):
        """
        Send ping frame.

        Parameters:
        - message (bytes): Ping payload
        """

    async def pong(self, message=b''):
        """
        Send pong frame.

        Parameters:
        - message (bytes): Pong payload
        """

    async def close(self, code=None, message=b''):
        """
        Close WebSocket connection.

        Parameters:
        - code (int): Close code
        - message (bytes): Close message
        """

    async def receive(self):
        """
        Receive next message.

        Returns:
        WSMessage: WebSocket message
        """

    async def receive_str(self):
        """
        Receive text message.

        Returns:
        str: Text message content
        """

    async def receive_bytes(self):
        """
        Receive binary message.

        Returns:
        bytes: Binary message content
        """

    async def receive_json(self, *, loads=None):
        """
        Receive JSON message.

        Parameters:
        - loads: JSON deserialization function

        Returns:
        Object parsed from JSON
        """

    def __aiter__(self):
        """Async iterator over messages."""

    async def __anext__(self):
        """Get next message in async iteration."""
class WebSocketReady:
    """WebSocket preparation result enumeration."""

    OK = 'OK'
    ERROR = 'ERROR'


# Core WebSocket protocol types and message handling.
class WSMessage:
    """A single WebSocket message (API outline; method bodies omitted)."""

    # NOTE: the `type` parameter name shadows the builtin; kept because it is
    # part of the documented signature.
    def __init__(self, type, data, extra):
        """
        WebSocket message.

        Parameters:
        - type: Message type
        - data: Message data
        - extra: Extra message information
        """

    @property
    def type(self):
        """Message type."""

    @property
    def data(self):
        """Message data."""

    @property
    def extra(self):
        """Extra information."""

    def json(self, *, loads=None):
        """
        Parse message data as JSON.

        Parameters:
        - loads: JSON deserialization function
        """
class WSMsgType:
    """WebSocket message types."""

    # Frame opcodes defined by the WebSocket protocol.
    CONTINUATION = 0x0
    TEXT = 0x1
    BINARY = 0x2
    CLOSE = 0x8
    PING = 0x9
    PONG = 0xa
    # aiohttp-specific pseudo-types (not wire opcodes). 0x100 is CLOSING, not
    # ERROR; ERROR is 0x102.
    CLOSING = 0x100
    CLOSED = 0x101
    ERROR = 0x102
class WSCloseCode:
    """WebSocket close codes.

    Integer status codes carried in a close frame (see `close(code=...)`
    and the `close_code` properties).
    """

    OK = 1000
    GOING_AWAY = 1001
    PROTOCOL_ERROR = 1002
    UNSUPPORTED_DATA = 1003
    ABNORMAL_CLOSURE = 1006
    INVALID_TEXT = 1007
    POLICY_VIOLATION = 1008
    MESSAGE_TOO_BIG = 1009
    MANDATORY_EXTENSION = 1010
    INTERNAL_ERROR = 1011
    SERVICE_RESTART = 1012
    TRY_AGAIN_LATER = 1013
class WebSocketError(Exception):
    """WebSocket protocol error."""


import aiohttp
import asyncio
async def websocket_client():
    """Connect to the local WebSocket endpoint, send greetings, print replies.

    Sends one text and one JSON message, then prints every text or binary
    message received until an error or close message ends the loop. The
    session is closed on the way out, whatever happens.
    """
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('ws://localhost:8080/ws') as ws:
            # Send messages
            await ws.send_str('Hello, Server!')
            await ws.send_json({'type': 'greeting', 'message': 'Hello'})
            # Receive messages
            async for msg in ws:
                kind = msg.type
                # Terminal message types first: report and stop reading.
                if kind == aiohttp.WSMsgType.ERROR:
                    print(f"WebSocket error: {ws.exception()}")
                    break
                if kind == aiohttp.WSMsgType.CLOSE:
                    print("WebSocket closed")
                    break
                if kind == aiohttp.WSMsgType.TEXT:
                    print(f"Received text: {msg.data}")
                elif kind == aiohttp.WSMsgType.BINARY:
                    print(f"Received binary: {msg.data}")
asyncio.run(websocket_client())


from aiohttp import web, WSMsgType
import weakref
# Store active WebSocket connections
websockets = weakref.WeakSet()


async def websocket_handler(request):
    """Serve one WebSocket client: echo its messages or broadcast them.

    Text messages are parsed as JSON. ``{'type': 'echo', 'message': ...}``
    is answered on the same socket; ``{'type': 'broadcast', 'message': ...}``
    is forwarded to every open connection in ``websockets``. Any exception
    raised while handling a message (including malformed JSON or a missing
    key) is printed and ends the connection.

    Parameters:
    - request: HTTP request for WebSocket upgrade

    Returns:
    The WebSocketResponse used for the connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    # Add to active connections
    websockets.add(ws)
    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                data = msg.json()
                if data['type'] == 'echo':
                    # Echo message back
                    await ws.send_json({
                        'type': 'echo_response',
                        'message': data['message']
                    })
                elif data['type'] == 'broadcast':
                    # Broadcast to all connections
                    message = data['message']
                    # Iterate over a snapshot: each await lets other handlers
                    # add or discard connections, and mutating the WeakSet
                    # mid-iteration raises RuntimeError.
                    for websocket in list(websockets):
                        if not websocket.closed:
                            await websocket.send_json({
                                'type': 'broadcast',
                                'message': message
                            })
            elif msg.type == WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break
    except Exception as e:
        print(f"WebSocket handler error: {e}")
    finally:
        websockets.discard(ws)
    return ws
# Create application with WebSocket route
app = web.Application()
app.router.add_get('/ws', websocket_handler)

if __name__ == '__main__':
    web.run_app(app, host='localhost', port=8080)


from aiohttp import web, WSMsgType
import json
import weakref
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ChatRoom:
    """Registry of connected chat clients and their usernames."""

    def __init__(self):
        self.clients = weakref.WeakSet()
        # websocket -> username mapping. Weak keys, so this mapping does not
        # keep a socket alive after every other reference to it is gone
        # (a plain dict would defeat the WeakSet above).
        self.users = weakref.WeakKeyDictionary()

    def add_client(self, ws, username):
        """Register ``ws`` under ``username`` and log the join."""
        self.clients.add(ws)
        self.users[ws] = username
        logger.info("User %s joined the chat", username)

    def remove_client(self, ws):
        """Forget ``ws``; logs the departure only if it had a username."""
        if ws in self.users:
            username = self.users.pop(ws)
            logger.info("User %s left the chat", username)
        self.clients.discard(ws)

    async def broadcast(self, message, exclude=None):
        """Broadcast message to all connected clients.

        Parameters:
        - message: Object sent to each client via ``send_json``
        - exclude: Client to skip (usually the sender), or None

        Clients whose send raises are logged and removed from the room.
        Clients that join while the broadcast is in progress do not
        receive this message.
        """
        disconnected = []
        # Iterate over a snapshot: every await below yields to other
        # handlers that may add or remove clients, and mutating the WeakSet
        # mid-iteration raises RuntimeError.
        for client in list(self.clients):
            if client != exclude and not client.closed:
                try:
                    await client.send_json(message)
                except Exception as e:
                    logger.error("Error sending to client: %s", e)
                    disconnected.append(client)
        # Clean up disconnected clients
        for client in disconnected:
            self.remove_client(client)
# Global chat room
chat_room = ChatRoom()


async def websocket_handler(request):
    """Serve one chat participant over a WebSocket.

    Text messages must be JSON objects. ``{'type': 'join', 'username': ...}``
    registers the socket in ``chat_room``, greets the user and notifies the
    others; ``{'type': 'message', 'message': ...}`` is broadcast to everyone
    once the user has joined. Malformed payloads are answered with an
    ``error`` message and the connection stays open.

    Parameters:
    - request: HTTP request for WebSocket upgrade

    Returns:
    The WebSocketResponse used for the connection.
    """
    ws = web.WebSocketResponse(heartbeat=30)
    await ws.prepare(request)
    username = None
    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError:
                    await ws.send_json({
                        'type': 'error',
                        'message': 'Invalid JSON message'
                    })
                    continue
                if not isinstance(data, dict):
                    # Valid JSON but not an object (e.g. a list or number):
                    # reject it instead of crashing on data.get below.
                    await ws.send_json({
                        'type': 'error',
                        'message': 'Message must be a JSON object'
                    })
                    continue
                msg_type = data.get('type')
                if msg_type == 'join':
                    username = data.get('username', 'Anonymous')
                    chat_room.add_client(ws, username)
                    # Send welcome message
                    await ws.send_json({
                        'type': 'system',
                        'message': f'Welcome {username}!'
                    })
                    # Notify others
                    await chat_room.broadcast({
                        'type': 'user_joined',
                        'username': username,
                        'message': f'{username} joined the chat'
                    }, exclude=ws)
                elif msg_type == 'message':
                    if username:
                        message = {
                            'type': 'message',
                            'username': username,
                            'message': data.get('message', '')
                        }
                        await chat_room.broadcast(message)
            elif msg.type == WSMsgType.ERROR:
                logger.error("WebSocket error: %s", ws.exception())
                break
    except Exception as e:
        logger.error("WebSocket handler error: %s", e)
    finally:
        # Unregister first so the socket is always removed, even if the
        # farewell broadcast below fails.
        chat_room.remove_client(ws)
        if username:
            await chat_room.broadcast({
                'type': 'user_left',
                'username': username,
                'message': f'{username} left the chat'
            }, exclude=ws)
    return ws
# Serve static files (chat UI)
async def index_handler(request):
    """Return the single-page chat UI as an HTML response.

    The embedded script opens a WebSocket to the chat endpoint, sends a
    ``join`` message with the prompted username, renders incoming messages
    and posts the input field's text as ``message`` events.
    """
    page = """
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Chat</title>
</head>
<body>
<div id="messages"></div>
<input type="text" id="messageInput" placeholder="Type a message...">
<button onclick="sendMessage()">Send</button>
<script>
const ws = new WebSocket('ws://localhost:8080/ws');
const messages = document.getElementById('messages');
ws.onopen = function() {
ws.send(JSON.stringify({
type: 'join',
username: prompt('Enter your username:') || 'Anonymous'
}));
};
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
const div = document.createElement('div');
if (data.type === 'message') {
div.textContent = `${data.username}: ${data.message}`;
} else {
div.textContent = data.message;
div.style.fontStyle = 'italic';
}
messages.appendChild(div);
messages.scrollTop = messages.scrollHeight;
};
function sendMessage() {
const input = document.getElementById('messageInput');
if (input.value.trim()) {
ws.send(JSON.stringify({
type: 'message',
message: input.value
}));
input.value = '';
}
}
document.getElementById('messageInput').addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
sendMessage();
}
});
</script>
</body>
</html>
"""
    return web.Response(text=page, content_type='text/html')
# Create application
app = web.Application()
app.router.add_get('/', index_handler)
app.router.add_get('/ws', websocket_handler)

if __name__ == '__main__':
    web.run_app(app, host='localhost', port=8080)


from aiohttp import web, WSMsgType
import jwt
import datetime
# NOTE(review): placeholder signing secret for the example; a real deployment
# should load it from configuration rather than hard-coding it.
SECRET_KEY = 'your-secret-key'


def create_token(username):
    """Create JWT token for user.

    Parameters:
    - username: Value stored in the token's ``username`` claim

    Returns:
    The HS256-signed token, expiring one hour from now.
    """
    # Timezone-aware UTC "now": datetime.utcnow() is deprecated and returns
    # a naive value.
    expires_at = (
        datetime.datetime.now(datetime.timezone.utc)
        + datetime.timedelta(hours=1)
    )
    payload = {
        'username': username,
        'exp': expires_at,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
def verify_token(token):
    """Verify JWT token.

    Parameters:
    - token: Encoded JWT to check against ``SECRET_KEY`` (HS256)

    Returns:
    The ``username`` claim of a valid token, or None when the token is
    expired or otherwise invalid.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
    except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
        # Expired and malformed/forged tokens are treated alike.
        return None
    return claims.get('username')
async def websocket_handler(request):
    """Serve a WebSocket client that must authenticate before anything else.

    Until a valid ``{'type': 'auth', 'token': ...}`` message arrives, every
    other text message is answered with ``Authentication required``; a bad
    token is answered with ``auth_error`` and ends the connection. Once
    authenticated, ``ping`` is answered with ``pong`` and ``message`` is
    echoed back together with the username.

    Parameters:
    - request: HTTP request for WebSocket upgrade

    Returns:
    The WebSocketResponse used for the connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    authenticated = False
    username = None
    try:
        async for msg in ws:
            if msg.type == WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break
            if msg.type != WSMsgType.TEXT:
                continue

            data = msg.json()

            if authenticated:
                # Handle authenticated messages
                if data.get('type') == 'ping':
                    await ws.send_json({'type': 'pong'})
                elif data.get('type') == 'message':
                    # Process authenticated message
                    await ws.send_json({
                        'type': 'echo',
                        'username': username,
                        'message': data.get('message')
                    })
                continue

            if data.get('type') != 'auth':
                await ws.send_json({
                    'type': 'error',
                    'message': 'Authentication required'
                })
                continue

            token = data.get('token')
            username = verify_token(token)
            if not username:
                await ws.send_json({
                    'type': 'auth_error',
                    'message': 'Invalid or expired token'
                })
                break

            authenticated = True
            await ws.send_json({
                'type': 'auth_success',
                'message': f'Welcome {username}!'
            })
    except Exception as e:
        print(f"WebSocket error: {e}")
    return ws
app = web.Application()
app.router.add_get('/ws', websocket_handler)

if __name__ == '__main__':
    web.run_app(app, host='localhost', port=8080)

# Install with Tessl CLI
# npx tessl i tessl/pypi-aiohttp