A microservices framework for Python that lets service developers concentrate on application logic and encourages testability
—
Remote procedure call system that enables synchronous inter-service communication with automatic serialization, load balancing, error handling, and distributed tracing support.
Decorator that exposes service methods as RPC endpoints, making them callable from other services or standalone clients.
def rpc(fn=None, expected_exceptions=()):
"""
Decorator to expose a service method as an RPC endpoint.
Parameters:
- fn: The function to decorate (used internally)
- expected_exceptions: Tuple of exception types that should be propagated to callers
Returns:
Decorated method that can be called remotely
"""Usage Example:
from nameko.rpc import rpc
from nameko.exceptions import BadRequest
class CalculatorService:
name = "calculator"
@rpc
def add(self, a, b):
return a + b
@rpc(expected_exceptions=(ValueError, BadRequest))
def divide(self, a, b):
if b == 0:
raise ValueError("Division by zero")
return a / bDependency provider that enables services to make RPC calls to other services with automatic service discovery and load balancing.
class RpcProxy:
"""
Dependency provider for making RPC calls to other services.
Parameters:
- target_service: Name of the target service to call
- **options: Additional options for the RPC proxy
"""
def __init__(self, target_service, **options): ...Usage Example:
from nameko.rpc import rpc, RpcProxy
class UserService:
name = "user_service"
# RPC proxy to calculator service
calculator_rpc = RpcProxy('calculator')
@rpc
def calculate_user_score(self, user_id, base_score, multiplier):
# Call remote service method
score = self.calculator_rpc.multiply(base_score, multiplier)
return {'user_id': user_id, 'score': score}RPC proxies support asynchronous calls that return immediately with a reply object, allowing for non-blocking parallel service calls.
class MethodProxy:
"""
Proxy for individual RPC methods with synchronous and asynchronous call support.
"""
def __call__(self, *args, **kwargs):
"""Make synchronous RPC call"""
...
def call_async(self, *args, **kwargs):
"""
Make asynchronous RPC call.
Returns:
RpcReply object that can be used to retrieve the result later
"""
...
class RpcReply:
"""
Represents a pending or completed asynchronous RPC call.
"""
def result(self):
"""
Get the result of the asynchronous RPC call.
Blocks until the result is available.
Returns:
The result of the RPC call
Raises:
Any exception that occurred during the remote call
"""
...Usage Example:
from nameko.rpc import rpc, RpcProxy
class OrderService:
name = "order_service"
payment_rpc = RpcProxy('payment_service')
inventory_rpc = RpcProxy('inventory_service')
email_rpc = RpcProxy('email_service')
@rpc
def process_order_parallel(self, order_data):
"""Process order with parallel service calls for better performance"""
# Start multiple async calls in parallel
payment_reply = self.payment_rpc.process_payment.call_async(
order_data['payment_info']
)
inventory_reply = self.inventory_rpc.reserve_items.call_async(
order_data['items']
)
email_reply = self.email_rpc.send_confirmation.call_async(
order_data['customer_email'],
order_data['order_id']
)
# Collect results (this blocks until all complete)
try:
payment_result = payment_reply.result()
inventory_result = inventory_reply.result()
email_result = email_reply.result()
return {
'status': 'success',
'payment': payment_result,
'inventory': inventory_result,
'email_sent': email_result
}
except Exception as e:
# Handle any failures in the parallel calls
return {'status': 'failed', 'error': str(e)}
@rpc
def get_order_summary_async(self, order_id):
"""Fetch order data from multiple services asynchronously"""
# Launch parallel requests
order_reply = self.order_db.get_order.call_async(order_id)
customer_reply = self.customer_rpc.get_customer.call_async(order_id)
shipping_reply = self.shipping_rpc.get_tracking.call_async(order_id)
# Wait for all results
order = order_reply.result()
customer = customer_reply.result()
shipping = shipping_reply.result()
return {
'order': order,
'customer': customer,
'shipping': shipping
}RPC calls automatically handle serialization and propagation of exceptions between services.
Built-in Exception Types:
class RemoteError(NamekoException):
"""
Wraps exceptions that occurred in remote service calls.
Attributes:
- exc_type: Original exception type name
- exc_args: Original exception arguments
- exc_path: Service path where exception occurred
"""
class ServiceNotFound(NamekoException):
"""Raised when the target service cannot be found"""
class MethodNotFound(NamekoException):
"""Raised when the target method is not found on the service"""Exception Handling Example:
from nameko.rpc import rpc, RpcProxy
from nameko.exceptions import RemoteError, ServiceNotFound
class OrderService:
name = "order_service"
payment_rpc = RpcProxy('payment_service')
@rpc
def process_order(self, order_data):
try:
# This might raise a remote exception
payment_result = self.payment_rpc.process_payment(
order_data['payment_info']
)
return {'status': 'success', 'payment': payment_result}
except RemoteError as e:
# Handle remote service errors
return {'status': 'payment_failed', 'error': str(e)}
except ServiceNotFound:
# Handle service discovery failures
return {'status': 'service_unavailable'}RPC calls automatically propagate context data like correlation IDs, user information, and tracing data across service boundaries.
Context Data Access:
from nameko.contextdata import ContextDataProvider
class AuditService:
name = "audit_service"
context_data = ContextDataProvider()
@rpc
def log_action(self, action, data):
# Access context passed from calling service
user_id = self.context_data.get('user_id')
correlation_id = self.context_data.get('correlation_id')
return {
'action': action,
'user_id': user_id,
'correlation_id': correlation_id,
'timestamp': time.time()
}Configuration Example:
# config.yaml
AMQP_URI: 'amqp://guest:guest@localhost:5672//'
RPC_TIMEOUT: 30 # seconds
RPC_RETRY_POLICY:
max_retries: 3
interval_start: 0.1
interval_step: 0.2Install with Tessl CLI
npx tessl i tessl/pypi-nameko