CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-nameko

A microservices framework for Python that lets service developers concentrate on application logic and encourages testability

Pending
Overview
Eval results
Files

service-management.mddocs/

Service Management

Tools for running services in development and production environments with process management, configuration handling, graceful shutdown, and monitoring capabilities.

Capabilities

ServiceRunner

Class for programmatically running nameko services with full control over lifecycle, configuration, and worker management.

class ServiceRunner:
    """
    Programmatic service runner for managing nameko services.
    
    Parameters:
    - config: Configuration dictionary or path to config file
    """
    
    def __init__(self, config=None): ...
    
    def add_service(self, service_cls):
        """Add a service class to be run by this runner"""
    
    def start(self):
        """Start all registered services"""
    
    def stop(self):
        """Stop all services gracefully"""
    
    def wait(self):
        """Wait for all services to stop"""
    
    def kill(self):
        """Force kill all services"""

Usage Example:

from nameko.runners import ServiceRunner

# Define your services
class UserService:
    name = "user_service"
    # ... service implementation

class OrderService:
    name = "order_service" 
    # ... service implementation

# Create and configure runner
config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//',
    'WEB_SERVER_ADDRESS': '0.0.0.0:8000',
    'MAX_WORKERS': 10
}

runner = ServiceRunner(config)
runner.add_service(UserService)
runner.add_service(OrderService)

# Start services
try:
    runner.start()
    runner.wait()  # Block until services stop
except KeyboardInterrupt:
    runner.stop()  # Graceful shutdown
    runner.wait()

run_services Function

Convenience function for quickly running multiple services with shared configuration.

def run_services(services, config=None):
    """
    Run multiple services with shared configuration.
    
    Parameters:
    - services: List of service classes to run
    - config: Configuration dictionary, file path, or None for defaults
    
    This function blocks until services are stopped (Ctrl+C or signal).
    """

Usage Example:

from nameko.runners import run_services

# Simple service startup
if __name__ == '__main__':
    services = [UserService, OrderService, NotificationService]
    
    # Run with default configuration
    run_services(services)
    
    # Or with custom config
    config = {
        'AMQP_URI': 'amqp://user:pass@rabbitmq:5672//',
        'MAX_WORKERS': 20
    }
    run_services(services, config)

Configuration Management

Comprehensive configuration system supporting environment variables, YAML files, and programmatic configuration.

Configuration Sources:

# 1. Dictionary configuration
config = {
    'AMQP_URI': 'amqp://localhost:5672//',
    'WEB_SERVER_ADDRESS': '0.0.0.0:8000',
    'MAX_WORKERS': 10,
    'SERIALIZER': 'json'
}

# 2. YAML file configuration
# config.yaml
"""
AMQP_URI: 'amqp://localhost:5672//'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
MAX_WORKERS: 10
RPC_TIMEOUT: 30

DATABASE:
  host: localhost
  port: 5432
  name: myapp
"""

# 3. Environment variables (automatically loaded)
# NAMEKO_AMQP_URI=amqp://localhost:5672//
# NAMEKO_MAX_WORKERS=10

Configuration Access in Services:

from nameko.dependency_providers import Config

class ConfigurableService:
    name = "configurable_service"
    
    config = Config()
    
    @rpc
    def get_database_config(self):
        # Access nested configuration
        db_host = self.config['DATABASE']['host']
        db_port = self.config['DATABASE']['port']
        
        return {
            'host': db_host,
            'port': db_port,
            'max_workers': self.config['MAX_WORKERS']
        }
    
    @rpc  
    def get_api_key(self):
        # Access with defaults
        api_key = self.config.get('API_KEY', 'default-key')
        return api_key

Process Management

Advanced process management with worker pools, graceful shutdown, and signal handling.

Worker Pool Configuration:

# Automatic worker scaling based on load
config = {
    'MAX_WORKERS': 50,           # Maximum workers per service
    'WORKER_POOL_SIZE': 10,      # Initial worker pool size
    'WORKER_THREADS': 1000,      # Thread pool size per worker
    'HEARTBEAT_INTERVAL': 30,    # Worker heartbeat interval
    'WORKER_TIMEOUT': 300        # Worker timeout in seconds
}

class ScalableService:
    name = "scalable_service"
    
    @rpc
    def cpu_intensive_task(self, data):
        # This will be distributed across worker pool
        return self._process_data(data)

Graceful Shutdown Handling:

import signal
from nameko.runners import ServiceRunner

class ManagedServiceRunner:
    def __init__(self, services, config):
        self.runner = ServiceRunner(config)
        for service in services:
            self.runner.add_service(service)
        
        # Setup signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)
    
    def _handle_shutdown(self, signum, frame):
        print(f"Received signal {signum}, shutting down gracefully...")
        self.runner.stop()
    
    def run(self):
        try:
            self.runner.start()
            self.runner.wait()
        except Exception as e:
            print(f"Error running services: {e}")
            self.runner.kill()  # Force shutdown on error

Health Checks and Monitoring

Built-in health check endpoints and monitoring capabilities.

# Health check service (automatically registered)
class HealthCheckService:
    name = "health_check"
    
    @http('GET', '/health')
    def health_check(self, request):
        """Standard health check endpoint"""
        return {
            'status': 'healthy',
            'timestamp': time.time(),
            'services': self._get_service_status()
        }
    
    @http('GET', '/metrics')
    def metrics(self, request):
        """Service metrics endpoint"""
        return {
            'active_workers': self._get_worker_count(),
            'processed_requests': self._get_request_count(),
            'memory_usage': self._get_memory_usage(),
            'uptime': self._get_uptime()
        }

Development vs Production Configuration

Different configuration patterns for development and production environments.

Development Configuration:

# development.yaml
AMQP_URI: 'amqp://guest:guest@localhost:5672//'
WEB_SERVER_ADDRESS: '127.0.0.1:8000'
MAX_WORKERS: 1  # Single worker for debugging
DEBUG: true
LOG_LEVEL: 'DEBUG'
RELOAD_ON_CHANGE: true  # Auto-reload on code changes

Production Configuration:

# production.yaml  
AMQP_URI: 'amqp://user:password@rabbitmq-cluster:5672//'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
MAX_WORKERS: 50
DEBUG: false
LOG_LEVEL: 'INFO'

# Production-specific settings
WORKER_POOL_SIZE: 20
CONNECTION_POOL_SIZE: 50
RPC_TIMEOUT: 60
HEARTBEAT_INTERVAL: 60

# Security settings
SSL_ENABLED: true
AUTH_REQUIRED: true

Command Line Interface

Built-in CLI tools for service management and development.

# Run services from command line
nameko run myapp.services

# Run with specific config
nameko run myapp.services --config production.yaml

# Run in development mode with auto-reload
nameko run myapp.services --development

# Shell for testing services
nameko shell --config config.yaml

# Show service information
nameko show myapp.services

Docker and Container Integration

Configuration patterns for containerized deployments.

Dockerfile Example:

FROM python:3.9
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .

# Use environment variables for configuration
ENV NAMEKO_AMQP_URI=amqp://rabbitmq:5672//
ENV NAMEKO_MAX_WORKERS=10

CMD ["nameko", "run", "myapp.services"]

Docker Compose Example:

version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
  
  user-service:
    build: .
    environment:
      - NAMEKO_AMQP_URI=amqp://rabbitmq:5672//
      - NAMEKO_WEB_SERVER_ADDRESS=0.0.0.0:8000
    ports:
      - "8000:8000"
    depends_on:
      - rabbitmq
    command: nameko run services.user_service
  
  order-service:
    build: .
    environment:
      - NAMEKO_AMQP_URI=amqp://rabbitmq:5672//
    depends_on:
      - rabbitmq
    command: nameko run services.order_service

Install with Tessl CLI

npx tessl i tessl/pypi-nameko

docs

cli-interface.md

dependency-injection.md

event-system.md

http-interface.md

index.md

rpc-communication.md

service-management.md

standalone-clients.md

testing-framework.md

timer-scheduling.md

tile.json