CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-nameko

A microservices framework for Python that lets service developers concentrate on application logic and encourages testability

Pending
Overview
Eval results
Files

http-interface.mddocs/

HTTP Interface

HTTP server integration providing REST API endpoints with request/response handling, URL routing, WebSocket support, and integration with external web frameworks.

Capabilities

HTTP Endpoint Decorator

Decorator that exposes service methods as HTTP endpoints with flexible routing, method handling, and middleware support.

def http(method, url, **kwargs):
    """
    Decorator to expose a service method as an HTTP endpoint.
    
    Parameters:
    - method: HTTP method ('GET', 'POST', 'PUT', 'DELETE', etc.)
    - url: URL pattern with optional path parameters
    - expected_exceptions: Tuple of exceptions to convert to HTTP errors
    - cors: CORS configuration dictionary
    
    Returns:
    Decorated method that handles HTTP requests
    """

Usage Example:

from nameko.web.handlers import http
from nameko.rpc import RpcProxy
import json

class UserAPIService:
    name = "user_api"
    
    user_rpc = RpcProxy('user_service')
    
    @http('GET', '/users/<int:user_id>')
    def get_user(self, request, user_id):
        """Get user by ID"""
        try:
            user = self.user_rpc.get_user(user_id)
            return json.dumps(user)
        except Exception as e:
            return {'error': str(e)}, 404
    
    @http('POST', '/users')
    def create_user(self, request):
        """Create a new user"""
        user_data = json.loads(request.get_data())
        
        # Validate required fields
        if not user_data.get('email'):
            return {'error': 'Email is required'}, 400
        
        user = self.user_rpc.create_user(user_data)
        return json.dumps(user), 201
    
    @http('PUT', '/users/<int:user_id>')
    def update_user(self, request, user_id):
        """Update existing user"""
        user_data = json.loads(request.get_data())
        user = self.user_rpc.update_user(user_id, user_data)
        return json.dumps(user)
    
    @http('DELETE', '/users/<int:user_id>')
    def delete_user(self, request, user_id):
        """Delete user"""
        self.user_rpc.delete_user(user_id)
        return '', 204

Request Object

HTTP handlers receive a request object containing all HTTP request information.

# Request object attributes and methods
class Request:
    """
    HTTP request object passed to HTTP handlers.
    
    Attributes:
    - method: HTTP method (GET, POST, etc.)
    - url: Request URL
    - headers: Request headers dictionary
    - args: Query string parameters
    - form: Form data for POST requests
    - json: Parsed JSON data
    - files: Uploaded files
    - cookies: Request cookies
    """
    
    def get_data(self, as_text=False):
        """Get raw request body data"""
    
    def get_json(self):
        """Parse request body as JSON"""

Request Handling Example:

@http('POST', '/api/upload')
def handle_upload(self, request):
    # Access different request components
    content_type = request.headers.get('Content-Type')
    query_params = request.args
    
    if content_type == 'application/json':
        data = request.get_json()
        return self._process_json(data)
    elif 'multipart/form-data' in content_type:
        files = request.files
        form_data = request.form
        return self._process_upload(files, form_data)
    else:
        raw_data = request.get_data(as_text=True)
        return self._process_raw(raw_data)

Response Handling

HTTP handlers can return various response formats with status codes and headers.

Response Format Options:

@http('GET', '/api/data')
def get_data(self, request):
    # Return string
    return "Hello World"
    
    # Return JSON (auto-serialized)
    return {'message': 'Hello World'}
    
    # Return with status code
    return {'error': 'Not found'}, 404
    
    # Return with status code and headers
    return (
        {'data': 'value'}, 
        200, 
        {'Content-Type': 'application/json', 'X-Custom': 'header'}
    )
    
    # Return Response object for full control
    from werkzeug.wrappers import Response
    return Response(
        json.dumps({'data': 'value'}),
        status=200,
        headers={'Content-Type': 'application/json'}
    )

URL Routing and Parameters

Flexible URL routing with path parameters, query parameters, and pattern matching.

class APIService:
    name = "api_service"
    
    # Path parameters with type conversion
    @http('GET', '/users/<int:user_id>')
    def get_user(self, request, user_id):
        # user_id is automatically converted to int
        pass
    
    # Multiple path parameters
    @http('GET', '/users/<int:user_id>/posts/<int:post_id>')
    def get_user_post(self, request, user_id, post_id):
        pass
    
    # String parameters (default)
    @http('GET', '/categories/<category_name>')
    def get_category(self, request, category_name):
        pass
    
    # UUID parameters
    @http('GET', '/orders/<uuid:order_id>')
    def get_order(self, request, order_id):
        pass
    
    # Query parameters
    @http('GET', '/search')
    def search(self, request):
        query = request.args.get('q', '')
        limit = int(request.args.get('limit', 10))
        offset = int(request.args.get('offset', 0))
        return self._search(query, limit, offset)

CORS Support

Cross-Origin Resource Sharing (CORS) configuration for browser-based applications.

@http('GET', '/api/data', cors={
    'origins': ['http://localhost:3000', 'https://myapp.com'],
    'methods': ['GET', 'POST', 'PUT', 'DELETE'],
    'headers': ['Content-Type', 'Authorization'],
    'credentials': True
})
def api_endpoint(self, request):
    """Endpoint with CORS configuration"""

WebSocket Support

WebSocket RPC endpoints for real-time bidirectional communication with connection management and room-based messaging.

from nameko.web.websocket import rpc as websocket_rpc, WebSocketHubProvider

@websocket_rpc
def websocket_method(socket_id, *args, **kwargs):
    """
    Decorator for WebSocket RPC methods.
    
    Parameters:
    - socket_id: Unique identifier for the WebSocket connection
    - *args, **kwargs: Method arguments
    
    Returns:
    Method result sent back to the WebSocket client
    """

class WebSocketHubProvider:
    """
    Dependency provider for WebSocket hub functionality.
    Manages WebSocket connections, rooms, and messaging.
    """

class WebSocketHub:
    """
    Hub for managing WebSocket connections and broadcasting messages.
    """
    
    def subscribe(self, socket_id, channel):
        """
        Subscribe a WebSocket connection to a channel.
        
        Parameters:
        - socket_id: WebSocket connection identifier
        - channel: Channel name to subscribe to
        """
    
    def unsubscribe(self, socket_id, channel):
        """
        Unsubscribe a WebSocket connection from a channel.
        
        Parameters:
        - socket_id: WebSocket connection identifier  
        - channel: Channel name to unsubscribe from
        """
    
    def broadcast(self, channel, event, data):
        """
        Broadcast a message to all connections subscribed to a channel.
        
        Parameters:
        - channel: Channel name to broadcast to
        - event: Event name/type
        - data: Message data to send
        """
    
    def unicast(self, socket_id, event, data):
        """
        Send a message to a specific WebSocket connection.
        
        Parameters:
        - socket_id: Target WebSocket connection identifier
        - event: Event name/type
        - data: Message data to send
        """
    
    def get_subscriptions(self, socket_id):
        """
        Get list of channels a WebSocket connection is subscribed to.
        
        Parameters:
        - socket_id: WebSocket connection identifier
        
        Returns:
        List of channel names
        """

Usage Example:

from nameko.web.websocket import rpc as websocket_rpc, WebSocketHubProvider
from nameko.exceptions import ConnectionNotFound

class ChatService:
    name = "chat_service"
    
    websocket_hub = WebSocketHubProvider()
    
    @websocket_rpc
    def join_room(self, socket_id, room_id):
        """Subscribe socket to a chat room"""
        self.websocket_hub.subscribe(socket_id, f'room:{room_id}')
        
        # Notify other room members
        self.websocket_hub.broadcast(f'room:{room_id}', 'user_joined', {
            'socket_id': socket_id,
            'room_id': room_id,
            'timestamp': time.time()
        })
        
        return {'status': 'joined', 'room_id': room_id}
    
    @websocket_rpc
    def leave_room(self, socket_id, room_id):
        """Unsubscribe socket from a chat room"""
        self.websocket_hub.unsubscribe(socket_id, f'room:{room_id}')
        
        # Notify other room members
        self.websocket_hub.broadcast(f'room:{room_id}', 'user_left', {
            'socket_id': socket_id,
            'room_id': room_id,
            'timestamp': time.time()
        })
        
        return {'status': 'left', 'room_id': room_id}
    
    @websocket_rpc  
    def send_message(self, socket_id, room_id, message):
        """Send message to all members of a chat room"""
        try:
            # Broadcast message to room
            self.websocket_hub.broadcast(f'room:{room_id}', 'new_message', {
                'message': message,
                'sender': socket_id,
                'room_id': room_id,
                'timestamp': time.time()
            })
            return {'status': 'sent'}
        except ConnectionNotFound:
            return {'status': 'error', 'message': 'Socket not found'}
    
    @websocket_rpc
    def private_message(self, sender_id, target_id, message):
        """Send private message to specific user"""
        try:
            # Send direct message to target socket
            self.websocket_hub.unicast(target_id, 'private_message', {
                'message': message,
                'sender': sender_id,
                'timestamp': time.time()
            })
            return {'status': 'sent'}
        except ConnectionNotFound:
            return {'status': 'error', 'message': 'Target user not connected'}
    
    @websocket_rpc
    def get_my_rooms(self, socket_id):
        """Get list of rooms the socket is subscribed to"""
        subscriptions = self.websocket_hub.get_subscriptions(socket_id)
        rooms = [sub.replace('room:', '') for sub in subscriptions if sub.startswith('room:')]
        return {'rooms': rooms}

WebSocket Exceptions:

class ConnectionNotFound(NamekoException):
    """
    Raised when attempting to send message to unknown WebSocket connection.
    
    Parameters:
    - socket_id: The unknown socket identifier
    """

Error Handling

HTTP-specific error handling and exception mapping.

from nameko.exceptions import BadRequest, NotFound, Forbidden

class APIErrorHandler:
    
    @http('POST', '/api/users', expected_exceptions=(BadRequest, NotFound))
    def create_user_endpoint(self, request):
        try:
            data = request.get_json()
            if not data.get('email'):
                raise BadRequest("Email is required")
            
            user = self.user_service.get_user_by_email(data['email'])
            if user:
                raise BadRequest("User already exists")
            
            return self.user_service.create_user(data)
            
        except BadRequest as e:
            return {'error': str(e)}, 400
        except NotFound as e:
            return {'error': str(e)}, 404
        except Exception as e:
            return {'error': 'Internal server error'}, 500

Middleware and Request Processing

HTTP services support middleware for request preprocessing, authentication, and response modification.

from nameko.web.handlers import http

class AuthenticatedAPIService:
    name = "authenticated_api"
    
    def _authenticate_request(self, request):
        """Middleware-style authentication"""
        token = request.headers.get('Authorization', '').replace('Bearer ', '')
        if not token:
            return None, ('Unauthorized', 401)
        
        # Validate token logic
        user = self._validate_token(token)
        if not user:
            return None, ('Invalid token', 401)
        
        return user, None
    
    @http('GET', '/protected-data')
    def get_protected_data(self, request):
        user, error = self._authenticate_request(request)
        if error:
            return error
        
        # Process authenticated request
        return {'data': 'secret', 'user_id': user['id']}

Configuration

HTTP service configuration options for server behavior, timeouts, and performance.

# config.yaml
WEB_SERVER_ADDRESS: "0.0.0.0:8000"
WEB_SERVER_MAX_WORKERS: 10
WEB_SERVER_TIMEOUT: 30

# WebSocket configuration
WEBSOCKET_HUB_CONFIG:
  max_connections: 1000
  heartbeat_interval: 30

Install with Tessl CLI

npx tessl i tessl/pypi-nameko

docs

cli-interface.md

dependency-injection.md

event-system.md

http-interface.md

index.md

rpc-communication.md

service-management.md

standalone-clients.md

testing-framework.md

timer-scheduling.md

tile.json