CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-nameko

A microservices framework for Python that lets service developers concentrate on application logic and encourages testability

Pending
Overview
Eval results
Files

docs/testing-framework.md

Testing Framework

Comprehensive testing utilities for unit testing services, mocking dependencies, integration testing with real message brokers, and end-to-end testing patterns.

Capabilities

Worker Factory

Creates service worker instances for isolated unit testing without requiring full service containers or message brokers.

def worker_factory(service_cls, **dependencies):
    """Build a worker instance of ``service_cls`` for isolated unit tests.

    Args:
        service_cls: Service class to instantiate.
        **dependencies: Replacement objects for the service's dependencies,
            passed as keyword arguments.

    Returns:
        A service worker instance whose dependencies are either the
        objects supplied here or mocks.
    """

Usage Example:

from nameko.testing.services import worker_factory
from unittest.mock import Mock

class UserService:
    """Example service that stores users and caches their data."""

    name = "user_service"

    database = DatabaseProvider()
    cache = CacheProvider()

    @rpc
    def create_user(self, user_data):
        """Save ``user_data``, cache it under ``user:<id>`` and return the id."""
        new_id = self.database.save_user(user_data)
        cache_key = f'user:{new_id}'
        self.cache.set(cache_key, user_data)
        return {'user_id': new_id}

def test_create_user():
    """Unit-test ``UserService.create_user`` against mocked dependencies."""
    payload = {'name': 'John', 'email': 'john@example.com'}

    # Stand-ins for the real providers; the database hands back a fixed id.
    fake_db = Mock()
    fake_db.save_user.return_value = 123
    fake_cache = Mock()

    # Build a worker wired to the stand-ins instead of real dependencies.
    service = worker_factory(UserService, database=fake_db, cache=fake_cache)

    outcome = service.create_user(payload)

    # The id comes from the database mock, and each dependency was called
    # exactly once with the expected arguments.
    assert outcome['user_id'] == 123
    fake_db.save_user.assert_called_once_with(payload)
    fake_cache.set.assert_called_once_with('user:123', payload)

Entrypoint Hook

Provides a context manager that yields a callable for invoking a specific entrypoint (RPC method, event handler, HTTP endpoint) in isolation.

def entrypoint_hook(container, method_name):
    """
    Open a hook onto a specific entrypoint method of a hosted service.

    This is a context manager, not a plain factory: enter it with ``with``
    and call the object it yields.

    Parameters:
    - container: Service container instance
    - method_name: Name of the service method to hook

    Yields:
    Callable that invokes the hooked method and returns its result

    Example:
        with entrypoint_hook(container, 'send_welcome_email') as hook:
            result = hook({'email': 'user@example.com'})
    """

Usage Example:

from nameko.testing.services import entrypoint_hook
from nameko.containers import ServiceContainer

class EmailService:
    """Example service reacting to ``user_registered`` events."""

    name = "email_service"

    @event_handler('user_service', 'user_registered')
    def send_welcome_email(self, payload):
        """Return a confirmation string for the address in ``payload``."""
        recipient = payload['email']
        # Send email logic
        return f"Welcome email sent to {recipient}"

def test_welcome_email_handler():
    """Invoke the ``send_welcome_email`` handler directly through a hook."""
    # Create service container
    container = ServiceContainer(EmailService, config={})
    container.start()

    try:
        # entrypoint_hook is a context manager: the callable that fires the
        # entrypoint is the value it yields, not its return value.
        with entrypoint_hook(container, 'send_welcome_email') as send_welcome_email:
            # Test the event handler directly
            result = send_welcome_email({'email': 'user@example.com'})

        assert result == "Welcome email sent to user@example.com"
    finally:
        # Always stop the container, even when the assertion fails.
        container.stop()

Entrypoint Waiter

Utility for testing asynchronous operations and waiting for entrypoints to complete execution.

def entrypoint_waiter(container, method_name, timeout=None):
    """Wait until a given entrypoint method has been called and has finished.

    Args:
        container: Service container instance.
        method_name: Name of the service method to wait for.
        timeout: Upper bound on the wait, in seconds.

    Returns:
        A context manager that waits for the method to complete.
    """

Usage Example:

from nameko.testing.services import entrypoint_waiter
import threading

class AsyncProcessingService:
    """Example service whose event handler takes a moment to finish."""

    name = "async_service"

    @event_handler('data_service', 'data_received')
    def process_data_async(self, payload):
        """Pause briefly, then report which ``data_id`` was processed."""
        # Imported here because this example's import list does not bring
        # ``time`` into scope.
        import time

        # Simulate async processing
        time.sleep(0.1)
        return f"Processed {payload['data_id']}"

def test_async_processing():
    """Trigger ``process_data_async`` from a thread and wait for it to finish."""
    container = ServiceContainer(AsyncProcessingService, config={})
    container.start()

    def trigger_event():
        # entrypoint_hook is a context manager; the callable that fires the
        # entrypoint is the value it yields.
        with entrypoint_hook(container, 'process_data_async') as hook:
            hook({'data_id': 'test-123'})

    # Run in separate thread to simulate async behavior
    thread = threading.Thread(target=trigger_event)

    try:
        # Leaving this block waits (up to ``timeout``) for the handler to
        # complete.
        with entrypoint_waiter(container, 'process_data_async', timeout=5):
            # Trigger the event handler
            # (In real test, this would be triggered by actual event)
            thread.start()

        # The handler has completed; make sure the helper thread is finished
        # too, so it cannot outlive the container it is using.
        thread.join(timeout=5)
        assert not thread.is_alive()
    finally:
        container.stop()

Dependency Replacement

Utility for replacing service dependencies with test doubles, mocks, or alternative implementations.

def replace_dependencies(container, **dependencies):
    """Swap dependencies of a service container for test replacements.

    Args:
        container: Service container instance.
        **dependencies: Replacement objects, keyed by dependency name.
    """

Usage Example:

from nameko.testing.services import replace_dependencies
from unittest.mock import Mock

class OrderService:
    """Example service that stores an order and charges for it."""

    name = "order_service"

    payment_service = RpcProxy('payment_service')
    database = DatabaseProvider()

    @rpc
    def create_order(self, order_data):
        """Persist ``order_data``, process its payment and summarise both."""
        # Save order to database
        saved_id = self.database.save_order(order_data)

        # Process payment
        payment = self.payment_service.process_payment(order_data['payment'])

        summary = {'order_id': saved_id}
        summary['payment_status'] = payment['status']
        return summary

def test_create_order_with_mocked_dependencies():
    """Run ``create_order`` in a container whose dependencies are mocks."""
    # Imported locally so this example does not rely on an earlier snippet.
    from nameko.testing.services import entrypoint_hook

    order_data = {
        'items': [{'id': 1, 'quantity': 2}],
        'payment': {'method': 'credit_card', 'amount': 100}
    }

    # Create service container
    container = ServiceContainer(OrderService, config={})

    # Create mocks
    mock_payment_service = Mock()
    mock_payment_service.process_payment.return_value = {'status': 'success'}

    mock_database = Mock()
    mock_database.save_order.return_value = 'order-123'

    # Replace dependencies
    replace_dependencies(
        container,
        payment_service=mock_payment_service,
        database=mock_database
    )

    container.start()

    try:
        # Fire the entrypoint through a hook rather than reaching for a
        # worker attribute on the container.
        with entrypoint_hook(container, 'create_order') as create_order:
            result = create_order(order_data)

        assert result['order_id'] == 'order-123'
        assert result['payment_status'] == 'success'

        # Verify mock calls, including the arguments each one received
        mock_database.save_order.assert_called_once_with(order_data)
        mock_payment_service.process_payment.assert_called_once_with(
            order_data['payment']
        )
    finally:
        container.stop()

Entrypoint Restriction

Utility for limiting which entrypoints are active during testing, useful for isolating specific functionality.

def restrict_entrypoints(container, *entrypoints):
    """Limit a service container to the named entrypoints.

    Args:
        container: Service container instance.
        *entrypoints: Names of the entrypoints to keep active; every other
            entrypoint is disabled.
    """

Usage Example:

from nameko.testing.services import restrict_entrypoints

class MultiEntrypointService:
    """Example service exposing one entrypoint of each kind."""

    name = "multi_service"

    @rpc
    def rpc_method(self):
        """RPC entrypoint; returns a fixed string."""
        return "rpc response"

    @http('GET', '/api/data')
    def http_method(self, request):
        """HTTP GET entrypoint; returns a fixed string."""
        return "http response"

    @event_handler('other_service', 'test_event')
    def event_method(self, payload):
        """Handler for ``test_event`` from ``other_service``."""
        return "event processed"

    @timer(interval=60)
    def timer_method(self):
        """Timer entrypoint declared with ``interval=60``."""
        return "timer executed"

def test_only_rpc_entrypoint():
    """Test with only RPC entrypoint active"""
    container = ServiceContainer(MultiEntrypointService, config={})

    # Restrict to only RPC entrypoint
    restrict_entrypoints(container, 'rpc_method')

    container.start()

    try:
        # RPC method should work. entrypoint_hook is a context manager, so
        # the callable is the value it yields.
        with entrypoint_hook(container, 'rpc_method') as hook:
            result = hook()
        assert result == "rpc response"

        # Other entrypoints should be disabled
        # (HTTP, event, timer entrypoints won't be active)

    finally:
        container.stop()

def test_multiple_entrypoints():
    """Test with multiple specific entrypoints active"""
    container = ServiceContainer(MultiEntrypointService, config={})

    # Keep both RPC and event entrypoints active
    restrict_entrypoints(container, 'rpc_method', 'event_method')

    container.start()

    try:
        # Both RPC and event methods should work. Each hook is a context
        # manager that yields the callable for its entrypoint.
        with entrypoint_hook(container, 'rpc_method') as rpc_hook:
            assert rpc_hook() == "rpc response"

        with entrypoint_hook(container, 'event_method') as event_hook:
            assert event_hook({'test': 'data'}) == "event processed"

        # HTTP and timer entrypoints are disabled

    finally:
        container.stop()

Mock Entrypoints and Extensions

Testing utilities for creating mock entrypoints and dependency providers.

class MockDependencyProvider:
    """Dependency provider stand-in for use in tests.

    Args:
        attr_name: Name of the dependency attribute on the service.
        dependency: Mock object to inject (defaults to Mock()).
    """

    def __init__(self, attr_name, dependency=None):
        ...

def once(*args, **kwargs):
    """Decorator declaring a test entrypoint that fires a single time.

    Handy for exercising one specific execution path without any ongoing
    triggers.
    """

@once
def test_entrypoint(self):
    """Entrypoint example that is fired a single time."""
    ...

def dummy(*args, **kwargs):
    """Decorator declaring a do-nothing entrypoint for tests.

    The entrypoint is registered but performs no work, which helps when
    testing service structure without external triggers.
    """

@dummy
def placeholder_entrypoint(self):
    """Entrypoint example declared with the dummy decorator."""
    ...

Mock Usage Example:

from nameko.testing.services import MockDependencyProvider, worker_factory
from unittest.mock import Mock

class ServiceWithDependencies:
    """Example service that combines three separate dependencies."""

    name = "test_service"

    database = DatabaseProvider()
    cache = CacheProvider()
    external_api = HttpClient()

    @rpc
    def complex_operation(self, data):
        """Query the database, read the cache and post to the API in turn."""
        # Use multiple dependencies; the dict literal evaluates its values
        # top to bottom, so the calls happen in this order.
        return {
            'db_result': self.database.query(data['query']),
            'cached_data': self.cache.get(data['cache_key']),
            'api_response': self.external_api.post('/endpoint', data),
        }

def test_with_mock_dependencies():
    """Exercise ``complex_operation`` with every dependency mocked."""
    # One stand-in per dependency, each primed with a canned reply.
    fake_db = Mock()
    fake_db.query.return_value = {'id': 1, 'name': 'test'}

    fake_cache = Mock()
    fake_cache.get.return_value = 'cached_value'

    fake_api = Mock()
    fake_api.post.return_value = {'status': 'success'}

    # Worker wired to all three stand-ins.
    service = worker_factory(
        ServiceWithDependencies,
        database=fake_db,
        cache=fake_cache,
        external_api=fake_api
    )

    request_data = {
        'query': 'SELECT * FROM users',
        'cache_key': 'user:123'
    }
    outcome = service.complex_operation(request_data)

    # Each canned reply ends up under its own key in the result.
    assert outcome['db_result']['name'] == 'test'
    assert outcome['cached_data'] == 'cached_value'
    assert outcome['api_response']['status'] == 'success'

    # Each dependency was used exactly once.
    fake_db.query.assert_called_once_with('SELECT * FROM users')
    fake_cache.get.assert_called_once_with('user:123')
    fake_api.post.assert_called_once()

Integration Testing

Tools for integration testing with real message brokers and external services.

from nameko.testing.pytest import NamekoTestEnvironment

class NamekoTestEnvironment:
    """Environment for integration tests that use a real AMQP broker.

    Offers helpers for running services with real message passing while
    keeping tests isolated from each other.

    NOTE(review): confirm that ``nameko.testing.pytest`` really exports this
    class in the nameko version you target - TODO verify before relying on it.
    """

    def __init__(self, config=None):
        ...

    def start_service(self, service_cls):
        ...

    def stop_all_services(self):
        ...

    def get_rpc_proxy(self, service_name):
        ...

    def dispatch_event(self, source_service, event_type, event_data):
        ...

Integration Test Example:

import pytest
from nameko.testing.pytest import NamekoTestEnvironment

@pytest.fixture
def test_env():
    """Yield an environment bound to a real AMQP broker, then stop its services."""
    environment = NamekoTestEnvironment(
        {'AMQP_URI': 'amqp://guest:guest@localhost:5672/test_vhost'}
    )
    yield environment
    # Runs after the test that requested the fixture has finished.
    environment.stop_all_services()

def test_service_integration(test_env):
    """Start several services and drive them over RPC and events."""
    # Start multiple services
    test_env.start_service(UserService)
    test_env.start_service(EmailService)
    test_env.start_service(OrderService)

    # Get RPC proxy for testing
    user_rpc = test_env.get_rpc_proxy('user_service')

    # Test inter-service communication
    user_result = user_rpc.create_user({
        'name': 'Test User',
        'email': 'test@example.com'
    })

    assert user_result['user_id'] is not None

    # Test event-driven behavior. The event type must match the one
    # EmailService subscribes to ('user_registered'), otherwise no handler
    # would receive it.
    test_env.dispatch_event('user_service', 'user_registered', {
        'user_id': user_result['user_id'],
        'email': 'test@example.com'
    })

    # Verify that event was processed (check side effects)
    # This would typically involve checking database state,
    # file system, or other observable effects

HTTP Endpoint Testing

Testing utilities specifically for HTTP endpoints and web interfaces.

import json

from nameko.testing.services import worker_factory
from werkzeug.test import Client
from werkzeug.wrappers import Response

class APIService:
    """Example HTTP service exposing user lookup and creation endpoints."""

    name = "api_service"

    @http('GET', '/users/<int:user_id>')
    def get_user(self, request, user_id):
        """Return the user as a JSON string.

        HTTP entrypoints need a string payload, so the dict is serialised
        rather than returned as-is.
        """
        return json.dumps({'user_id': user_id, 'name': 'Test User'})

    @http('POST', '/users')
    def create_user(self, request):
        """Create a user from the JSON request body.

        Returns:
            ``(status_code, payload)`` - the status code comes first,
            followed by the JSON-encoded response body.
        """
        # Parse the raw body directly so this does not depend on the
        # request class offering a JSON helper.
        data = json.loads(request.get_data(as_text=True))
        return 201, json.dumps({'user_id': 123, 'name': data['name']})


def test_http_endpoints():
    """Call the HTTP handlers directly with hand-built werkzeug requests."""
    from werkzeug.test import EnvironBuilder
    from werkzeug.wrappers import Request

    # Create worker for HTTP service
    worker = worker_factory(APIService)

    # Test GET endpoint
    builder = EnvironBuilder(path='/users/456', method='GET')
    request = Request(builder.get_environ())

    body = worker.get_user(request, user_id=456)
    assert json.loads(body)['user_id'] == 456

    # Test POST endpoint
    builder = EnvironBuilder(
        path='/users',
        method='POST',
        data='{"name": "New User"}',
        content_type='application/json'
    )
    request = Request(builder.get_environ())

    status_code, body = worker.create_user(request)
    assert status_code == 201
    assert json.loads(body)['name'] == 'New User'

Test Configuration

Best practices for test configuration and environment management.

# conftest.py - pytest configuration
import pytest
from nameko.testing.pytest import NamekoTestEnvironment

@pytest.fixture(scope='session')
def test_config():
    """Provide session-wide settings that point at isolated test resources."""
    settings = dict(
        AMQP_URI='amqp://guest:guest@localhost:5672/test_vhost',
        DATABASE_URL='sqlite:///:memory:',
        REDIS_URL='redis://localhost:6379/15',  # Use test database
        DEBUG=True,
        TESTING=True,
    )
    return settings

@pytest.fixture
def test_env(test_config):
    """Give each test its own environment and stop its services afterwards."""
    environment = NamekoTestEnvironment(test_config)
    yield environment
    # Runs after the test that requested the fixture has finished.
    environment.stop_all_services()

# Test database setup
@pytest.fixture
def clean_database():
    """Set the test database up before a test and clean it up afterwards."""
    setup_test_database()  # Setup test database
    yield
    cleanup_test_database()  # Cleanup test database

Performance Testing

Utilities for load testing and performance measurement.

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def load_test_rpc_service(service_name, method_name, payload, num_requests=100,
                          concurrency=10, *, proxy_factory=None):
    """Load test an RPC service method.

    Fires ``num_requests`` calls at ``method_name`` from a pool of
    ``concurrency`` threads and summarises timing and failures.

    Args:
        service_name: Name handed to the proxy factory to obtain a proxy.
        method_name: Name of the method to call on the proxy.
        payload: Single argument passed to every call.
        num_requests: Total number of calls to make.
        concurrency: Size of the thread pool issuing the calls.
        proxy_factory: Optional callable ``service_name -> proxy``. When
            omitted, the module-level ``get_rpc_proxy`` is used.

    Returns:
        Dict with ``avg_response_time`` (seconds), ``success_rate`` (0..1),
        ``total_requests`` and ``concurrency``. When no request was made,
        both ``avg_response_time`` and ``success_rate`` are ``0.0``.
    """

    def make_request():
        """Issue one call and return ``(elapsed_seconds, succeeded)``."""
        # perf_counter is monotonic, so wall-clock adjustments cannot skew
        # the measured duration.
        started = time.perf_counter()
        try:
            # Resolved per request so that a missing helper is counted as a
            # failed request instead of aborting the whole run.
            factory = get_rpc_proxy if proxy_factory is None else proxy_factory
            rpc_proxy = factory(service_name)
            getattr(rpc_proxy, method_name)(payload)
        except Exception:
            # Deliberately broad: any failure is recorded as an
            # unsuccessful request so the run can report a success rate.
            return time.perf_counter() - started, False
        return time.perf_counter() - started, True

    # Execute concurrent requests
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [executor.submit(make_request) for _ in range(num_requests)]
        outcomes = [future.result() for future in as_completed(futures)]

    response_times = [elapsed for elapsed, _ in outcomes]
    success_count = sum(1 for _, succeeded in outcomes if succeeded)

    # Calculate statistics; guard the divisions so that a run with no
    # requests reports zeros instead of raising ZeroDivisionError.
    completed = len(response_times)
    avg_response_time = sum(response_times) / completed if completed else 0.0
    success_rate = success_count / completed if completed else 0.0

    return {
        'avg_response_time': avg_response_time,
        'success_rate': success_rate,
        'total_requests': num_requests,
        'concurrency': concurrency
    }

def test_service_performance():
    """Test service performance under load"""
    # Target ``create_user``: it is the RPC method the example UserService
    # defines, so the calls can actually succeed.
    stats = load_test_rpc_service(
        'user_service',
        'create_user',
        {'name': 'Load Test User', 'email': 'load-test@example.com'},
        num_requests=1000,
        concurrency=50
    )

    # Assert performance requirements
    assert stats['avg_response_time'] < 0.1  # Less than 100ms average
    assert stats['success_rate'] > 0.99      # 99% success rate

Install with Tessl CLI

npx tessl i tessl/pypi-nameko

docs

cli-interface.md

dependency-injection.md

event-system.md

http-interface.md

index.md

rpc-communication.md

service-management.md

standalone-clients.md

testing-framework.md

timer-scheduling.md

tile.json