CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-aiohttp

Async http client/server framework (asyncio)

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/websocket.md

WebSocket Support

Comprehensive WebSocket implementation for both client and server applications, enabling real-time bidirectional communication. Covers sending and receiving messages, the WebSocket protocol types (message types and close codes), and connection lifecycle management.

Capabilities

Client WebSocket Connections

WebSocket client implementation for connecting to WebSocket servers and handling real-time communication.

class ClientWebSocketResponse:
    """
    Client side of an established WebSocket connection.

    Instances are obtained from ``ClientSession.ws_connect()`` (see the
    usage examples); they are not meant to be constructed directly.
    """

    @property
    def closed(self):
        """True once the WebSocket connection has been closed."""

    @property
    def close_code(self):
        """Close code of the connection (see WSCloseCode), or None while it is open."""

    @property
    def protocol(self):
        """Sub-protocol negotiated with the server, or None if none was selected."""

    def exception(self):
        """
        Return the exception recorded for this connection.

        Typically consulted after receiving a WSMsgType.ERROR message.

        Returns:
        BaseException or None: The recorded error, or None if no error occurred
        """

    async def send_str(self, data, compress=None):
        """
        Send text message.

        Parameters:
        - data (str): Text message to send
        - compress (int): Compression level for this message only;
          None keeps the connection-wide setting
        """

    async def send_bytes(self, data, compress=None):
        """
        Send binary message.

        Parameters:
        - data (bytes): Binary message to send
        - compress (int): Compression level for this message only;
          None keeps the connection-wide setting
        """

    async def send_json(self, data, *, dumps=None, compress=None):
        """
        Serialize an object to JSON and send it as a text message.

        Parameters:
        - data: Object to serialize as JSON
        - dumps: JSON serialization function (a callable returning str)
        - compress (int): Compression level for this message only;
          None keeps the connection-wide setting
        """

    async def ping(self, message=b''):
        """
        Send ping frame.

        Parameters:
        - message (bytes): Ping payload
        """

    async def pong(self, message=b''):
        """
        Send pong frame.

        Parameters:
        - message (bytes): Pong payload
        """

    async def close(self, code=None, message=b''):
        """
        Close WebSocket connection.

        Parameters:
        - code (int): Close code (see WSCloseCode)
        - message (bytes): Close message
        """

    async def receive(self):
        """
        Receive next message of any type.

        Returns:
        WSMessage: WebSocket message; inspect its ``type`` before using ``data``
        """

    async def receive_str(self):
        """
        Receive text message.

        Returns:
        str: Text message content

        Raises:
        TypeError: If the received message is not a TEXT message
        """

    async def receive_bytes(self):
        """
        Receive binary message.

        Returns:
        bytes: Binary message content

        Raises:
        TypeError: If the received message is not a BINARY message
        """

    async def receive_json(self, *, loads=None):
        """
        Receive a text message and parse it as JSON.

        Parameters:
        - loads: JSON deserialization function

        Returns:
        Object parsed from JSON
        """

    def __aiter__(self):
        """Async iterator over messages (enables ``async for msg in ws``)."""

    async def __anext__(self):
        """Get next message in async iteration; iteration ends when the connection closes."""

Server WebSocket Responses

WebSocket server implementation for handling WebSocket connections in web applications.

class WebSocketResponse:
    """
    Server side of a WebSocket connection.

    Created inside a request handler, upgraded with ``prepare(request)``,
    then used to exchange messages and finally returned from the handler.
    """

    def __init__(
        self,
        *,
        timeout=10.0,
        receive_timeout=None,
        autoclose=True,
        autoping=True,
        heartbeat=None,
        protocols=None,
        compress=True,
        max_msg_size=4*1024*1024,
        **kwargs
    ):
        """
        Create WebSocket response.

        Parameters:
        - timeout (float): Timeout in seconds for the close operation
          (how long close() waits for the peer's answer)
        - receive_timeout (float): Timeout for receive operations;
          None means wait indefinitely
        - autoclose (bool): Close the connection automatically when the
          peer sends a CLOSE message
        - autoping (bool): Answer incoming PING frames with PONG automatically
        - heartbeat (float): Interval in seconds between keep-alive pings;
          the connection is closed if no pong comes back
        - protocols: Iterable of supported WebSocket sub-protocol names
        - compress (bool): Enable per-message deflate compression
        - max_msg_size (int): Maximum size of a received message in bytes
          (4 MiB by default)
        """

    def can_prepare(self, request):
        """
        Check whether the request can be upgraded to a WebSocket.

        Parameters:
        - request: Incoming HTTP request

        Returns:
        WebSocketReady: Result of the check
        """

    async def prepare(self, request):
        """
        Perform the WebSocket handshake and start the response.

        Must be awaited before any send/receive call.

        Parameters:
        - request: HTTP request for WebSocket upgrade

        Returns:
        The payload writer of the upgraded connection; use can_prepare()
        to obtain a WebSocketReady without starting the handshake
        """

    @property
    def closed(self):
        """True once the WebSocket connection has been closed."""

    @property
    def close_code(self):
        """Close code of the connection (see WSCloseCode), or None while it is open."""

    @property
    def protocol(self):
        """Sub-protocol selected during the handshake, or None if none was selected."""

    def exception(self):
        """
        Return the exception recorded for this connection.

        Typically consulted after receiving a WSMsgType.ERROR message.

        Returns:
        BaseException or None: The recorded error, or None if no error occurred
        """

    async def send_str(self, data, compress=None):
        """
        Send text message.

        Parameters:
        - data (str): Text message
        - compress (int): Compression level for this message only;
          None keeps the connection-wide setting
        """

    async def send_bytes(self, data, compress=None):
        """
        Send binary message.

        Parameters:
        - data (bytes): Binary message
        - compress (int): Compression level for this message only;
          None keeps the connection-wide setting
        """

    async def send_json(self, data, *, dumps=None, compress=None):
        """
        Serialize an object to JSON and send it as a text message.

        Parameters:
        - data: Object to serialize as JSON
        - dumps: JSON serialization function (a callable returning str)
        - compress (int): Compression level for this message only;
          None keeps the connection-wide setting
        """

    async def ping(self, message=b''):
        """
        Send ping frame.

        Parameters:
        - message (bytes): Ping payload
        """

    async def pong(self, message=b''):
        """
        Send pong frame.

        Parameters:
        - message (bytes): Pong payload
        """

    async def close(self, code=None, message=b''):
        """
        Close WebSocket connection.

        Parameters:
        - code (int): Close code (see WSCloseCode)
        - message (bytes): Close message
        """

    async def receive(self):
        """
        Receive next message of any type.

        Returns:
        WSMessage: WebSocket message; inspect its ``type`` before using ``data``
        """

    async def receive_str(self):
        """
        Receive text message.

        Returns:
        str: Text message content

        Raises:
        TypeError: If the received message is not a TEXT message
        """

    async def receive_bytes(self):
        """
        Receive binary message.

        Returns:
        bytes: Binary message content

        Raises:
        TypeError: If the received message is not a BINARY message
        """

    async def receive_json(self, *, loads=None):
        """
        Receive a text message and parse it as JSON.

        Parameters:
        - loads: JSON deserialization function

        Returns:
        Object parsed from JSON
        """

    def __aiter__(self):
        """Async iterator over messages (enables ``async for msg in ws``)."""

    async def __anext__(self):
        """Get next message in async iteration; iteration ends when the connection closes."""

class WebSocketReady:
    """WebSocket preparation result enumeration.

    This listing declares two string constants, OK and ERROR.

    NOTE(review): upstream aiohttp documents WebSocketReady as the value
    returned by WebSocketResponse.can_prepare(), exposing ``ok`` and
    ``protocol`` attributes rather than OK/ERROR constants — confirm
    against the aiohttp reference before relying on this listing.
    """
    
    OK = 'OK'
    ERROR = 'ERROR'

WebSocket Protocol Types

Core WebSocket protocol types and message handling.

class WSMessage:
    """A single message received from a WebSocket connection."""

    def __init__(self, type, data, extra):
        """
        WebSocket message.
        
        Parameters:
        - type: Message type; the usage examples compare it against
          WSMsgType members such as WSMsgType.TEXT
        - data: Message data (the payload; the examples print it or
          pass it to json.loads for TEXT messages)
        - extra: Extra message information
        """
    
    @property
    def type(self):
        """Message type."""
    
    @property
    def data(self):
        """Message data."""
    
    @property
    def extra(self):
        """Extra information."""
    
    def json(self, *, loads=None):
        """
        Parse message data as JSON.

        Parameters:
        - loads: JSON deserialization function (keyword-only)

        Returns:
        Object parsed from the message data
        """

class WSMsgType:
    """
    WebSocket message types.

    The first group mirrors the frame opcodes of the WebSocket protocol;
    the second group holds aiohttp-specific pseudo types that never travel
    on the wire and only appear as the ``type`` of a received WSMessage.
    """

    # WebSocket protocol opcodes
    CONTINUATION = 0x0
    TEXT = 0x1
    BINARY = 0x2
    CLOSE = 0x8
    PING = 0x9
    PONG = 0xa

    # aiohttp-specific types
    CLOSING = 0x100  # the connection is in the process of closing
    CLOSED = 0x101   # the connection is already closed
    ERROR = 0x102    # an error occurred; see the connection's exception()

class WSCloseCode:
    """
    WebSocket close codes.

    Status codes carried by a close frame; pass one as ``code`` to
    ``close()`` or compare it with a connection's ``close_code``.
    """

    OK = 1000
    GOING_AWAY = 1001
    PROTOCOL_ERROR = 1002
    UNSUPPORTED_DATA = 1003
    ABNORMAL_CLOSURE = 1006
    INVALID_TEXT = 1007
    POLICY_VIOLATION = 1008
    MESSAGE_TOO_BIG = 1009
    MANDATORY_EXTENSION = 1010
    INTERNAL_ERROR = 1011
    SERVICE_RESTART = 1012
    TRY_AGAIN_LATER = 1013
    BAD_GATEWAY = 1014

class WebSocketError(Exception):
    """WebSocket protocol error.

    Plain Exception subclass; this listing declares no additional
    attributes or methods.
    """

Usage Examples

WebSocket Client

import aiohttp
import asyncio

async def websocket_client():
    """
    Connect to a local WebSocket server, send two messages and print replies.

    Sends one text and one JSON message, then prints every incoming text or
    binary message until the server closes the connection or an error is
    reported.
    """
    # Both context managers close their resource on every exit path,
    # including exceptions, so no explicit try/finally is needed.
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('ws://localhost:8080/ws') as ws:
            # Send messages
            await ws.send_str('Hello, Server!')
            await ws.send_json({'type': 'greeting', 'message': 'Hello'})

            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received text: {msg.data}")
                elif msg.type == aiohttp.WSMsgType.BINARY:
                    print(f"Received binary: {msg.data}")
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"WebSocket error: {ws.exception()}")
                    break
            else:
                # Iteration stops by itself on close frames, so a CLOSE
                # message never reaches the loop body; the else clause runs
                # exactly when the loop ended without the ERROR break.
                print("WebSocket closed")

# Run the client only when executed as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(websocket_client())

WebSocket Server

from aiohttp import web, WSMsgType
import weakref

# Registry of active WebSocket connections. A WeakSet keeps only weak
# references, so an entry also disappears once its response object is
# garbage collected, even if it was never discarded explicitly.
websockets = weakref.WeakSet()

async def websocket_handler(request):
    """
    Handle one WebSocket connection supporting echo and broadcast messages.

    Each text frame is expected to be a JSON object with a ``type`` key:
    ``echo`` sends the message back to the sender, ``broadcast`` forwards it
    to every open connection in ``websockets``. Frames that are not valid
    JSON objects are answered with an error message instead of terminating
    the connection.

    Parameters:
    - request: HTTP request to upgrade to a WebSocket

    Returns:
    The WebSocketResponse, once the connection has ended
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Add to active connections
    websockets.add(ws)

    try:
        async for msg in ws:
            if msg.type == WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break
            if msg.type != WSMsgType.TEXT:
                continue

            # json.JSONDecodeError subclasses ValueError
            try:
                data = msg.json()
            except ValueError:
                data = None
            if not isinstance(data, dict):
                await ws.send_json({
                    'type': 'error',
                    'message': 'Invalid JSON message'
                })
                continue

            kind = data.get('type')

            if kind == 'echo':
                # Echo message back
                await ws.send_json({
                    'type': 'echo_response',
                    'message': data.get('message')
                })

            elif kind == 'broadcast':
                # Broadcast to all connections. Iterate over a snapshot:
                # other handlers may add or discard entries while this
                # coroutine is suspended in send_json(), and mutating a set
                # during iteration raises RuntimeError.
                message = data.get('message')
                for peer in list(websockets):
                    if not peer.closed:
                        await peer.send_json({
                            'type': 'broadcast',
                            'message': message
                        })

    except Exception as e:
        # Top-level boundary of the handler: report and fall through to cleanup
        print(f"WebSocket handler error: {e}")

    finally:
        websockets.discard(ws)

    return ws

# Build the application and expose the handler as GET /ws
app = web.Application()
app.add_routes([web.get('/ws', websocket_handler)])

if __name__ == '__main__':
    # Blocks until interrupted, serving on localhost:8080
    web.run_app(app, host='localhost', port=8080)

Real-time Chat Server

from aiohttp import web, WSMsgType
import json
import weakref
import logging

# Configure the root logger to emit records of level INFO and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

class ChatRoom:
    """Registry of connected chat clients with a broadcast helper."""

    def __init__(self):
        """Create an empty room."""
        self.clients = weakref.WeakSet()
        self.users = {}  # websocket -> username mapping

    def add_client(self, ws, username):
        """
        Register a connection under the given username.

        Parameters:
        - ws: WebSocket connection of the client
        - username: Display name to associate with the connection
        """
        self.clients.add(ws)
        self.users[ws] = username
        logger.info("User %s joined the chat", username)

    def remove_client(self, ws):
        """
        Forget a connection; safe to call for unknown connections.

        Parameters:
        - ws: WebSocket connection to remove
        """
        if ws in self.users:
            username = self.users.pop(ws)
            logger.info("User %s left the chat", username)
        self.clients.discard(ws)

    async def broadcast(self, message, exclude=None):
        """
        Broadcast message to all connected clients.

        Clients whose send fails are logged and removed from the room.

        Parameters:
        - message: JSON-serializable object passed to each client's send_json()
        - exclude: Optional connection that must not receive the message
        """
        disconnected = []

        # Iterate over a snapshot: add_client()/remove_client() may run in
        # other handlers while this coroutine is suspended in send_json(),
        # and mutating a set during iteration raises RuntimeError.
        for client in list(self.clients):
            if client is exclude or client.closed:
                continue
            try:
                await client.send_json(message)
            except Exception as e:
                logger.error("Error sending to client: %s", e)
                disconnected.append(client)

        # Clean up disconnected clients
        for client in disconnected:
            self.remove_client(client)

# Global chat room: one module-level instance used by websocket_handler
# for every connection
chat_room = ChatRoom()

async def websocket_handler(request):
    """
    Handle one chat client connection.

    Text frames must be JSON objects. ``{"type": "join", "username": ...}``
    registers the client in the global chat room and announces it;
    ``{"type": "message", "message": ...}`` is broadcast to everyone once the
    client has joined. Frames that are not JSON objects are answered with an
    error message. When the connection ends, the client is removed from the
    room and the others are notified.

    Parameters:
    - request: HTTP request to upgrade to a WebSocket

    Returns:
    The WebSocketResponse, once the connection has ended
    """
    ws = web.WebSocketResponse(heartbeat=30)
    await ws.prepare(request)

    username = None

    try:
        async for msg in ws:
            if msg.type == WSMsgType.ERROR:
                logger.error(f"WebSocket error: {ws.exception()}")
                break
            if msg.type != WSMsgType.TEXT:
                continue

            # Keep the try body minimal: only the parse can raise here
            try:
                data = json.loads(msg.data)
            except json.JSONDecodeError:
                data = None

            # Valid JSON that is not an object (e.g. a list or a number) has
            # no .get(); reject it like malformed JSON instead of crashing.
            if not isinstance(data, dict):
                await ws.send_json({
                    'type': 'error',
                    'message': 'Invalid JSON message'
                })
                continue

            msg_type = data.get('type')

            if msg_type == 'join':
                username = data.get('username', 'Anonymous')
                chat_room.add_client(ws, username)

                # Send welcome message
                await ws.send_json({
                    'type': 'system',
                    'message': f'Welcome {username}!'
                })

                # Notify others
                await chat_room.broadcast({
                    'type': 'user_joined',
                    'username': username,
                    'message': f'{username} joined the chat'
                }, exclude=ws)

            elif msg_type == 'message':
                # Messages from clients that have not joined are ignored
                if username:
                    await chat_room.broadcast({
                        'type': 'message',
                        'username': username,
                        'message': data.get('message', '')
                    })

    except Exception as e:
        # Top-level boundary of the handler: log and fall through to cleanup
        logger.error(f"WebSocket handler error: {e}")

    finally:
        # Deregister first so the client is removed even if the farewell
        # broadcast below fails.
        chat_room.remove_client(ws)

        if username:
            await chat_room.broadcast({
                'type': 'user_left',
                'username': username,
                'message': f'{username} left the chat'
            }, exclude=ws)

    return ws

# Serve static files (chat UI)
async def index_handler(request):
    """
    Serve the single-page chat UI.

    The page opens a WebSocket to the ``/ws`` endpoint of the host it was
    loaded from (``wss://`` when the page itself was served over HTTPS), so
    it keeps working when the server's host or port changes.

    Parameters:
    - request: Incoming HTTP request (unused)

    Returns:
    web.Response with the HTML document and content type text/html
    """
    return web.Response(text="""
<!DOCTYPE html>
<html>
<head>
    <title>WebSocket Chat</title>
</head>
<body>
    <div id="messages"></div>
    <input type="text" id="messageInput" placeholder="Type a message...">
    <button onclick="sendMessage()">Send</button>
    
    <script>
        const scheme = location.protocol === 'https:' ? 'wss://' : 'ws://';
        const ws = new WebSocket(scheme + location.host + '/ws');
        const messages = document.getElementById('messages');
        
        ws.onopen = function() {
            ws.send(JSON.stringify({
                type: 'join',
                username: prompt('Enter your username:') || 'Anonymous'
            }));
        };
        
        ws.onmessage = function(event) {
            const data = JSON.parse(event.data);
            const div = document.createElement('div');
            
            if (data.type === 'message') {
                div.textContent = `${data.username}: ${data.message}`;
            } else {
                div.textContent = data.message;
                div.style.fontStyle = 'italic';
            }
            
            messages.appendChild(div);
            messages.scrollTop = messages.scrollHeight;
        };
        
        function sendMessage() {
            const input = document.getElementById('messageInput');
            if (input.value.trim()) {
                ws.send(JSON.stringify({
                    type: 'message',
                    message: input.value
                }));
                input.value = '';
            }
        }
        
        document.getElementById('messageInput').addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });
    </script>
</body>
</html>
    """, content_type='text/html')

# Build the application: chat page at /, WebSocket endpoint at /ws
app = web.Application()
app.add_routes([
    web.get('/', index_handler),
    web.get('/ws', websocket_handler),
])

if __name__ == '__main__':
    # Blocks until interrupted, serving on localhost:8080
    web.run_app(app, host='localhost', port=8080)

WebSocket with Authentication

from aiohttp import web, WSMsgType
import jwt
import datetime

# Signing key used by create_token() and verify_token() (HS256).
# NOTE(review): placeholder value for the example only — a real deployment
# should load the key from configuration or the environment instead of
# committing it to source.
SECRET_KEY = 'your-secret-key'

def create_token(username):
    """
    Create a JWT for the given user, valid for one hour.

    Parameters:
    - username: Value stored in the token's ``username`` claim

    Returns:
    The encoded token, signed with SECRET_KEY using HS256
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and yields a naive datetime that is easily misread as local time.
    expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
    payload = {
        'username': username,
        'exp': expires_at,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')

def verify_token(token):
    """
    Verify a JWT and extract the user it was issued for.

    Parameters:
    - token: Encoded JWT; may be None or empty when the client sent none

    Returns:
    The ``username`` claim of a valid token, or None when the token is
    missing, malformed, wrongly signed or expired
    """
    # A missing token can never be valid; skip decoding entirely.
    if not token:
        return None

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # InvalidTokenError is the base class of PyJWT's validation errors,
        # including ExpiredSignatureError, so one handler covers both.
        return None
    return payload.get('username')

async def websocket_handler(request):
    """
    Handle one WebSocket connection that requires token authentication.

    Text frames must be JSON objects. Until the client has sent
    ``{"type": "auth", "token": ...}`` with a valid token, every other
    message is answered with "Authentication required"; an invalid token
    ends the connection. Afterwards ``ping`` is answered with ``pong`` and
    ``message`` is echoed back together with the username. Frames that are
    not JSON objects are answered with an error message instead of
    terminating the connection.

    Parameters:
    - request: HTTP request to upgrade to a WebSocket

    Returns:
    The WebSocketResponse, once the connection has ended
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    authenticated = False
    username = None

    try:
        async for msg in ws:
            if msg.type == WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break
            if msg.type != WSMsgType.TEXT:
                continue

            # json.JSONDecodeError subclasses ValueError
            try:
                data = msg.json()
            except ValueError:
                data = None
            if not isinstance(data, dict):
                await ws.send_json({
                    'type': 'error',
                    'message': 'Invalid JSON message'
                })
                continue

            msg_type = data.get('type')

            if not authenticated:
                if msg_type != 'auth':
                    await ws.send_json({
                        'type': 'error',
                        'message': 'Authentication required'
                    })
                    continue

                username = verify_token(data.get('token'))
                if not username:
                    await ws.send_json({
                        'type': 'auth_error',
                        'message': 'Invalid or expired token'
                    })
                    # Leaving the loop ends the handler, which ends the connection
                    break

                authenticated = True
                await ws.send_json({
                    'type': 'auth_success',
                    'message': f'Welcome {username}!'
                })
                continue

            # Handle authenticated messages
            if msg_type == 'ping':
                await ws.send_json({'type': 'pong'})

            elif msg_type == 'message':
                # Process authenticated message
                await ws.send_json({
                    'type': 'echo',
                    'username': username,
                    'message': data.get('message')
                })

    except Exception as e:
        # Top-level boundary of the handler: report and return
        print(f"WebSocket error: {e}")

    return ws

# Build the application and expose the authenticated handler as GET /ws
app = web.Application()
app.add_routes([web.get('/ws', websocket_handler)])

if __name__ == '__main__':
    # Blocks until interrupted, serving on localhost:8080
    web.run_app(app, host='localhost', port=8080)

Install with Tessl CLI

npx tessl i tessl/pypi-aiohttp

docs

client.md

data.md

index.md

server.md

testing.md

utilities.md

websocket.md

tile.json