A microservices framework for Python that lets service developers concentrate on application logic and encourages testability
—
HTTP server integration providing REST API endpoints with request/response handling, URL routing, WebSocket support, and integration with external web frameworks.
Decorator that exposes service methods as HTTP endpoints with flexible routing, method handling, and middleware support.
def http(method, url, **kwargs):
    """
    Decorator to expose a service method as an HTTP endpoint.

    This is a signature-only reference stub: it has no body beyond this
    docstring.

    Parameters:
    - method: HTTP method ('GET', 'POST', 'PUT', 'DELETE', etc.)
    - url: URL pattern with optional path parameters
    - **kwargs: Keyword options. This reference lists:
        - expected_exceptions: Tuple of exceptions to convert to HTTP errors
        - cors: CORS configuration dictionary

    Returns:
    Decorated method that handles HTTP requests
    """


# Usage Example:
import json

from nameko.web.handlers import http
from nameko.rpc import RpcProxy


class UserAPIService:
    """HTTP facade that forwards user CRUD requests to ``user_service`` over RPC.

    Handlers return either a string payload (status 200) or a
    ``(status_code, payload)`` tuple. Payloads are serialized with
    ``json.dumps`` because the entrypoint does not serialize dicts itself.
    """

    name = "user_api"

    user_rpc = RpcProxy('user_service')

    @http('GET', '/users/<int:user_id>')
    def get_user(self, request, user_id):
        """Get user by ID"""
        try:
            user = self.user_rpc.get_user(user_id)
        except Exception as e:
            # NOTE: every failure of the RPC call is reported as 404 here.
            return 404, json.dumps({'error': str(e)})
        return json.dumps(user)

    @http('POST', '/users')
    def create_user(self, request):
        """Create a new user"""
        user_data = json.loads(request.get_data())

        # Validate required fields
        if not user_data.get('email'):
            return 400, json.dumps({'error': 'Email is required'})

        user = self.user_rpc.create_user(user_data)
        return 201, json.dumps(user)

    @http('PUT', '/users/<int:user_id>')
    def update_user(self, request, user_id):
        """Update existing user"""
        user_data = json.loads(request.get_data())
        user = self.user_rpc.update_user(user_id, user_data)
        return json.dumps(user)

    @http('DELETE', '/users/<int:user_id>')
    def delete_user(self, request, user_id):
        """Delete user"""
        self.user_rpc.delete_user(user_id)
        return 204, ''


# HTTP handlers receive a request object containing all HTTP request information.
# Request object attributes and methods
class Request:
    """
    HTTP request object passed to HTTP handlers.

    This is a signature-only reference stub; its methods have no body
    beyond their docstrings.

    Attributes:
    - method: HTTP method (GET, POST, etc.)
    - url: Request URL
    - headers: Request headers dictionary
    - args: Query string parameters
    - form: Form data for POST requests
    - json: Parsed JSON data
    - files: Uploaded files
    - cookies: Request cookies
    """

    def get_data(self, as_text=False):
        """Get raw request body data"""

    def get_json(self):
        """Parse request body as JSON"""


# Request Handling Example:
@http('POST', '/api/upload')
def handle_upload(self, request):
    """Dispatch an upload to the processor matching the request's media type.

    JSON bodies go to ``_process_json``, multipart form uploads go to
    ``_process_upload``, and anything else (including a request with no
    Content-Type header) is read as text and passed to ``_process_raw``.
    """
    # Access different request components
    content_type = request.headers.get('Content-Type', '')
    query_params = request.args  # shown for illustration; not used below

    # Compare on the bare media type: the header may carry parameters such
    # as "; charset=utf-8" or "; boundary=...", and may be absent entirely.
    media_type = content_type.split(';', 1)[0].strip().lower()

    if media_type == 'application/json':
        data = request.get_json()
        return self._process_json(data)

    if media_type == 'multipart/form-data':
        files = request.files
        form_data = request.form
        return self._process_upload(files, form_data)

    raw_data = request.get_data(as_text=True)
    return self._process_raw(raw_data)


# HTTP handlers can return various response formats with status codes and headers.
Response Format Options:
@http('GET', '/api/data')
def get_data(self, request):
    """Illustrate the result shapes an HTTP handler may return.

    Each ``return`` below is an alternative; only the first one executes.
    Tuple results put the status code first, and payloads must already be
    strings, so JSON bodies are serialized explicitly.
    """
    import json

    from werkzeug.wrappers import Response

    # Return string
    return "Hello World"

    # Return JSON (serialize explicitly)
    return json.dumps({'message': 'Hello World'})

    # Return with status code: (status_code, payload)
    return 404, json.dumps({'error': 'Not found'})

    # Return with status code and headers: (status_code, headers, payload)
    return (
        200,
        {'Content-Type': 'application/json', 'X-Custom': 'header'},
        json.dumps({'data': 'value'}),
    )

    # Return Response object for full control
    return Response(
        json.dumps({'data': 'value'}),
        status=200,
        headers={'Content-Type': 'application/json'},
    )


# Flexible URL routing with path parameters, query parameters, and pattern matching.
class APIService:
    """Examples of URL rules with typed path parameters and query parameters."""

    name = "api_service"

    # Path parameters with type conversion
    @http('GET', '/users/<int:user_id>')
    def get_user(self, request, user_id):
        # user_id is automatically converted to int
        pass

    # Multiple path parameters
    @http('GET', '/users/<int:user_id>/posts/<int:post_id>')
    def get_user_post(self, request, user_id, post_id):
        pass

    # String parameters (default)
    @http('GET', '/categories/<category_name>')
    def get_category(self, request, category_name):
        pass

    # UUID parameters
    @http('GET', '/orders/<uuid:order_id>')
    def get_order(self, request, order_id):
        pass

    # Query parameters
    @http('GET', '/search')
    def search(self, request):
        """Run a search using the ``q``, ``limit`` and ``offset`` query parameters."""
        query = request.args.get('q', '')
        # ``type=int`` falls back to the default when the value is not a
        # valid integer, instead of raising ValueError inside the handler.
        limit = request.args.get('limit', 10, type=int)
        offset = request.args.get('offset', 0, type=int)
        return self._search(query, limit, offset)


# Cross-Origin Resource Sharing (CORS) configuration for browser-based applications.
# NOTE(review): confirm that the installed nameko release accepts a ``cors``
# option on the http entrypoint before relying on this example.
@http('GET', '/api/data', cors={
    'origins': ['http://localhost:3000', 'https://myapp.com'],
    'methods': ['GET', 'POST', 'PUT', 'DELETE'],
    'headers': ['Content-Type', 'Authorization'],
    'credentials': True,
})
def api_endpoint(self, request):
    """Endpoint with CORS configuration"""


# WebSocket RPC endpoints for real-time bidirectional communication with connection management and room-based messaging.
from nameko.web.websocket import rpc as websocket_rpc, WebSocketHubProvider
@websocket_rpc
def websocket_method(socket_id, *args, **kwargs):
    """
    Signature of a method exposed through the WebSocket RPC decorator.

    This is a reference stub with no body beyond this docstring.

    Parameters:
    - socket_id: Unique identifier for the WebSocket connection
    - *args, **kwargs: Method arguments

    Returns:
    Method result sent back to the WebSocket client
    """
class WebSocketHubProvider:
    """
    Dependency provider for WebSocket hub functionality.

    Manages WebSocket connections, rooms, and messaging.
    """
class WebSocketHub:
    """
    Hub for managing WebSocket connections and broadcasting messages.

    This is a signature-only reference stub; its methods have no body
    beyond their docstrings.
    """

    def subscribe(self, socket_id, channel):
        """
        Subscribe a WebSocket connection to a channel.

        Parameters:
        - socket_id: WebSocket connection identifier
        - channel: Channel name to subscribe to
        """

    def unsubscribe(self, socket_id, channel):
        """
        Unsubscribe a WebSocket connection from a channel.

        Parameters:
        - socket_id: WebSocket connection identifier
        - channel: Channel name to unsubscribe from
        """

    def broadcast(self, channel, event, data):
        """
        Broadcast a message to all connections subscribed to a channel.

        Parameters:
        - channel: Channel name to broadcast to
        - event: Event name/type
        - data: Message data to send
        """

    def unicast(self, socket_id, event, data):
        """
        Send a message to a specific WebSocket connection.

        Parameters:
        - socket_id: Target WebSocket connection identifier
        - event: Event name/type
        - data: Message data to send
        """

    def get_subscriptions(self, socket_id):
        """
        Get list of channels a WebSocket connection is subscribed to.

        Parameters:
        - socket_id: WebSocket connection identifier

        Returns:
        List of channel names
        """


# Usage Example:
import time

from nameko.web.websocket import rpc as websocket_rpc, WebSocketHubProvider
from nameko.exceptions import ConnectionNotFound


class ChatService:
    """Chat rooms built on WebSocket channels named ``room:<room_id>``."""

    name = "chat_service"

    websocket_hub = WebSocketHubProvider()

    @websocket_rpc
    def join_room(self, socket_id, room_id):
        """Subscribe socket to a chat room"""
        self.websocket_hub.subscribe(socket_id, f'room:{room_id}')

        # Notify other room members
        self.websocket_hub.broadcast(f'room:{room_id}', 'user_joined', {
            'socket_id': socket_id,
            'room_id': room_id,
            'timestamp': time.time()
        })

        return {'status': 'joined', 'room_id': room_id}

    @websocket_rpc
    def leave_room(self, socket_id, room_id):
        """Unsubscribe socket from a chat room"""
        self.websocket_hub.unsubscribe(socket_id, f'room:{room_id}')

        # Notify other room members
        self.websocket_hub.broadcast(f'room:{room_id}', 'user_left', {
            'socket_id': socket_id,
            'room_id': room_id,
            'timestamp': time.time()
        })

        return {'status': 'left', 'room_id': room_id}

    @websocket_rpc
    def send_message(self, socket_id, room_id, message):
        """Send message to all members of a chat room"""
        try:
            # Broadcast message to room
            self.websocket_hub.broadcast(f'room:{room_id}', 'new_message', {
                'message': message,
                'sender': socket_id,
                'room_id': room_id,
                'timestamp': time.time()
            })
            return {'status': 'sent'}
        except ConnectionNotFound:
            return {'status': 'error', 'message': 'Socket not found'}

    @websocket_rpc
    def private_message(self, sender_id, target_id, message):
        """Send private message to specific user"""
        try:
            # Send direct message to target socket
            self.websocket_hub.unicast(target_id, 'private_message', {
                'message': message,
                'sender': sender_id,
                'timestamp': time.time()
            })
            return {'status': 'sent'}
        except ConnectionNotFound:
            return {'status': 'error', 'message': 'Target user not connected'}

    @websocket_rpc
    def get_my_rooms(self, socket_id):
        """Get list of rooms the socket is subscribed to"""
        subscriptions = self.websocket_hub.get_subscriptions(socket_id)
        # Strip only the leading prefix, so a room id that itself contains
        # "room:" is returned intact.
        prefix = 'room:'
        rooms = [sub[len(prefix):] for sub in subscriptions if sub.startswith(prefix)]
        return {'rooms': rooms}


# WebSocket Exceptions:
class ConnectionNotFound(NamekoException):
    """
    Raised when attempting to send message to unknown WebSocket connection.

    Parameters:
    - socket_id: The unknown socket identifier
    """


# HTTP-specific error handling and exception mapping.
import json

from nameko.exceptions import BadRequest, NotFound, Forbidden


class APIErrorHandler:
    """Example endpoint that maps service exceptions onto HTTP status codes."""

    @http('POST', '/api/users', expected_exceptions=(BadRequest, NotFound))
    def create_user_endpoint(self, request):
        """Create a user, answering 400/404/500 with a JSON error body on failure.

        Tuple results are ``(status_code, payload)`` and the payload must be
        a string, so error bodies are serialized with ``json.dumps``.
        """
        try:
            data = request.get_json()

            if not data.get('email'):
                raise BadRequest("Email is required")

            user = self.user_service.get_user_by_email(data['email'])
            if user:
                raise BadRequest("User already exists")

            # Serialized for consistency with the error bodies below;
            # assumes create_user returns JSON-serializable data — TODO confirm.
            return json.dumps(self.user_service.create_user(data))

        except BadRequest as e:
            return 400, json.dumps({'error': str(e)})
        except NotFound as e:
            return 404, json.dumps({'error': str(e)})
        except Exception:
            # Deliberately generic: do not leak internal details to clients.
            return 500, json.dumps({'error': 'Internal server error'})


# HTTP services support middleware for request preprocessing, authentication, and response modification.
import json

from nameko.web.handlers import http


class AuthenticatedAPIService:
    """Example of bearer-token authentication performed inside each handler."""

    name = "authenticated_api"

    def _authenticate_request(self, request):
        """Middleware-style authentication

        Returns a ``(user, error)`` pair where exactly one element is None.
        ``error`` is a ``(status_code, payload)`` tuple that a handler can
        return unchanged.
        """
        token = request.headers.get('Authorization', '').replace('Bearer ', '')
        if not token:
            return None, (401, 'Unauthorized')

        # Validate token logic
        user = self._validate_token(token)
        if not user:
            return None, (401, 'Invalid token')

        return user, None

    @http('GET', '/protected-data')
    def get_protected_data(self, request):
        """Return the protected payload, or the authentication error response."""
        user, error = self._authenticate_request(request)
        if error:
            return error

        # Process authenticated request
        return json.dumps({'data': 'secret', 'user_id': user['id']})


# HTTP service configuration options for server behavior, timeouts, and performance.
# config.yaml
WEB_SERVER_ADDRESS: "0.0.0.0:8000"
WEB_SERVER_MAX_WORKERS: 10
WEB_SERVER_TIMEOUT: 30
# WebSocket configuration
WEBSOCKET_HUB_CONFIG:
  max_connections: 1000
  heartbeat_interval: 30

Install with Tessl CLI
npx tessl i tessl/pypi-nameko