CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-nameko

A microservices framework for Python that lets service developers concentrate on application logic and encourages testability

Pending
Overview
Eval results
Files

dependency-injection.mddocs/

Dependency Injection

System for providing services with access to external resources like databases, configuration, caches, and other services through clean dependency injection patterns and lifecycle management.

Capabilities

Config Dependency Provider

Provides access to service configuration with support for nested values, environment variables, and default fallbacks.

class Config:
    """
    Dependency provider for accessing configuration values.
    
    Parameters:
    - key: Optional specific configuration key to bind to
    """
    
    def __init__(self, key=None): ...

Usage Example:

from nameko.dependency_providers import Config

class DatabaseService:
    name = "database_service"
    
    # Inject entire config
    config = Config()
    
    # Inject specific config key
    db_config = Config('DATABASE')
    api_key = Config('API_KEY')
    
    @rpc
    def connect_to_database(self):
        # Access nested configuration
        host = self.db_config['host']
        port = self.db_config['port']
        username = self.db_config['username']
        password = self.db_config['password']
        
        connection_string = f"postgresql://{username}:{password}@{host}:{port}"
        return self._connect(connection_string)
    
    @rpc
    def get_api_settings(self):
        # Access configuration with defaults
        timeout = self.config.get('API_TIMEOUT', 30)
        retries = self.config.get('API_RETRIES', 3)
        
        return {
            'api_key': self.api_key,
            'timeout': timeout,
            'retries': retries
        }

Context Data Providers

Built-in dependency providers for accessing request context data like user information, authentication tokens, and request metadata.

class Language:
    """
    Dependency provider for accessing request language context.
    """
    
    def __init__(self): ...

class UserId:
    """
    Dependency provider for accessing user ID from request context.
    """
    
    def __init__(self): ...

class UserAgent:
    """
    Dependency provider for accessing user agent from request context.
    """
    
    def __init__(self): ...

class AuthToken:
    """
    Dependency provider for accessing authentication token from request context.
    """
    
    def __init__(self): ...

Usage Example:

from nameko.contextdata import Language, UserId, UserAgent, AuthToken
from nameko.rpc import rpc

class UserService:
    name = "user_service"
    
    # Context data providers
    language = Language()
    user_id = UserId()
    user_agent = UserAgent()
    auth_token = AuthToken()
    
    @rpc
    def get_user_preferences(self):
        """Get user preferences with context awareness"""
        current_user_id = self.user_id
        preferred_language = self.language
        client_info = self.user_agent
        token = self.auth_token
        
        return {
            'user_id': current_user_id,
            'language': preferred_language,
            'client': client_info,
            'authenticated': bool(token)
        }
    
    @rpc
    def log_user_action(self, action):
        """Log user action with full context"""
        context = {
            'user_id': self.user_id,
            'language': self.language,
            'user_agent': self.user_agent,
            'action': action,
            'timestamp': time.time()
        }
        
        # Log with context information
        self._log_action(context)
        return {'status': 'logged'}

Custom Dependency Providers

Create custom dependency providers for databases, caches, external APIs, and other resources.

from nameko.extensions import DependencyProvider

class DatabaseProvider(DependencyProvider):
    """
    Custom dependency provider for database connections.
    """
    
    def setup(self):
        """Called when the service container is started"""
        self.connection_pool = self._create_connection_pool()
    
    def stop(self):
        """Called when the service container is stopped"""
        self.connection_pool.close()
    
    def get_dependency(self, worker_ctx):
        """Called for each service method invocation"""
        return self.connection_pool.get_connection()

Usage Example:

import redis
from nameko.extensions import DependencyProvider

class RedisProvider(DependencyProvider):
    """Redis connection provider with connection pooling"""
    
    def setup(self):
        config = self.container.config
        redis_url = config.get('REDIS_URL', 'redis://localhost:6379/0')
        self.connection_pool = redis.ConnectionPool.from_url(redis_url)
    
    def stop(self):
        self.connection_pool.disconnect()
    
    def get_dependency(self, worker_ctx):
        return redis.Redis(connection_pool=self.connection_pool)

class CacheService:
    name = "cache_service"
    
    # Custom dependency injection
    redis = RedisProvider()
    config = Config()
    
    @rpc
    def get_cached_value(self, key):
        """Get value from Redis cache"""
        value = self.redis.get(key)
        return value.decode('utf-8') if value else None
    
    @rpc
    def set_cached_value(self, key, value, ttl=None):
        """Set value in Redis cache with optional TTL"""
        ttl = ttl or self.config.get('DEFAULT_CACHE_TTL', 3600)
        self.redis.setex(key, ttl, value)
        return True
    
    @rpc
    def clear_cache_pattern(self, pattern):
        """Clear cache entries matching pattern"""
        keys = self.redis.keys(pattern)
        if keys:
            self.redis.delete(*keys)
        return len(keys)

Database Integration

Common patterns for database integration with connection management and transaction handling.

import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker
from nameko.extensions import DependencyProvider

class DatabaseSession(DependencyProvider):
    """SQLAlchemy database session provider"""
    
    def setup(self):
        config = self.container.config
        db_url = config['DATABASE_URL']
        
        self.engine = sa.create_engine(db_url, pool_pre_ping=True)
        self.Session = sessionmaker(bind=self.engine)
    
    def stop(self):
        self.engine.dispose()
    
    def get_dependency(self, worker_ctx):
        return self.Session()

class UserService:
    name = "user_service"
    
    db = DatabaseSession()
    
    @rpc
    def create_user(self, user_data):
        """Create user with automatic session management"""
        session = self.db
        try:
            user = User(**user_data)
            session.add(user)
            session.commit()
            return {'id': user.id, 'email': user.email}
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
    
    @rpc
    def get_users_by_status(self, status):
        """Query users with session cleanup"""
        session = self.db
        try:
            users = session.query(User).filter(User.status == status).all()
            return [{'id': u.id, 'email': u.email} for u in users]
        finally:
            session.close()

Shared Dependencies

Dependencies that are shared across multiple services for resource efficiency.

from nameko.extensions import SharedExtension

class SharedCache(SharedExtension):
    """
    Shared cache instance across multiple services.
    Only one instance is created per service container.
    """
    
    def setup(self):
        self.cache = {}
        self.max_size = 1000
    
    def get(self, key):
        return self.cache.get(key)
    
    def set(self, key, value):
        if len(self.cache) >= self.max_size:
            # Simple LRU eviction
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
        self.cache[key] = value

Usage Example:

class UserService:
    name = "user_service"
    
    cache = SharedCache()
    
    @rpc
    def get_user(self, user_id):
        # Check cache first
        cached_user = self.cache.get(f'user:{user_id}')
        if cached_user:
            return cached_user
        
        # Fetch from database
        user = self._fetch_user_from_db(user_id)
        
        # Cache the result
        self.cache.set(f'user:{user_id}', user)
        return user

class OrderService:
    name = "order_service"
    
    # Same shared cache instance
    cache = SharedCache()
    
    @rpc
    def invalidate_user_cache(self, user_id):
        # Invalidate cached user data
        cache_key = f'user:{user_id}'
        if self.cache.get(cache_key):
            del self.cache.cache[cache_key]

Lifecycle Management

Dependency providers support full lifecycle management with setup, teardown, and worker context handling.

class ManagedResource(DependencyProvider):
    """Example of full lifecycle management"""
    
    def setup(self):
        """Called once when service container starts"""
        print("Setting up managed resource...")
        self.resource = self._initialize_resource()
        self.health_checker = self._start_health_checker()
    
    def stop(self):
        """Called once when service container stops"""
        print("Stopping managed resource...")
        self.health_checker.stop()
        self.resource.close()
    
    def worker_setup(self, worker_ctx):
        """Called when a new worker starts"""
        worker_ctx.managed_resource_session = self._create_session()
    
    def worker_teardown(self, worker_ctx):
        """Called when a worker finishes"""
        if hasattr(worker_ctx, 'managed_resource_session'):
            worker_ctx.managed_resource_session.close()
    
    def get_dependency(self, worker_ctx):
        """Called for each service method invocation"""
        return worker_ctx.managed_resource_session

Dependency Testing

Testing utilities for mocking and replacing dependencies during testing.

from nameko.testing.services import worker_factory, replace_dependencies

class TestUserService:
    
    def test_create_user_with_mock_db(self):
        # Mock database dependency
        mock_db = Mock()
        mock_db.save_user.return_value = {'id': 123}
        
        # Create service worker with mocked dependency
        service = worker_factory(UserService, db=mock_db)
        
        # Test service method
        result = service.create_user({'email': 'test@example.com'})
        
        assert result['id'] == 123
        mock_db.save_user.assert_called_once()
    
    def test_service_with_replaced_dependencies(self):
        # Create container with real service
        container = ServiceContainer(UserService, config={})
        
        # Replace dependencies for testing
        mock_cache = Mock()
        replace_dependencies(container, cache=mock_cache)
        
        # Test with replaced dependencies
        container.start()
        # ... test logic
        container.stop()

Configuration-Based Dependencies

Dynamic dependency configuration based on environment and configuration.

class ConditionalDependency(DependencyProvider):
    """Dependency that changes behavior based on configuration"""
    
    def setup(self):
        config = self.container.config
        
        if config.get('USE_REDIS_CACHE'):
            self.backend = RedisCache(config['REDIS_URL'])
        elif config.get('USE_MEMORY_CACHE'):
            self.backend = MemoryCache(config.get('CACHE_SIZE', 1000))
        else:
            self.backend = NoOpCache()
    
    def get_dependency(self, worker_ctx):
        return self.backend

class FlexibleService:
    name = "flexible_service"
    
    # Dependency behavior changes based on config
    cache = ConditionalDependency()
    
    @rpc
    def cache_value(self, key, value):
        """Caching works regardless of backend"""
        self.cache.set(key, value)
        return "cached"

Error Handling in Dependencies

Proper error handling and recovery in dependency providers.

class ResilientDatabaseProvider(DependencyProvider):
    """Database provider with connection retry and recovery"""
    
    def setup(self):
        self.max_retries = 3
        self.retry_delay = 1
        self._connect_with_retry()
    
    def _connect_with_retry(self):
        for attempt in range(self.max_retries):
            try:
                config = self.container.config
                self.connection = self._create_connection(config['DATABASE_URL'])
                return
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise ConnectionError(f"Failed to connect after {self.max_retries} attempts")
                time.sleep(self.retry_delay * (2 ** attempt))  # Exponential backoff
    
    def get_dependency(self, worker_ctx):
        # Check connection health before returning
        if not self._is_connection_healthy():
            self._reconnect()
        return self.connection
    
    def _is_connection_healthy(self):
        try:
            self.connection.ping()
            return True
        except:
            return False
    
    def _reconnect(self):
        try:
            self.connection.close()
        except:
            pass
        self._connect_with_retry()

Install with Tessl CLI

npx tessl i tessl/pypi-nameko

docs

cli-interface.md

dependency-injection.md

event-system.md

http-interface.md

index.md

rpc-communication.md

service-management.md

standalone-clients.md

testing-framework.md

timer-scheduling.md

tile.json