A microservices framework for Python that lets service developers concentrate on application logic and encourages testability
—
System for providing services with access to external resources like databases, configuration, caches, and other services through clean dependency injection patterns and lifecycle management.
Provides access to service configuration with support for nested values, environment variables, and default fallbacks.
class Config:
"""
Dependency provider for accessing configuration values.
Parameters:
- key: Optional specific configuration key to bind to
"""
def __init__(self, key=None): ...Usage Example:
from nameko.dependency_providers import Config
class DatabaseService:
name = "database_service"
# Inject entire config
config = Config()
# Inject specific config key
db_config = Config('DATABASE')
api_key = Config('API_KEY')
@rpc
def connect_to_database(self):
# Access nested configuration
host = self.db_config['host']
port = self.db_config['port']
username = self.db_config['username']
password = self.db_config['password']
connection_string = f"postgresql://{username}:{password}@{host}:{port}"
return self._connect(connection_string)
@rpc
def get_api_settings(self):
# Access configuration with defaults
timeout = self.config.get('API_TIMEOUT', 30)
retries = self.config.get('API_RETRIES', 3)
return {
'api_key': self.api_key,
'timeout': timeout,
'retries': retries
}Built-in dependency providers for accessing request context data like user information, authentication tokens, and request metadata.
class Language:
"""
Dependency provider for accessing request language context.
"""
def __init__(self): ...
class UserId:
"""
Dependency provider for accessing user ID from request context.
"""
def __init__(self): ...
class UserAgent:
"""
Dependency provider for accessing user agent from request context.
"""
def __init__(self): ...
class AuthToken:
"""
Dependency provider for accessing authentication token from request context.
"""
def __init__(self): ...Usage Example:
from nameko.contextdata import Language, UserId, UserAgent, AuthToken
from nameko.rpc import rpc
class UserService:
name = "user_service"
# Context data providers
language = Language()
user_id = UserId()
user_agent = UserAgent()
auth_token = AuthToken()
@rpc
def get_user_preferences(self):
"""Get user preferences with context awareness"""
current_user_id = self.user_id
preferred_language = self.language
client_info = self.user_agent
token = self.auth_token
return {
'user_id': current_user_id,
'language': preferred_language,
'client': client_info,
'authenticated': bool(token)
}
@rpc
def log_user_action(self, action):
"""Log user action with full context"""
context = {
'user_id': self.user_id,
'language': self.language,
'user_agent': self.user_agent,
'action': action,
'timestamp': time.time()
}
# Log with context information
self._log_action(context)
return {'status': 'logged'}Create custom dependency providers for databases, caches, external APIs, and other resources.
from nameko.extensions import DependencyProvider
class DatabaseProvider(DependencyProvider):
"""
Custom dependency provider for database connections.
"""
def setup(self):
"""Called when the service container is started"""
self.connection_pool = self._create_connection_pool()
def stop(self):
"""Called when the service container is stopped"""
self.connection_pool.close()
def get_dependency(self, worker_ctx):
"""Called for each service method invocation"""
return self.connection_pool.get_connection()Usage Example:
import redis
from nameko.extensions import DependencyProvider
class RedisProvider(DependencyProvider):
"""Redis connection provider with connection pooling"""
def setup(self):
config = self.container.config
redis_url = config.get('REDIS_URL', 'redis://localhost:6379/0')
self.connection_pool = redis.ConnectionPool.from_url(redis_url)
def stop(self):
self.connection_pool.disconnect()
def get_dependency(self, worker_ctx):
return redis.Redis(connection_pool=self.connection_pool)
class CacheService:
name = "cache_service"
# Custom dependency injection
redis = RedisProvider()
config = Config()
@rpc
def get_cached_value(self, key):
"""Get value from Redis cache"""
value = self.redis.get(key)
return value.decode('utf-8') if value else None
@rpc
def set_cached_value(self, key, value, ttl=None):
"""Set value in Redis cache with optional TTL"""
ttl = ttl or self.config.get('DEFAULT_CACHE_TTL', 3600)
self.redis.setex(key, ttl, value)
return True
@rpc
def clear_cache_pattern(self, pattern):
"""Clear cache entries matching pattern"""
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
return len(keys)Common patterns for database integration with connection management and transaction handling.
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker
from nameko.extensions import DependencyProvider
class DatabaseSession(DependencyProvider):
"""SQLAlchemy database session provider"""
def setup(self):
config = self.container.config
db_url = config['DATABASE_URL']
self.engine = sa.create_engine(db_url, pool_pre_ping=True)
self.Session = sessionmaker(bind=self.engine)
def stop(self):
self.engine.dispose()
def get_dependency(self, worker_ctx):
return self.Session()
class UserService:
name = "user_service"
db = DatabaseSession()
@rpc
def create_user(self, user_data):
"""Create user with automatic session management"""
session = self.db
try:
user = User(**user_data)
session.add(user)
session.commit()
return {'id': user.id, 'email': user.email}
except Exception:
session.rollback()
raise
finally:
session.close()
@rpc
def get_users_by_status(self, status):
"""Query users with session cleanup"""
session = self.db
try:
users = session.query(User).filter(User.status == status).all()
return [{'id': u.id, 'email': u.email} for u in users]
finally:
session.close()Dependencies that are shared across multiple services for resource efficiency.
from nameko.extensions import SharedExtension
class SharedCache(SharedExtension):
"""
Shared cache instance across multiple services.
Only one instance is created per service container.
"""
def setup(self):
self.cache = {}
self.max_size = 1000
def get(self, key):
return self.cache.get(key)
def set(self, key, value):
if len(self.cache) >= self.max_size:
# Simple LRU eviction
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
self.cache[key] = valueUsage Example:
class UserService:
name = "user_service"
cache = SharedCache()
@rpc
def get_user(self, user_id):
# Check cache first
cached_user = self.cache.get(f'user:{user_id}')
if cached_user:
return cached_user
# Fetch from database
user = self._fetch_user_from_db(user_id)
# Cache the result
self.cache.set(f'user:{user_id}', user)
return user
class OrderService:
name = "order_service"
# Same shared cache instance
cache = SharedCache()
@rpc
def invalidate_user_cache(self, user_id):
# Invalidate cached user data
cache_key = f'user:{user_id}'
if self.cache.get(cache_key):
del self.cache.cache[cache_key]Dependency providers support full lifecycle management with setup, teardown, and worker context handling.
class ManagedResource(DependencyProvider):
"""Example of full lifecycle management"""
def setup(self):
"""Called once when service container starts"""
print("Setting up managed resource...")
self.resource = self._initialize_resource()
self.health_checker = self._start_health_checker()
def stop(self):
"""Called once when service container stops"""
print("Stopping managed resource...")
self.health_checker.stop()
self.resource.close()
def worker_setup(self, worker_ctx):
"""Called when a new worker starts"""
worker_ctx.managed_resource_session = self._create_session()
def worker_teardown(self, worker_ctx):
"""Called when a worker finishes"""
if hasattr(worker_ctx, 'managed_resource_session'):
worker_ctx.managed_resource_session.close()
def get_dependency(self, worker_ctx):
"""Called for each service method invocation"""
return worker_ctx.managed_resource_sessionTesting utilities for mocking and replacing dependencies during testing.
from nameko.testing.services import worker_factory, replace_dependencies
class TestUserService:
def test_create_user_with_mock_db(self):
# Mock database dependency
mock_db = Mock()
mock_db.save_user.return_value = {'id': 123}
# Create service worker with mocked dependency
service = worker_factory(UserService, db=mock_db)
# Test service method
result = service.create_user({'email': 'test@example.com'})
assert result['id'] == 123
mock_db.save_user.assert_called_once()
def test_service_with_replaced_dependencies(self):
# Create container with real service
container = ServiceContainer(UserService, config={})
# Replace dependencies for testing
mock_cache = Mock()
replace_dependencies(container, cache=mock_cache)
# Test with replaced dependencies
container.start()
# ... test logic
container.stop()Dynamic dependency configuration based on environment and configuration.
class ConditionalDependency(DependencyProvider):
"""Dependency that changes behavior based on configuration"""
def setup(self):
config = self.container.config
if config.get('USE_REDIS_CACHE'):
self.backend = RedisCache(config['REDIS_URL'])
elif config.get('USE_MEMORY_CACHE'):
self.backend = MemoryCache(config.get('CACHE_SIZE', 1000))
else:
self.backend = NoOpCache()
def get_dependency(self, worker_ctx):
return self.backend
class FlexibleService:
name = "flexible_service"
# Dependency behavior changes based on config
cache = ConditionalDependency()
@rpc
def cache_value(self, key, value):
"""Caching works regardless of backend"""
self.cache.set(key, value)
return "cached"Proper error handling and recovery in dependency providers.
class ResilientDatabaseProvider(DependencyProvider):
"""Database provider with connection retry and recovery"""
def setup(self):
self.max_retries = 3
self.retry_delay = 1
self._connect_with_retry()
def _connect_with_retry(self):
for attempt in range(self.max_retries):
try:
config = self.container.config
self.connection = self._create_connection(config['DATABASE_URL'])
return
except Exception as e:
if attempt == self.max_retries - 1:
raise ConnectionError(f"Failed to connect after {self.max_retries} attempts")
time.sleep(self.retry_delay * (2 ** attempt)) # Exponential backoff
def get_dependency(self, worker_ctx):
# Check connection health before returning
if not self._is_connection_healthy():
self._reconnect()
return self.connection
def _is_connection_healthy(self):
try:
self.connection.ping()
return True
except:
return False
def _reconnect(self):
try:
self.connection.close()
except:
pass
self._connect_with_retry()Install with Tessl CLI
npx tessl i tessl/pypi-nameko