CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-nameko

A microservices framework for Python that lets service developers concentrate on application logic and encourages testability

Pending
Overview
Eval results
Files

docs/standalone-clients.md

Standalone Clients

Client libraries for interacting with nameko services from non-nameko applications, supporting both RPC calls and event publishing with connection management and error handling.

Capabilities

Service RPC Proxy

Client for making RPC calls to specific nameko services from standalone applications.

class ServiceRpcProxy:
    """
    Standalone RPC client for calling a specific nameko service.

    This object manages the connection. RPC methods are invoked on the proxy
    returned by ``start()``, which is also what ``with ... as proxy`` binds.

    Parameters:
    - service_name: Name of the target service
    - config: Configuration dictionary with AMQP connection details
    - context_data: Optional context data to pass with all calls
    - timeout: Request timeout in seconds
    """

    def __init__(self, service_name, config, context_data=None, timeout=None): ...

    def start(self):
        """Open the connection and return the proxy used to make RPC calls."""
        ...

    def stop(self):
        """Close the connection opened by ``start()``."""
        ...

    def __enter__(self):
        """Start the client and return the proxy, for use in a ``with`` block."""
        ...

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the client when the ``with`` block exits."""
        ...

Usage Example:

from nameko.standalone.rpc import ServiceRpcProxy

# Configuration for AMQP connection
config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}

# Using context manager (recommended): the connection is closed when the
# block exits, even if a call raises.
with ServiceRpcProxy('user_service', config) as user_rpc:
    # Call service methods
    user = user_rpc.get_user(user_id=123)
    print(f"User: {user['name']} ({user['email']})")

    # Create new user
    new_user = user_rpc.create_user({
        'name': 'John Doe',
        'email': 'john@example.com'
    })
    print(f"Created user with ID: {new_user['user_id']}")

# Manual connection management
# The ServiceRpcProxy object only manages the connection; RPC methods live on
# the proxy that start() returns (the same object `with ... as` binds above).
proxy = ServiceRpcProxy('user_service', config)
user_rpc = proxy.start()
try:
    user = user_rpc.get_user(user_id=456)
    print(user)
finally:
    proxy.stop()  # Always close connection

Cluster RPC Proxy

Client that can call multiple services with service discovery and connection pooling.

class ClusterRpcProxy:
    """
    Standalone RPC client for calling multiple nameko services.

    This object manages the connection. Services are addressed by name as
    attributes of the proxy returned by ``start()``, which is also what
    ``with ... as proxy`` binds.

    Parameters:
    - config: Configuration dictionary with AMQP connection details
    - context_data: Optional context data to pass with all calls
    - timeout: Request timeout in seconds
    """

    def __init__(self, config, context_data=None, timeout=None): ...

    def start(self):
        """Open the connection and return the proxy used to reach services."""
        ...

    def stop(self):
        """Close the connection opened by ``start()``."""
        ...

    def __enter__(self):
        """Start the client and return the proxy, for use in a ``with`` block."""
        ...

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the client when the ``with`` block exits."""
        ...

    def __getattr__(self, service_name):
        """Return the RPC proxy for the service called ``service_name``."""
        ...

Usage Example:

import time

from nameko.standalone.rpc import ClusterRpcProxy

config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//',
    # Intended default timeout for all calls.
    # NOTE(review): confirm nameko reads an 'RPC_TIMEOUT' config key; the
    # documented way to bound calls is ClusterRpcProxy's `timeout` argument.
    'RPC_TIMEOUT': 30
}

# Using context manager for multiple services
with ClusterRpcProxy(config) as cluster_rpc:
    # Call different services through dynamic attributes
    user = cluster_rpc.user_service.get_user(123)
    order = cluster_rpc.order_service.create_order({
        'user_id': user['user_id'],
        'items': [{'product_id': 1, 'quantity': 2}]
    })

    # Send notification about the order
    cluster_rpc.notification_service.send_notification({
        'user_id': user['user_id'],
        'message': f'Order {order["order_id"]} created successfully'
    })

# With context data for request tracing
context_data = {
    'correlation_id': 'web-request-456',
    'user_id': 123
}

with ClusterRpcProxy(config, context_data=context_data) as cluster_rpc:
    # All calls will include the context data
    result = cluster_rpc.audit_service.log_action('user_login', {
        'timestamp': time.time()  # requires the `import time` above
    })

Event Dispatcher

Function for publishing events to nameko services from standalone applications.

def event_dispatcher(config):
    """
    Build a callable that publishes events from a standalone application.

    Parameters:
    - config: Configuration dictionary with AMQP connection details

    Returns:
    A dispatch function. The examples in this document call it as
    ``dispatch(service_name, event_type, event_data)``.
    """

Usage Example:

import time

from nameko.standalone.events import event_dispatcher

config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}

# Create event dispatcher
dispatch_event = event_dispatcher(config)

# Publish events to services.
# Arguments are: source service name, event type, event payload.
dispatch_event('external_system', 'data_imported', {
    'file_path': '/data/import/users.csv',
    'record_count': 1000,
    'import_id': 'import-123',
    'timestamp': time.time()  # requires the `import time` above
})

dispatch_event('payment_gateway', 'payment_received', {
    'order_id': 'order-456',
    'amount': 99.99,
    'currency': 'USD',
    'payment_method': 'credit_card',
    'transaction_id': 'txn-789'
})

# The dispatcher automatically handles connection cleanup

Event Exchange Management

Function for getting event exchanges for manual event publishing with more control.

def get_event_exchange(config):
    """
    Return the exchange used when publishing events by hand.

    Parameters:
    - config: Configuration dictionary with AMQP connection details

    Returns:
    Exchange object for publishing events with full control

    NOTE(review): upstream nameko documents ``get_event_exchange`` as taking
    the name of the service whose events exchange is wanted. Confirm the
    parameter shown here against the installed nameko version.
    """

Usage Example:

import json
import time

from nameko.standalone.events import get_event_exchange

config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//'
}

# Get exchange for manual publishing
# NOTE(review): upstream nameko documents get_event_exchange() as taking a
# service name and returning an unbound kombu Exchange. Confirm that passing
# the config dict, and calling publish()/close() directly on the result,
# works with the installed nameko/kombu versions.
exchange = get_event_exchange(config)

try:
    # Manual event publishing with full control
    event_data = {
        'user_id': 123,
        'action': 'profile_updated',
        'changes': ['email', 'phone'],
        'timestamp': time.time()  # requires the `import time` above
    }

    # Publish with specific routing key and headers
    exchange.publish(
        message=json.dumps(event_data),
        routing_key='user_service.profile_updated',
        headers={
            'source_service': 'web_app',
            'event_type': 'profile_updated',
            'correlation_id': 'web-session-789'
        }
    )

finally:
    exchange.close()

Web Application Integration

Common patterns for integrating nameko services with web frameworks.

Flask Integration Example:

from flask import Flask, request, jsonify
from nameko.standalone.rpc import ClusterRpcProxy

# Flask application that fronts the nameko services
app = Flask(__name__)

# Connection settings handed to every ClusterRpcProxy in the routes below
NAMEKO_CONFIG = dict(AMQP_URI='amqp://guest:guest@localhost:5672//')

@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Look up a user through the nameko ``user_service``.

    Responds with the user as JSON, or with a JSON error body and status 404
    if the lookup (or serialising its result) raises.
    """
    with ClusterRpcProxy(NAMEKO_CONFIG) as cluster:
        try:
            response = jsonify(cluster.user_service.get_user(user_id))
        except Exception as exc:
            response = jsonify({'error': str(exc)}), 404
    return response

@app.route('/api/users', methods=['POST'])
def create_user():
    """Create a user through the nameko ``user_service``.

    The JSON request body is passed to the service unchanged. Responds with
    the created user and status 201, or with a JSON error body and status 400
    if the call raises.
    """
    payload = request.get_json()

    with ClusterRpcProxy(NAMEKO_CONFIG) as cluster:
        try:
            created = cluster.user_service.create_user(payload)
            response = jsonify(created), 201
        except Exception as exc:
            response = jsonify({'error': str(exc)}), 400
    return response

@app.route('/api/orders', methods=['POST'])
def create_order():
    """Create an order via ``order_service`` and send a confirmation.

    Responses:
    - 400 with a JSON error body if the request body is not a JSON object
      containing ``user_id``, or if creating the order raises.
    - 201 with the created order otherwise. A failure while sending the
      confirmation is logged but does not change the response, because the
      order already exists at that point and a 400 would invite the client
      to retry and create a duplicate.
    """
    order_data = request.get_json()

    # Validate before any RPC call, so a missing 'user_id' is rejected before
    # the order is created rather than after.
    if not isinstance(order_data, dict) or 'user_id' not in order_data:
        return jsonify({
            'error': "request body must be a JSON object containing 'user_id'"
        }), 400

    # Add correlation ID for tracing
    context_data = {
        'correlation_id': f'web-{request.headers.get("X-Request-ID", "unknown")}',
        'source': 'web_api'
    }

    with ClusterRpcProxy(NAMEKO_CONFIG, context_data=context_data) as rpc:
        # Create order
        try:
            order = rpc.order_service.create_order(order_data)
        except Exception as e:
            return jsonify({'error': str(e)}), 400

        # Send notification. This is best-effort: the order has been created,
        # so a notification problem must not be reported as a failed order.
        try:
            rpc.notification_service.send_order_confirmation({
                'order_id': order['order_id'],
                'user_id': order_data['user_id']
            })
        except Exception:
            app.logger.exception(
                "Order confirmation could not be sent for order %r", order
            )

        return jsonify(order), 201

if __name__ == '__main__':
    # Start Flask's built-in server when this file is run as a script.
    # NOTE(review): debug=True is presumably meant for local development
    # only; confirm it is not used when the app is deployed.
    app.run(debug=True)

Django Integration Example:

# django_app/services.py
import time

from django.conf import settings
from nameko.standalone.rpc import ClusterRpcProxy
from nameko.standalone.events import event_dispatcher


class NamekoService:
    """Django service layer for nameko integration.

    Builds the AMQP configuration from ``settings.NAMEKO_AMQP_URI`` and
    exposes helpers for RPC calls and event publishing.
    """

    def __init__(self):
        self.config = {
            'AMQP_URI': settings.NAMEKO_AMQP_URI
        }
        # Created once per instance and reused by notify_user_action().
        self.dispatch_event = event_dispatcher(self.config)

    def get_user_data(self, user_id):
        """Get enriched user data from nameko services.

        Returns the ``user_service`` record for ``user_id`` with an extra
        ``'preferences'`` key holding the ``preference_service`` result.
        Exceptions raised by either call propagate to the caller.
        """
        with ClusterRpcProxy(self.config) as rpc:
            user = rpc.user_service.get_user(user_id)
            preferences = rpc.preference_service.get_user_preferences(user_id)

            return {
                **user,
                'preferences': preferences
            }

    def notify_user_action(self, user_id, action, data):
        """Notify nameko services of user actions.

        Publishes a ``user_<action>`` event from the ``django_app`` source
        with the user id, action, payload and the current timestamp.
        """
        self.dispatch_event('django_app', f'user_{action}', {
            'user_id': user_id,
            'action': action,
            'data': data,
            'timestamp': time.time()  # requires the `import time` above
        })

# django_app/views.py
import json

from django.http import JsonResponse
from django.views import View
from .services import NamekoService


class UserAPIView(View):
    """JSON endpoints for reading a user and reporting user actions."""

    def __init__(self, **kwargs):
        # Forward keyword arguments to View: as_view() instantiates the class
        # with its initkwargs, so a no-argument __init__ would break
        # ``UserAPIView.as_view(some_option=...)``.
        super().__init__(**kwargs)
        self.nameko = NamekoService()

    def get(self, request, user_id):
        """Return the enriched user as JSON, or a 400 error body on failure."""
        try:
            user_data = self.nameko.get_user_data(user_id)
            return JsonResponse(user_data)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=400)

    def post(self, request, user_id):
        """Publish a user action described by the JSON request body.

        The body must be a JSON object with an ``action`` field and an
        optional ``data`` field. A malformed body gets a 400 response
        instead of an unhandled exception.
        """
        # Handle user action
        try:
            action_data = json.loads(request.body)
            action = action_data['action']
        except (ValueError, KeyError, TypeError):
            # ValueError: body is not valid JSON; KeyError: no 'action' field;
            # TypeError: body is valid JSON but not an object.
            return JsonResponse(
                {'error': "request body must be a JSON object with an 'action' field"},
                status=400,
            )

        # Notify nameko services
        self.nameko.notify_user_action(
            user_id,
            action,
            action_data.get('data', {})
        )

        return JsonResponse({'status': 'success'})

Error Handling and Retries

Robust error handling patterns for standalone clients.

import logging
import time

from nameko.exceptions import RemoteError, UnknownService
from nameko.standalone.rpc import ServiceRpcProxy


class ResilientNamekoClient:
    """Wrapper for resilient nameko service calls.

    Parameters:
    - config: Configuration dictionary with AMQP connection details
    - max_retries: Number of retries after the first attempt, so a call is
      tried at most ``max_retries + 1`` times. Must be >= 0.
    - retry_delay: Base delay between attempts, in seconds
    """

    def __init__(self, config, max_retries=3, retry_delay=1):
        # A negative value would make call_service() skip its loop and
        # silently return None without ever calling the service.
        if max_retries < 0:
            raise ValueError("max_retries must be >= 0")
        self.config = config
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.logger = logging.getLogger(__name__)

    def call_service(self, service_name, method_name, *args, **kwargs):
        """Call service method with automatic retries.

        A fresh ``ServiceRpcProxy`` is opened for every attempt.

        Returns:
        Whatever the remote method returns.

        Raises:
        - UnknownService: immediately, without retrying.
        - RemoteError / Exception: re-raised once the last attempt has failed.

        Note: every failure other than UnknownService is retried, so only use
        this for calls that are safe to repeat.
        """
        for attempt in range(self.max_retries + 1):
            try:
                with ServiceRpcProxy(service_name, self.config) as rpc:
                    method = getattr(rpc, method_name)
                    return method(*args, **kwargs)

            except UnknownService:
                self.logger.error("Service %s not found", service_name)
                raise  # Don't retry for service not found

            except RemoteError as e:
                self.logger.warning(
                    "Remote error on attempt %d: %s", attempt + 1, e
                )
                if attempt == self.max_retries:
                    raise
                time.sleep(self.retry_delay * (2 ** attempt))  # Exponential backoff

            except Exception as e:
                self.logger.error(
                    "Unexpected error on attempt %d: %s", attempt + 1, e
                )
                if attempt == self.max_retries:
                    raise
                time.sleep(self.retry_delay)  # Fixed delay for unexpected errors

# Usage
# `config` is not defined in this snippet; it is the AMQP configuration dict
# (for example {'AMQP_URI': ...}) built in the earlier examples.
client = ResilientNamekoClient(config, max_retries=3)

try:
    # Retries happen inside call_service(); it re-raises once it gives up.
    user = client.call_service('user_service', 'get_user', user_id=123)
    print(user)
except Exception as e:
    print(f"Failed to get user after retries: {e}")

Connection Pooling

Advanced connection management for high-throughput applications.

from nameko.standalone.rpc import ClusterRpcProxy
from contextlib import contextmanager
import threading
import queue

class NamekoConnectionPool:
    """Connection pool for nameko RPC clients.

    The queue holds the proxies returned by ``ClusterRpcProxy.start()``,
    which are the objects RPC calls are made on. The ``ClusterRpcProxy``
    objects that own the connections are tracked separately so that
    ``close_all`` can stop every one of them, including those whose proxy is
    checked out at the time.

    Parameters:
    - config: Configuration dictionary with AMQP connection details
    - pool_size: Number of connections opened up front
    """

    def __init__(self, config, pool_size=10):
        self.config = config
        self.pool_size = pool_size
        self.pool = queue.Queue(maxsize=pool_size)
        self.lock = threading.Lock()  # guards self._clients
        self._clients = []

        # Pre-populate pool
        try:
            for _ in range(pool_size):
                client = ClusterRpcProxy(config)
                # start() opens the connection and returns the object that
                # services are called on; that is what gets pooled.
                proxy = client.start()
                self._clients.append(client)
                self.pool.put(proxy)
        except Exception:
            # Don't leak the connections that were already opened.
            self.close_all()
            raise

    @contextmanager
    def get_proxy(self, timeout=None):
        """Get proxy from pool with automatic return.

        Parameters:
        - timeout: Seconds to wait for a free proxy. ``None`` (the default)
          waits indefinitely.

        Raises:
        - queue.Empty: no proxy became free within ``timeout``.
        """
        proxy = self.pool.get(timeout=timeout)
        try:
            yield proxy
        finally:
            self.pool.put(proxy)

    def close_all(self):
        """Close all connections in pool.

        Safe to call more than once. If stopping a client raises, the
        remaining clients are still stopped and the first error is re-raised
        afterwards.
        """
        # Drop idle proxies. get_nowait() avoids the check-then-get race of
        # `while not empty(): get()`, which can block forever.
        while True:
            try:
                self.pool.get_nowait()
            except queue.Empty:
                break

        with self.lock:
            clients, self._clients = self._clients, []

        first_error = None
        for client in clients:
            try:
                client.stop()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

# Usage in high-throughput application: one shared pool for the whole process
pool = NamekoConnectionPool(config, pool_size=20)

def process_request(request_data):
    """Run ``request_data`` through ``processor_service`` on a pooled proxy."""
    with pool.get_proxy() as pooled_rpc:
        # The proxy goes back to the pool when the block exits.
        return pooled_rpc.processor_service.process_data(request_data)

# Cleanup on application shutdown
import atexit
atexit.register(pool.close_all)

Install with Tessl CLI

npx tessl i tessl/pypi-nameko

docs

cli-interface.md

dependency-injection.md

event-system.md

http-interface.md

index.md

rpc-communication.md

service-management.md

standalone-clients.md

testing-framework.md

timer-scheduling.md

tile.json