
tessl/pypi-nameko

A microservices framework for Python that lets service developers concentrate on application logic and encourages testability



RPC Communication

Remote procedure call system that enables synchronous inter-service communication with automatic serialization, load balancing, error handling, and distributed tracing support.

Capabilities

RPC Entrypoint Decorator

Decorator that exposes service methods as RPC endpoints, making them callable from other services or standalone clients.

def rpc(fn=None, expected_exceptions=()):
    """
    Decorator to expose a service method as an RPC endpoint.
    
    Parameters:
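    - fn: The function to decorate (supplied automatically when the decorator is used without arguments)
    - expected_exceptions: Exception type or tuple of types that the method may legitimately raise,
      for example because of bad caller input. They still reach the caller, but are marked as
      expected so that other extensions (such as monitoring) need not treat them as failures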
    
    Returns:
    Decorated method that can be called remotely
    """

Usage Example:

from nameko.rpc import rpc
from nameko.exceptions import BadRequest

class CalculatorService:
    name = "calculator"
    
    @rpc
    def add(self, a, b):
        return a + b
    
    @rpc(expected_exceptions=(ValueError, BadRequest))
    def divide(self, a, b):
        if b == 0:
            raise ValueError("Division by zero")
        return a / b
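
The `fn=None` parameter in the signature above is what allows `rpc` to be used both bare (`@rpc`) and with arguments (`@rpc(expected_exceptions=...)`). The following is a minimal, standard-library-only sketch of that general decorator pattern. It is not nameko's implementation; `expose` and the `_expected_exceptions` attribute are hypothetical names used only for illustration.

```python
import functools


def expose(fn=None, expected_exceptions=()):
    """Hypothetical decorator usable as @expose or @expose(expected_exceptions=...)."""
    if fn is None:
        # Called with arguments: return a decorator that waits for the function.
        return functools.partial(expose, expected_exceptions=expected_exceptions)

    # Called bare (or by the partial above): annotate and return the function.
    fn._expected_exceptions = tuple(expected_exceptions)
    return fn


class Calculator:
    @expose
    def add(self, a, b):
        return a + b

    @expose(expected_exceptions=(ValueError,))
    def divide(self, a, b):
        if b == 0:
            raise ValueError("Division by zero")
        return a / b
```

Both forms end up calling `expose` with the function as `fn`; the parameterized form simply takes one extra step through `functools.partial`.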

RPC Proxy

Dependency provider that enables services to make RPC calls to other services with automatic service discovery and load balancing.

class RpcProxy:
    """
    Dependency provider for making RPC calls to other services.
    
    Parameters:
    - target_service: Name of the target service to call
    - **options: Additional options for the RPC proxy
    """
    
    def __init__(self, target_service, **options): ...

Usage Example:

from nameko.rpc import rpc, RpcProxy

class UserService:
    name = "user_service"
    
    # RPC proxy to calculator service
    calculator_rpc = RpcProxy('calculator')
    
    @rpc
    def calculate_user_score(self, user_id, base_score, multiplier):
        # Call the remote method (assumes the calculator service exposes `multiply` via @rpc)
        score = self.calculator_rpc.multiply(base_score, multiplier)
        return {'user_id': user_id, 'score': score}
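
A call such as `self.calculator_rpc.multiply(...)` works because the proxy turns attribute access into a method proxy bound to a service name and a method name. The sketch below models that idea in plain Python so it can run without a broker. `FakeServiceProxy`, `FakeMethodProxy`, and `local_transport` are hypothetical stand-ins, and the `service.method` routing key format is an assumption about how requests are addressed.

```python
class FakeMethodProxy:
    """Hypothetical stand-in for a proxy bound to one remote method."""

    def __init__(self, service_name, method_name, send):
        self.service_name = service_name
        self.method_name = method_name
        self._send = send  # callable that would publish the request

    def __call__(self, *args, **kwargs):
        routing_key = "{}.{}".format(self.service_name, self.method_name)
        payload = {"args": list(args), "kwargs": kwargs}
        return self._send(routing_key, payload)


class FakeServiceProxy:
    """Hypothetical stand-in that turns attribute access into method proxies."""

    def __init__(self, service_name, send):
        self._service_name = service_name
        self._send = send

    def __getattr__(self, method_name):
        # Only invoked for names not found normally, i.e. remote method names.
        return FakeMethodProxy(self._service_name, method_name, self._send)


def local_transport(routing_key, payload):
    """In-process 'broker' used so the sketch runs without RabbitMQ."""
    handlers = {"calculator.multiply": lambda a, b: a * b}
    return handlers[routing_key](*payload["args"], **payload["kwargs"])


calculator_rpc = FakeServiceProxy("calculator", local_transport)
score = calculator_rpc.multiply(10, 3)
```

Because the method proxy is created lazily, the caller never needs a local definition of the remote service's interface.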

Asynchronous RPC Calls

RPC proxies support asynchronous calls that return immediately with a reply object, allowing for non-blocking parallel service calls.

class MethodProxy:
    """
    Proxy for individual RPC methods with synchronous and asynchronous call support.
    """
    
    def __call__(self, *args, **kwargs):
        """Make synchronous RPC call"""
        ...
    
    def call_async(self, *args, **kwargs):
        """
        Make asynchronous RPC call.
        
        Returns:
        RpcReply object that can be used to retrieve the result later
        """
        ...

class RpcReply:
    """
    Represents a pending or completed asynchronous RPC call.
    """
    
    def result(self):
        """
        Get the result of the asynchronous RPC call.
        Blocks until the result is available.
        
        Returns:
        The result of the RPC call
        
        Raises:
        Any exception that occurred during the remote call
        """
        ...

Usage Example:

from nameko.rpc import rpc, RpcProxy

class OrderService:
    name = "order_service"
    
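    payment_rpc = RpcProxy('payment_service')
    inventory_rpc = RpcProxy('inventory_service')
    email_rpc = RpcProxy('email_service')
    # Proxies used by get_order_summary_async (service names are illustrative)
    order_db = RpcProxy('order_db_service')
    customer_rpc = RpcProxy('customer_service')
    shipping_rpc = RpcProxy('shipping_service')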
    
    @rpc
    def process_order_parallel(self, order_data):
        """Process order with parallel service calls for better performance"""
        
        # Start multiple async calls in parallel
        payment_reply = self.payment_rpc.process_payment.call_async(
            order_data['payment_info']
        )
        inventory_reply = self.inventory_rpc.reserve_items.call_async(
            order_data['items']
        )
        email_reply = self.email_rpc.send_confirmation.call_async(
            order_data['customer_email'],
            order_data['order_id']
        )
        
        # Collect results (this blocks until all complete)
        try:
            payment_result = payment_reply.result()
            inventory_result = inventory_reply.result()
            email_result = email_reply.result()
            
            return {
                'status': 'success',
                'payment': payment_result,
                'inventory': inventory_result,
                'email_sent': email_result
            }
        except Exception as e:
            # Handle any failures in the parallel calls
            return {'status': 'failed', 'error': str(e)}
    
    @rpc
    def get_order_summary_async(self, order_id):
        """Fetch order data from multiple services asynchronously"""
        
        # Launch parallel requests
        order_reply = self.order_db.get_order.call_async(order_id)
        customer_reply = self.customer_rpc.get_customer.call_async(order_id)
        shipping_reply = self.shipping_rpc.get_tracking.call_async(order_id)
        
        # Wait for all results
        order = order_reply.result()
        customer = customer_reply.result()
        shipping = shipping_reply.result()
        
        return {
            'order': order,
            'customer': customer,
            'shipping': shipping
        }
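
The launch-then-collect pattern above can be tried without a broker. This sketch uses `concurrent.futures` from the standard library as a stand-in: `submit()` plays the role of `call_async()` and `Future.result()` plays the role of `RpcReply.result()`. The `slow_service_call` function and its delay are hypothetical, and the thread pool is only a convenient way to simulate concurrency, not a description of how nameko implements it.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_service_call(name, delay):
    """Hypothetical remote call that takes `delay` seconds to answer."""
    time.sleep(delay)
    return "{} done".format(name)


def fetch_all():
    with ThreadPoolExecutor(max_workers=3) as pool:
        # Launch all calls first; none of these submissions block.
        replies = {
            name: pool.submit(slow_service_call, name, 0.2)
            for name in ("payment", "inventory", "email")
        }
        # Collect afterwards; each result() blocks only until its call finishes.
        return {name: reply.result() for name, reply in replies.items()}


start = time.monotonic()
results = fetch_all()
elapsed = time.monotonic() - start
```

Because the three calls overlap, the elapsed time should be close to the slowest single call rather than the sum of all three.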

Error Handling

RPC calls automatically handle serialization and propagation of exceptions between services.

Built-in Exception Types:

class RemoteError(Exception):
    """
    Wraps exceptions that occurred in remote service calls.
    
    Attributes:
    - exc_type: Original exception type name
    - value: String representation of the original exception
    """

class UnknownService(Exception):
    """Raised when the target service cannot be found"""

class MethodNotFound(BadRequest):
    """Raised when the target method is not found on the service"""

Exception Handling Example:

from nameko.rpc import rpc, RpcProxy
from nameko.exceptions import RemoteError, UnknownService

class OrderService:
    name = "order_service"
    
    payment_rpc = RpcProxy('payment_service')
    
    @rpc
    def process_order(self, order_data):
        try:
            # This might raise a remote exception
            payment_result = self.payment_rpc.process_payment(
                order_data['payment_info']
            )
            return {'status': 'success', 'payment': payment_result}
        except RemoteError as e:
            # The remote method raised; e.exc_type holds the original type name
            return {'status': 'payment_failed', 'error': str(e)}
        except UnknownService:
            # No service with the target name is reachable
            return {'status': 'service_unavailable'}
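
Exceptions cannot cross a process boundary as objects, so the remote side has to describe them as data and the caller has to rebuild something it can raise. The following sketch illustrates that round trip in plain Python. The dictionary keys and the `FakeRemoteError` class are illustrative assumptions, not the library's wire format.

```python
class FakeRemoteError(Exception):
    """Hypothetical caller-side wrapper for an exception raised remotely."""

    def __init__(self, exc_type, value):
        self.exc_type = exc_type
        self.value = value
        super().__init__("{} {}".format(exc_type, value))


def serialize_exception(exc):
    """Describe an exception as JSON-friendly data (worker side)."""
    return {
        "exc_type": type(exc).__name__,
        "exc_path": "{}.{}".format(type(exc).__module__, type(exc).__name__),
        "exc_args": [str(arg) for arg in exc.args],
        "value": str(exc),
    }


def deserialize_exception(data):
    """Rebuild a raisable object from the serialized form (caller side)."""
    return FakeRemoteError(data["exc_type"], data["value"])


def remote_divide(a, b):
    """Simulated worker: returns (result, error) the way a reply message might."""
    try:
        return a / b, None
    except Exception as exc:
        return None, serialize_exception(exc)


result, error = remote_divide(1, 0)
caught = None
try:
    if error is not None:
        raise deserialize_exception(error)
except FakeRemoteError as exc:
    caught = exc
```

The caller never sees the original `ZeroDivisionError` object, only a wrapper that carries its type name and message, which is why handlers on the calling side match on the wrapper type.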

Context Propagation

RPC calls automatically propagate context data like correlation IDs, user information, and tracing data across service boundaries.

Context Data Access:

import time

from nameko.contextdata import ContextDataProvider
from nameko.rpc import rpc

class AuditService:
    name = "audit_service"
    
    # Each provider injects the value stored under one context key
    user_id = ContextDataProvider('user_id')
    correlation_id = ContextDataProvider('correlation_id')
    
    @rpc
    def log_action(self, action, data):
        # Values come from the context passed by the calling service
        # (None if the caller did not set them)
        return {
            'action': action,
            'user_id': self.user_id,
            'correlation_id': self.correlation_id,
            'timestamp': time.time()
        }
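
Propagation of this kind generally works by copying context values into message headers on the way out and reading them back on the way in. The sketch below shows that mechanism with plain dictionaries. The `nameko.` header prefix and the helper names `context_to_headers` and `headers_to_context` are assumptions made for illustration.

```python
HEADER_PREFIX = "nameko"  # assumed prefix; illustrative only


def context_to_headers(context_data):
    """Caller side: turn context data into prefixed message headers."""
    return {
        "{}.{}".format(HEADER_PREFIX, key): value
        for key, value in context_data.items()
        if value is not None  # unset values are not sent
    }


def headers_to_context(headers):
    """Worker side: keep only prefixed headers and strip the prefix."""
    prefix = HEADER_PREFIX + "."
    return {
        name[len(prefix):]: value
        for name, value in headers.items()
        if name.startswith(prefix)
    }


outgoing = {"user_id": 42, "correlation_id": "req-123", "language": None}
headers = context_to_headers(outgoing)
headers["content-type"] = "application/json"  # unrelated transport header
incoming = headers_to_context(headers)
```

Namespacing the headers keeps application context separate from transport metadata, so a worker can forward the same context unchanged when it makes its own outgoing calls.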

Performance Considerations

  • Connection Pooling: RPC proxies automatically manage connection pools
  • Load Balancing: Instances of a service consume from a shared request queue, so the broker distributes requests across available instances
  • Timeouts: Configure request timeouts to prevent hanging calls
  • Retries: Built-in retry mechanisms for transient failures

Configuration Example:

# config.yaml
AMQP_URI: 'amqp://guest:guest@localhost:5672//'
RPC_TIMEOUT: 30  # seconds
RPC_RETRY_POLICY:
  max_retries: 3
  interval_start: 0.1
  interval_step: 0.2
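
The three retry keys above describe a delay that grows linearly between attempts. As a rough illustration, assuming `interval_start` is the wait before the first retry and each further retry adds `interval_step` (the semantics of kombu-style retry policies), the schedule can be computed as follows. `retry_delays` is a hypothetical helper and the optional `interval_max` cap is an added assumption; neither is part of nameko.

```python
def retry_delays(max_retries, interval_start, interval_step, interval_max=None):
    """Return the assumed wait (in seconds) before each retry attempt."""
    delays = []
    for attempt in range(max_retries):
        delay = interval_start + attempt * interval_step
        if interval_max is not None:
            # Optional cap so the delay stops growing past a ceiling.
            delay = min(delay, interval_max)
        delays.append(round(delay, 6))  # round away float noise
    return delays


policy = {"max_retries": 3, "interval_start": 0.1, "interval_step": 0.2}
delays = retry_delays(**policy)
total_wait = round(sum(delays), 6)
```

Under these assumptions the configured policy waits 0.1, 0.3, and 0.5 seconds before the three retries, so a call that keeps failing gives up after roughly 0.9 seconds of waiting in addition to the time spent on the attempts themselves.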

Install with Tessl CLI

npx tessl i tessl/pypi-nameko
