A microservices framework for Python that lets service developers concentrate on application logic and encourages testability
—
Client libraries for interacting with nameko services from non-nameko applications, supporting both RPC calls and event publishing with connection management and error handling.
Client for making RPC calls to specific nameko services from standalone applications.
class ServiceRpcProxy:
"""
Standalone RPC client for calling a specific nameko service.
Parameters:
- service_name: Name of the target service
- config: Configuration dictionary with AMQP connection details
- context_data: Optional context data to pass with all calls
- timeout: Request timeout in seconds
"""
def __init__(self, service_name, config, context_data=None, timeout=None): ...
def __enter__(self): ...
def __exit__(self, exc_type, exc_val, exc_tb): ...Usage Example:
from nameko.standalone.rpc import ServiceRpcProxy
# Configuration for AMQP connection
config = {
'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}
# Using context manager (recommended)
with ServiceRpcProxy('user_service', config) as user_rpc:
# Call service methods
user = user_rpc.get_user(user_id=123)
print(f"User: {user['name']} ({user['email']})")
# Create new user
new_user = user_rpc.create_user({
'name': 'John Doe',
'email': 'john@example.com'
})
print(f"Created user with ID: {new_user['user_id']}")
# Manual connection management
user_rpc = ServiceRpcProxy('user_service', config)
try:
user = user_rpc.get_user(user_id=456)
print(user)
finally:
user_rpc.stop() # Always close connectionClient that can call multiple services with service discovery and connection pooling.
class ClusterRpcProxy:
"""
Standalone RPC client for calling multiple nameko services.
Parameters:
- config: Configuration dictionary with AMQP connection details
- context_data: Optional context data to pass with all calls
- timeout: Request timeout in seconds
"""
def __init__(self, config, context_data=None, timeout=None): ...
def __enter__(self): ...
def __exit__(self, exc_type, exc_val, exc_tb): ...
def __getattr__(self, service_name): ...Usage Example:
from nameko.standalone.rpc import ClusterRpcProxy
config = {
'AMQP_URI': 'amqp://guest:guest@localhost:5672//',
'RPC_TIMEOUT': 30 # Default timeout for all calls
}
# Using context manager for multiple services
with ClusterRpcProxy(config) as cluster_rpc:
# Call different services through dynamic attributes
user = cluster_rpc.user_service.get_user(123)
order = cluster_rpc.order_service.create_order({
'user_id': user['user_id'],
'items': [{'product_id': 1, 'quantity': 2}]
})
# Send notification about the order
cluster_rpc.notification_service.send_notification({
'user_id': user['user_id'],
'message': f'Order {order["order_id"]} created successfully'
})
# With context data for request tracing
context_data = {
'correlation_id': 'web-request-456',
'user_id': 123
}
with ClusterRpcProxy(config, context_data=context_data) as cluster_rpc:
# All calls will include the context data
result = cluster_rpc.audit_service.log_action('user_login', {
'timestamp': time.time()
})Function for publishing events to nameko services from standalone applications.
def event_dispatcher(config):
"""
Create an event dispatcher for publishing events.
Parameters:
- config: Configuration dictionary with AMQP connection details
Returns:
Event dispatcher function that can publish events
"""Usage Example:
from nameko.standalone.events import event_dispatcher
config = {
'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}
# Create event dispatcher
dispatch_event = event_dispatcher(config)
# Publish events to services
dispatch_event('external_system', 'data_imported', {
'file_path': '/data/import/users.csv',
'record_count': 1000,
'import_id': 'import-123',
'timestamp': time.time()
})
dispatch_event('payment_gateway', 'payment_received', {
'order_id': 'order-456',
'amount': 99.99,
'currency': 'USD',
'payment_method': 'credit_card',
'transaction_id': 'txn-789'
})
# The dispatcher automatically handles connection cleanupFunction for getting event exchanges for manual event publishing with more control.
def get_event_exchange(config):
"""
Get event exchange for manual event publishing.
Parameters:
- config: Configuration dictionary with AMQP connection details
Returns:
Exchange object for publishing events with full control
"""Usage Example:
from nameko.standalone.events import get_event_exchange
import json
config = {
'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}
# Get exchange for manual publishing
exchange = get_event_exchange(config)
try:
# Manual event publishing with full control
event_data = {
'user_id': 123,
'action': 'profile_updated',
'changes': ['email', 'phone'],
'timestamp': time.time()
}
# Publish with specific routing key and headers
exchange.publish(
message=json.dumps(event_data),
routing_key='user_service.profile_updated',
headers={
'source_service': 'web_app',
'event_type': 'profile_updated',
'correlation_id': 'web-session-789'
}
)
finally:
exchange.close()Common patterns for integrating nameko services with web frameworks.
Flask Integration Example:
from flask import Flask, request, jsonify
from nameko.standalone.rpc import ClusterRpcProxy
app = Flask(__name__)
# Nameko configuration
NAMEKO_CONFIG = {
'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}
@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
"""Get user via nameko service"""
with ClusterRpcProxy(NAMEKO_CONFIG) as rpc:
try:
user = rpc.user_service.get_user(user_id)
return jsonify(user)
except Exception as e:
return jsonify({'error': str(e)}), 404
@app.route('/api/users', methods=['POST'])
def create_user():
"""Create user via nameko service"""
user_data = request.get_json()
with ClusterRpcProxy(NAMEKO_CONFIG) as rpc:
try:
user = rpc.user_service.create_user(user_data)
return jsonify(user), 201
except Exception as e:
return jsonify({'error': str(e)}), 400
@app.route('/api/orders', methods=['POST'])
def create_order():
"""Create order and send notification"""
order_data = request.get_json()
# Add correlation ID for tracing
context_data = {
'correlation_id': f'web-{request.headers.get("X-Request-ID", "unknown")}',
'source': 'web_api'
}
with ClusterRpcProxy(NAMEKO_CONFIG, context_data=context_data) as rpc:
try:
# Create order
order = rpc.order_service.create_order(order_data)
# Send notification
rpc.notification_service.send_order_confirmation({
'order_id': order['order_id'],
'user_id': order_data['user_id']
})
return jsonify(order), 201
except Exception as e:
return jsonify({'error': str(e)}), 400
if __name__ == '__main__':
app.run(debug=True)Django Integration Example:
# django_app/services.py
from django.conf import settings
from nameko.standalone.rpc import ClusterRpcProxy
from nameko.standalone.events import event_dispatcher
class NamekoService:
"""Django service layer for nameko integration"""
def __init__(self):
self.config = {
'AMQP_URI': settings.NAMEKO_AMQP_URI
}
self.dispatch_event = event_dispatcher(self.config)
def get_user_data(self, user_id):
"""Get enriched user data from nameko services"""
with ClusterRpcProxy(self.config) as rpc:
user = rpc.user_service.get_user(user_id)
preferences = rpc.preference_service.get_user_preferences(user_id)
return {
**user,
'preferences': preferences
}
def notify_user_action(self, user_id, action, data):
"""Notify nameko services of user actions"""
self.dispatch_event('django_app', f'user_{action}', {
'user_id': user_id,
'action': action,
'data': data,
'timestamp': time.time()
})
# django_app/views.py
from django.http import JsonResponse
from django.views import View
from .services import NamekoService
class UserAPIView(View):
def __init__(self):
super().__init__()
self.nameko = NamekoService()
def get(self, request, user_id):
try:
user_data = self.nameko.get_user_data(user_id)
return JsonResponse(user_data)
except Exception as e:
return JsonResponse({'error': str(e)}, status=400)
def post(self, request, user_id):
# Handle user action
action_data = json.loads(request.body)
# Notify nameko services
self.nameko.notify_user_action(
user_id,
action_data['action'],
action_data.get('data', {})
)
return JsonResponse({'status': 'success'})Robust error handling patterns for standalone clients.
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.exceptions import RemoteError, ServiceNotFound
import time
import logging
class ResilientNamekoClient:
"""Wrapper for resilient nameko service calls"""
def __init__(self, config, max_retries=3, retry_delay=1):
self.config = config
self.max_retries = max_retries
self.retry_delay = retry_delay
self.logger = logging.getLogger(__name__)
def call_service(self, service_name, method_name, *args, **kwargs):
"""Call service method with automatic retries"""
for attempt in range(self.max_retries + 1):
try:
with ServiceRpcProxy(service_name, self.config) as rpc:
method = getattr(rpc, method_name)
return method(*args, **kwargs)
except ServiceNotFound:
self.logger.error(f"Service {service_name} not found")
raise # Don't retry for service not found
except RemoteError as e:
self.logger.warning(f"Remote error on attempt {attempt + 1}: {e}")
if attempt == self.max_retries:
raise
time.sleep(self.retry_delay * (2 ** attempt)) # Exponential backoff
except Exception as e:
self.logger.error(f"Unexpected error on attempt {attempt + 1}: {e}")
if attempt == self.max_retries:
raise
time.sleep(self.retry_delay)
# Usage
client = ResilientNamekoClient(config, max_retries=3)
try:
user = client.call_service('user_service', 'get_user', user_id=123)
print(user)
except Exception as e:
print(f"Failed to get user after retries: {e}")Advanced connection management for high-throughput applications.
from nameko.standalone.rpc import ClusterRpcProxy
from contextlib import contextmanager
import threading
import queue
class NamekoConnectionPool:
"""Connection pool for nameko RPC clients"""
def __init__(self, config, pool_size=10):
self.config = config
self.pool_size = pool_size
self.pool = queue.Queue(maxsize=pool_size)
self.lock = threading.Lock()
# Pre-populate pool
for _ in range(pool_size):
proxy = ClusterRpcProxy(config)
proxy.start() # Initialize connection
self.pool.put(proxy)
@contextmanager
def get_proxy(self):
"""Get proxy from pool with automatic return"""
proxy = self.pool.get()
try:
yield proxy
finally:
self.pool.put(proxy)
def close_all(self):
"""Close all connections in pool"""
while not self.pool.empty():
proxy = self.pool.get()
proxy.stop()
# Usage in high-throughput application
pool = NamekoConnectionPool(config, pool_size=20)
def process_request(request_data):
with pool.get_proxy() as rpc:
# Process request using pooled connection
result = rpc.processor_service.process_data(request_data)
return result
# Cleanup on application shutdown
import atexit
atexit.register(pool.close_all)Install with Tessl CLI
npx tessl i tessl/pypi-nameko