A microservices framework for Python that lets service developers concentrate on application logic and encourages testability
—
Comprehensive testing utilities for unit testing services, mocking dependencies, integration testing with real message brokers, and end-to-end testing patterns.
Creates service worker instances for isolated unit testing without requiring full service containers or message brokers.
def worker_factory(service_cls, **dependencies):
"""
Create a service worker instance for testing.
Parameters:
- service_cls: The service class to instantiate
- **dependencies: Keyword arguments to override service dependencies
Returns:
Service worker instance with mocked or provided dependencies
"""Usage Example:
from nameko.testing.services import worker_factory
from unittest.mock import Mock
class UserService:
name = "user_service"
database = DatabaseProvider()
cache = CacheProvider()
@rpc
def create_user(self, user_data):
user_id = self.database.save_user(user_data)
self.cache.set(f'user:{user_id}', user_data)
return {'user_id': user_id}
def test_create_user():
# Mock dependencies
mock_database = Mock()
mock_database.save_user.return_value = 123
mock_cache = Mock()
# Create worker with mocked dependencies
worker = worker_factory(UserService, database=mock_database, cache=mock_cache)
# Test the service method
result = worker.create_user({'name': 'John', 'email': 'john@example.com'})
# Verify behavior
assert result['user_id'] == 123
mock_database.save_user.assert_called_once_with({'name': 'John', 'email': 'john@example.com'})
mock_cache.set.assert_called_once_with('user:123', {'name': 'John', 'email': 'john@example.com'})Provides hooks for testing specific entrypoints (RPC methods, event handlers, HTTP endpoints) in isolation.
def entrypoint_hook(container, method_name):
"""
Create a hook for testing a specific entrypoint method.
Parameters:
- container: Service container instance
- method_name: Name of the service method to hook
Returns:
Callable that can invoke the hooked method
"""Usage Example:
from nameko.testing.services import entrypoint_hook
from nameko.containers import ServiceContainer
class EmailService:
name = "email_service"
@event_handler('user_service', 'user_registered')
def send_welcome_email(self, payload):
email = payload['email']
# Send email logic
return f"Welcome email sent to {email}"
def test_welcome_email_handler():
# Create service container
container = ServiceContainer(EmailService, config={})
container.start()
try:
# Hook the event handler method
send_welcome_email = entrypoint_hook(container, 'send_welcome_email')
# Test the event handler directly
result = send_welcome_email({'email': 'user@example.com'})
assert result == "Welcome email sent to user@example.com"
finally:
container.stop()Utility for testing asynchronous operations and waiting for entrypoints to complete execution.
def entrypoint_waiter(container, method_name, timeout=None):
"""
Wait for an entrypoint method to be called and complete.
Parameters:
- container: Service container instance
- method_name: Name of the service method to wait for
- timeout: Maximum time to wait in seconds
Returns:
Context manager that waits for method completion
"""Usage Example:
from nameko.testing.services import entrypoint_waiter
import threading
class AsyncProcessingService:
name = "async_service"
@event_handler('data_service', 'data_received')
def process_data_async(self, payload):
# Simulate async processing
time.sleep(0.1)
return f"Processed {payload['data_id']}"
def test_async_processing():
container = ServiceContainer(AsyncProcessingService, config={})
container.start()
try:
# Wait for the event handler to complete
with entrypoint_waiter(container, 'process_data_async', timeout=5):
# Trigger the event handler
# (In real test, this would be triggered by actual event)
hook = entrypoint_hook(container, 'process_data_async')
# Run in separate thread to simulate async behavior
def trigger_event():
hook({'data_id': 'test-123'})
thread = threading.Thread(target=trigger_event)
thread.start()
# If we reach here, the handler completed successfully
assert True
finally:
container.stop()Utility for replacing service dependencies with test doubles, mocks, or alternative implementations.
def replace_dependencies(container, **dependencies):
"""
Replace dependencies in a service container for testing.
Parameters:
- container: Service container instance
- **dependencies: Keyword arguments mapping dependency names to replacement objects
"""Usage Example:
from nameko.testing.services import replace_dependencies
from unittest.mock import Mock
class OrderService:
name = "order_service"
payment_service = RpcProxy('payment_service')
database = DatabaseProvider()
@rpc
def create_order(self, order_data):
# Save order to database
order_id = self.database.save_order(order_data)
# Process payment
payment_result = self.payment_service.process_payment(order_data['payment'])
return {
'order_id': order_id,
'payment_status': payment_result['status']
}
def test_create_order_with_mocked_dependencies():
# Create service container
container = ServiceContainer(OrderService, config={})
# Create mocks
mock_payment_service = Mock()
mock_payment_service.process_payment.return_value = {'status': 'success'}
mock_database = Mock()
mock_database.save_order.return_value = 'order-123'
# Replace dependencies
replace_dependencies(
container,
payment_service=mock_payment_service,
database=mock_database
)
container.start()
try:
# Get service worker and test
worker = container.service
result = worker.create_order({
'items': [{'id': 1, 'quantity': 2}],
'payment': {'method': 'credit_card', 'amount': 100}
})
assert result['order_id'] == 'order-123'
assert result['payment_status'] == 'success'
# Verify mock calls
mock_database.save_order.assert_called_once()
mock_payment_service.process_payment.assert_called_once()
finally:
container.stop()Utility for limiting which entrypoints are active during testing, useful for isolating specific functionality.
def restrict_entrypoints(container, *entrypoints):
"""
Restrict active entrypoints in a service container to specified ones.
Parameters:
- container: Service container instance
- *entrypoints: Names of entrypoints to keep active (all others disabled)
"""Usage Example:
from nameko.testing.services import restrict_entrypoints
class MultiEntrypointService:
name = "multi_service"
@rpc
def rpc_method(self):
return "rpc response"
@http('GET', '/api/data')
def http_method(self, request):
return "http response"
@event_handler('other_service', 'test_event')
def event_method(self, payload):
return "event processed"
@timer(interval=60)
def timer_method(self):
return "timer executed"
def test_only_rpc_entrypoint():
"""Test with only RPC entrypoint active"""
container = ServiceContainer(MultiEntrypointService, config={})
# Restrict to only RPC entrypoint
restrict_entrypoints(container, 'rpc_method')
container.start()
try:
# RPC method should work
hook = entrypoint_hook(container, 'rpc_method')
result = hook()
assert result == "rpc response"
# Other entrypoints should be disabled
# (HTTP, event, timer entrypoints won't be active)
finally:
container.stop()
def test_multiple_entrypoints():
"""Test with multiple specific entrypoints active"""
container = ServiceContainer(MultiEntrypointService, config={})
# Keep both RPC and event entrypoints active
restrict_entrypoints(container, 'rpc_method', 'event_method')
container.start()
try:
# Both RPC and event methods should work
rpc_hook = entrypoint_hook(container, 'rpc_method')
event_hook = entrypoint_hook(container, 'event_method')
assert rpc_hook() == "rpc response"
assert event_hook({'test': 'data'}) == "event processed"
# HTTP and timer entrypoints are disabled
finally:
container.stop()Testing utilities for creating mock entrypoints and dependency providers.
class MockDependencyProvider:
"""
Mock dependency provider for testing.
Parameters:
- attr_name: Name of the dependency attribute on the service
- dependency: Mock object to inject (defaults to Mock())
"""
def __init__(self, attr_name, dependency=None): ...
def once(*args, **kwargs):
"""
Decorator that creates an entrypoint that fires only once for testing.
Useful for testing specific execution paths without ongoing triggers.
"""
@once
def test_entrypoint(self):
"""Example once-only entrypoint"""
...
def dummy(*args, **kwargs):
"""
Decorator that creates a dummy entrypoint for testing.
The entrypoint is registered but does nothing, useful for testing
service structure without external triggers.
"""
@dummy
def placeholder_entrypoint(self):
"""Example dummy entrypoint"""
...Mock Usage Example:
from nameko.testing.services import MockDependencyProvider, worker_factory
from unittest.mock import Mock
class ServiceWithDependencies:
name = "test_service"
database = DatabaseProvider()
cache = CacheProvider()
external_api = HttpClient()
@rpc
def complex_operation(self, data):
# Use multiple dependencies
db_result = self.database.query(data['query'])
cached_data = self.cache.get(data['cache_key'])
api_response = self.external_api.post('/endpoint', data)
return {
'db_result': db_result,
'cached_data': cached_data,
'api_response': api_response
}
def test_with_mock_dependencies():
# Create specific mocks for each dependency
mock_db = Mock()
mock_db.query.return_value = {'id': 1, 'name': 'test'}
mock_cache = Mock()
mock_cache.get.return_value = 'cached_value'
mock_api = Mock()
mock_api.post.return_value = {'status': 'success'}
# Create worker with all mocked dependencies
worker = worker_factory(
ServiceWithDependencies,
database=mock_db,
cache=mock_cache,
external_api=mock_api
)
# Test the service method
result = worker.complex_operation({
'query': 'SELECT * FROM users',
'cache_key': 'user:123'
})
# Verify results
assert result['db_result']['name'] == 'test'
assert result['cached_data'] == 'cached_value'
assert result['api_response']['status'] == 'success'
# Verify mock interactions
mock_db.query.assert_called_once_with('SELECT * FROM users')
mock_cache.get.assert_called_once_with('user:123')
mock_api.post.assert_called_once()Tools for integration testing with real message brokers and external services.
from nameko.testing.pytest import NamekoTestEnvironment
class NamekoTestEnvironment:
"""
Test environment for integration testing with real AMQP broker.
Provides utilities for running services with real message passing
while maintaining test isolation.
"""
def __init__(self, config=None): ...
def start_service(self, service_cls): ...
def stop_all_services(self): ...
def get_rpc_proxy(self, service_name): ...
def dispatch_event(self, source_service, event_type, event_data): ...Integration Test Example:
import pytest
from nameko.testing.pytest import NamekoTestEnvironment
@pytest.fixture
def test_env():
"""Create test environment with real AMQP broker"""
config = {
'AMQP_URI': 'amqp://guest:guest@localhost:5672/test_vhost'
}
env = NamekoTestEnvironment(config)
yield env
env.stop_all_services()
def test_service_integration(test_env):
# Start multiple services
test_env.start_service(UserService)
test_env.start_service(EmailService)
test_env.start_service(OrderService)
# Get RPC proxy for testing
user_rpc = test_env.get_rpc_proxy('user_service')
# Test inter-service communication
user_result = user_rpc.create_user({
'name': 'Test User',
'email': 'test@example.com'
})
assert user_result['user_id'] is not None
# Test event-driven behavior
test_env.dispatch_event('user_service', 'user_created', {
'user_id': user_result['user_id'],
'email': 'test@example.com'
})
# Verify that event was processed (check side effects)
# This would typically involve checking database state,
# file system, or other observable effectsTesting utilities specifically for HTTP endpoints and web interfaces.
from nameko.testing.services import worker_factory
from werkzeug.test import Client
from werkzeug.wrappers import Response
class APIService:
name = "api_service"
@http('GET', '/users/<int:user_id>')
def get_user(self, request, user_id):
return {'user_id': user_id, 'name': 'Test User'}
@http('POST', '/users')
def create_user(self, request):
data = request.get_json()
return {'user_id': 123, 'name': data['name']}, 201
def test_http_endpoints():
# Create worker for HTTP service
worker = worker_factory(APIService)
# Test GET endpoint
from werkzeug.test import EnvironBuilder
from werkzeug.wrappers import Request
# Create mock request
builder = EnvironBuilder(path='/users/456', method='GET')
request = Request(builder.get_environ())
response = worker.get_user(request, user_id=456)
assert response['user_id'] == 456
# Test POST endpoint
builder = EnvironBuilder(
path='/users',
method='POST',
data='{"name": "New User"}',
content_type='application/json'
)
request = Request(builder.get_environ())
response, status_code = worker.create_user(request)
assert status_code == 201
assert response['name'] == 'New User'Best practices for test configuration and environment management.
# conftest.py - pytest configuration
import pytest
from nameko.testing.pytest import NamekoTestEnvironment
@pytest.fixture(scope='session')
def test_config():
"""Test configuration with isolated resources"""
return {
'AMQP_URI': 'amqp://guest:guest@localhost:5672/test_vhost',
'DATABASE_URL': 'sqlite:///:memory:',
'REDIS_URL': 'redis://localhost:6379/15', # Use test database
'DEBUG': True,
'TESTING': True
}
@pytest.fixture
def test_env(test_config):
"""Isolated test environment per test"""
env = NamekoTestEnvironment(test_config)
yield env
env.stop_all_services()
# Test database setup
@pytest.fixture
def clean_database():
"""Ensure clean database state for each test"""
# Setup test database
setup_test_database()
yield
# Cleanup test database
cleanup_test_database()Utilities for load testing and performance measurement.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def load_test_rpc_service(service_name, method_name, payload, num_requests=100, concurrency=10):
"""Load test an RPC service method"""
def make_request():
start_time = time.time()
try:
# Make RPC call
rpc_proxy = get_rpc_proxy(service_name)
result = getattr(rpc_proxy, method_name)(payload)
return time.time() - start_time, True, result
except Exception as e:
return time.time() - start_time, False, str(e)
# Execute concurrent requests
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = [executor.submit(make_request) for _ in range(num_requests)]
response_times = []
success_count = 0
for future in as_completed(futures):
duration, success, result = future.result()
response_times.append(duration)
if success:
success_count += 1
# Calculate statistics
avg_response_time = sum(response_times) / len(response_times)
success_rate = success_count / num_requests
return {
'avg_response_time': avg_response_time,
'success_rate': success_rate,
'total_requests': num_requests,
'concurrency': concurrency
}
def test_service_performance():
"""Test service performance under load"""
stats = load_test_rpc_service(
'user_service',
'get_user',
{'user_id': 123},
num_requests=1000,
concurrency=50
)
# Assert performance requirements
assert stats['avg_response_time'] < 0.1 # Less than 100ms average
assert stats['success_rate'] > 0.99 # 99% success rateInstall with Tessl CLI
npx tessl i tessl/pypi-nameko