A microservices framework for Python that lets service developers concentrate on application logic and encourages testability.
—
Tools for running services in development and production environments with process management, configuration handling, graceful shutdown, and monitoring capabilities.
Class for programmatically running nameko services with full control over lifecycle, configuration, and worker management.
class ServiceRunner:
    """
    Programmatic service runner for managing nameko services.

    Service classes are registered with add_service() and are then
    started, stopped, waited on, or killed together.

    Parameters:
    - config: Configuration dictionary or path to config file
    """

    def __init__(self, config=None): ...

    def add_service(self, service_cls):
        """Add a service class to be run by this runner"""

    def start(self):
        """Start all registered services"""

    def stop(self):
        """Stop all services gracefully"""

    def wait(self):
        """Wait (block) until all services have stopped"""

    def kill(self):
        """Force kill all services"""

Usage Example:
from nameko.runners import ServiceRunner

# Define your services
class UserService:
    name = "user_service"
    # ... service implementation

class OrderService:
    name = "order_service"
    # ... service implementation

# Create and configure runner
config = {
    'AMQP_URI': 'amqp://guest:guest@localhost:5672//',
    'WEB_SERVER_ADDRESS': '0.0.0.0:8000',
    'MAX_WORKERS': 10
}
runner = ServiceRunner(config)
# Register the service classes themselves, not instances
runner.add_service(UserService)
runner.add_service(OrderService)

# Start services
try:
    runner.start()
    runner.wait()  # Block until services stop
except KeyboardInterrupt:
    # Ctrl+C: ask the services to stop, then wait for them to finish
    runner.stop()  # Graceful shutdown
    runner.wait()

Convenience function for quickly running multiple services with shared configuration.
def run_services(services, config=None):
    """
    Run multiple services with shared configuration.

    Parameters:
    - services: List of service classes to run
    - config: Configuration dictionary, file path, or None for defaults

    This function blocks until services are stopped (Ctrl+C or signal).

    NOTE(review): upstream nameko documents ``nameko.runners.run_services``
    as a context manager that takes the config first and the service
    classes as positional arguments -- confirm this signature against the
    installed nameko version before relying on it.
    """

Usage Example:
from nameko.runners import run_services

# Simple service startup
if __name__ == '__main__':
    # UserService, OrderService and NotificationService are assumed to be
    # defined or imported elsewhere; this snippet does not define them.
    services = [UserService, OrderService, NotificationService]

    # The two calls below are alternatives: run_services blocks until the
    # services are stopped, so use one or the other.

    # Run with default configuration
    run_services(services)

    # Or with custom config
    config = {
        'AMQP_URI': 'amqp://user:pass@rabbitmq:5672//',
        'MAX_WORKERS': 20
    }
    run_services(services, config)

Comprehensive configuration system supporting environment variables, YAML files, and programmatic configuration.
Configuration Sources:
# 1. Dictionary configuration
config = {
    'AMQP_URI': 'amqp://localhost:5672//',
    'WEB_SERVER_ADDRESS': '0.0.0.0:8000',
    'MAX_WORKERS': 10,
    'SERIALIZER': 'json'
}

# 2. YAML file configuration
# config.yaml
# The keys under DATABASE are indented so that they form a nested mapping;
# services read them as config['DATABASE']['host'] and so on.
"""
AMQP_URI: 'amqp://localhost:5672//'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
MAX_WORKERS: 10
RPC_TIMEOUT: 30
DATABASE:
  host: localhost
  port: 5432
  name: myapp
"""

# 3. Environment variables (automatically loaded)
# NOTE(review): upstream nameko documents environment-variable substitution
# inside the YAML file (e.g. ${VAR:default}); confirm that NAMEKO_-prefixed
# variables are really picked up automatically.
# NAMEKO_AMQP_URI=amqp://localhost:5672//
# NAMEKO_MAX_WORKERS=10

Configuration Access in Services:
from nameko.dependency_providers import Config
class ConfigurableService:
name = "configurable_service"
config = Config()
@rpc
def get_database_config(self):
# Access nested configuration
db_host = self.config['DATABASE']['host']
db_port = self.config['DATABASE']['port']
return {
'host': db_host,
'port': db_port,
'max_workers': self.config['MAX_WORKERS']
}
@rpc
def get_api_key(self):
# Access with defaults
api_key = self.config.get('API_KEY', 'default-key')
return api_keyAdvanced process management with worker pools, graceful shutdown, and signal handling.
Worker Pool Configuration:
# Automatic worker scaling based on load
# NOTE(review): apart from MAX_WORKERS, confirm that these keys are
# recognised by the nameko version in use.
config = {
    'MAX_WORKERS': 50,  # Maximum workers per service
    'WORKER_POOL_SIZE': 10,  # Initial worker pool size
    'WORKER_THREADS': 1000,  # Thread pool size per worker
    'HEARTBEAT_INTERVAL': 30,  # Worker heartbeat interval
    'WORKER_TIMEOUT': 300  # Worker timeout in seconds
}

# NOTE(review): `rpc` is not imported in this snippet (presumably
# `from nameko.rpc import rpc`) and `_process_data` is not defined here.
class ScalableService:
    name = "scalable_service"

    @rpc
    def cpu_intensive_task(self, data):
        # This will be distributed across worker pool
        return self._process_data(data)

Graceful Shutdown Handling:
import signal
from nameko.runners import ServiceRunner

class ManagedServiceRunner:
    """Wrap a ServiceRunner and stop it when SIGTERM or SIGINT is received."""

    def __init__(self, services, config):
        """Create the runner, register each service, and install the signal handlers."""
        self.runner = ServiceRunner(config)
        for service in services:
            self.runner.add_service(service)

        # Setup signal handlers for graceful shutdown
        # Installing a SIGINT handler replaces Python's default one, so
        # Ctrl+C calls _handle_shutdown instead of raising KeyboardInterrupt.
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)

    def _handle_shutdown(self, signum, frame):
        """Signal handler: report the signal number and stop the runner."""
        # NOTE(review): nameko's own CLI schedules stop() on a separate
        # greenlet from its signal handler -- confirm that calling stop()
        # directly here is safe under eventlet.
        print(f"Received signal {signum}, shutting down gracefully...")
        self.runner.stop()

    def run(self):
        """Start the services and block until they stop.

        Any Exception is printed and the runner is killed; the exception is
        not re-raised.
        """
        try:
            self.runner.start()
            self.runner.wait()
        except Exception as e:
            print(f"Error running services: {e}")
            self.runner.kill()  # Force shutdown on error

Built-in health check endpoints and monitoring capabilities.
# Health check service (automatically registered)
class HealthCheckService:
name = "health_check"
@http('GET', '/health')
def health_check(self, request):
"""Standard health check endpoint"""
return {
'status': 'healthy',
'timestamp': time.time(),
'services': self._get_service_status()
}
@http('GET', '/metrics')
def metrics(self, request):
"""Service metrics endpoint"""
return {
'active_workers': self._get_worker_count(),
'processed_requests': self._get_request_count(),
'memory_usage': self._get_memory_usage(),
'uptime': self._get_uptime()
}Different configuration patterns for development and production environments.
Development Configuration:
# development.yaml
# Local setup: broker on localhost, web server bound to the loopback address.
# NOTE(review): confirm that DEBUG, LOG_LEVEL and RELOAD_ON_CHANGE are
# honoured by the nameko version in use.
AMQP_URI: 'amqp://guest:guest@localhost:5672//'
WEB_SERVER_ADDRESS: '127.0.0.1:8000'
MAX_WORKERS: 1 # Single worker for debugging
DEBUG: true
LOG_LEVEL: 'DEBUG'
RELOAD_ON_CHANGE: true # Auto-reload on code changes

Production Configuration:
# production.yaml
# The credentials and host in AMQP_URI are placeholders; substitute real values.
AMQP_URI: 'amqp://user:password@rabbitmq-cluster:5672//'
# 0.0.0.0 binds the web server on all interfaces
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
MAX_WORKERS: 50
DEBUG: false
LOG_LEVEL: 'INFO'

# Production-specific settings
# NOTE(review): confirm that these keys are recognised by the nameko version in use.
WORKER_POOL_SIZE: 20
CONNECTION_POOL_SIZE: 50
RPC_TIMEOUT: 60
HEARTBEAT_INTERVAL: 60

# Security settings
SSL_ENABLED: true
AUTH_REQUIRED: true

Built-in CLI tools for service management and development.
# Run services from command line
nameko run myapp.services
# Run with specific config
nameko run myapp.services --config production.yaml
# Run in development mode with auto-reload
nameko run myapp.services --development
# Shell for testing services
nameko shell --config config.yaml
# Show service information
nameko show myapp.servicesConfiguration patterns for containerized deployments.
Dockerfile Example:
FROM python:3.9
WORKDIR /app

# Copy and install the requirements before the rest of the source
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .

# Use environment variables for configuration
ENV NAMEKO_AMQP_URI=amqp://rabbitmq:5672//
ENV NAMEKO_MAX_WORKERS=10

CMD ["nameko", "run", "myapp.services"]

Docker Compose Example:
version: '3.8'
services:
  # Message broker shared by both services
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"

  user-service:
    build: .
    environment:
      # "rabbitmq" in the URI is the broker service defined above
      - NAMEKO_AMQP_URI=amqp://rabbitmq:5672//
      - NAMEKO_WEB_SERVER_ADDRESS=0.0.0.0:8000
    ports:
      - "8000:8000"
    depends_on:
      - rabbitmq
    command: nameko run services.user_service

  # No ports are published for this service
  order-service:
    build: .
    environment:
      - NAMEKO_AMQP_URI=amqp://rabbitmq:5672//
    depends_on:
      - rabbitmq
    command: nameko run services.order_service

Install with Tessl CLI
npx tessl i tessl/pypi-nameko