A microservices framework for Python that lets service developers concentrate on application logic and encourages testability
—
Publish-subscribe event system for asynchronous inter-service communication with reliable message delivery, flexible routing patterns, and event-driven architecture support.
Decorator that subscribes service methods to specific events, enabling asynchronous event-driven processing.
def event_handler(source_service, event_type, **kwargs):
"""
Decorator to handle events from other services.
Parameters:
- source_service: Name of the service that publishes the event
- event_type: Type/name of the event to handle
- handler_type: Optional handler type ('broadcast' or 'service_pool')
- reliable_delivery: Whether to ensure reliable message delivery
- requeue_on_error: Whether to requeue messages on handler errors
Returns:
Decorated method that will be called when matching events are received
"""Usage Example:
from nameko.events import event_handler
class EmailService:
name = "email_service"
@event_handler('user_service', 'user_registered')
def send_welcome_email(self, payload):
user_email = payload['email']
user_name = payload['name']
# Send welcome email logic
self._send_email(user_email, 'Welcome!', f'Hello {user_name}')
@event_handler('order_service', 'order_completed',
handler_type='broadcast', reliable_delivery=True)
def send_order_confirmation(self, payload):
order_id = payload['order_id']
customer_email = payload['customer_email']
# Send order confirmation
self._send_email(customer_email, 'Order Confirmation',
f'Your order {order_id} is confirmed')Dependency provider that enables services to publish events to the event bus for consumption by event handlers.
class EventDispatcher:
"""
Dependency provider for dispatching events to other services.
"""
def __call__(self, event_type, event_data):
"""
Dispatch an event to the event bus.
Parameters:
- event_type: Type/name of the event
- event_data: Dictionary containing event payload data
"""Usage Example:
from nameko.rpc import rpc
from nameko.events import EventDispatcher
class UserService:
name = "user_service"
event_dispatcher = EventDispatcher()
@rpc
def create_user(self, user_data):
# Create user in database
user_id = self._save_user(user_data)
# Dispatch event to notify other services
self.event_dispatcher('user_created', {
'user_id': user_id,
'email': user_data['email'],
'name': user_data['name'],
'created_at': time.time()
})
return {'user_id': user_id, 'status': 'created'}
@rpc
def update_user_email(self, user_id, new_email):
# Update email in database
old_email = self._update_user_email(user_id, new_email)
# Dispatch event about email change
self.event_dispatcher('user_email_changed', {
'user_id': user_id,
'old_email': old_email,
'new_email': new_email,
'changed_at': time.time()
})
return {'status': 'updated'}Different handler types provide different delivery semantics for event processing.
Service Pool Handler (Default):
@event_handler('user_service', 'user_created', handler_type='service_pool')
def process_user_created(self, payload):
# Only one instance of this service will process this event
passBroadcast Handler:
@event_handler('user_service', 'user_created', handler_type='broadcast')
def invalidate_user_cache(self, payload):
# All instances will invalidate their user cache
user_id = payload['user_id']
self.cache.delete(f'user:{user_id}')Event handlers can be configured for reliable delivery to ensure events are not lost.
@event_handler('payment_service', 'payment_failed',
reliable_delivery=True, requeue_on_error=True)
def handle_payment_failure(self, payload):
"""
Parameters:
- reliable_delivery: Ensures message is acknowledged only after successful processing
- requeue_on_error: Requeues message if handler raises an exception
"""Event handlers can include additional filtering criteria beyond service and event type.
from nameko.events import event_handler, BROADCAST
class NotificationService:
name = "notification_service"
@event_handler('order_service', 'order_status_changed',
handler_type=BROADCAST)
def notify_status_change(self, payload):
# Filter based on event content
if payload.get('status') in ['shipped', 'delivered']:
customer_id = payload['customer_id']
self._send_notification(customer_id, payload)Events follow a standard message structure for consistency across services.
# Event message structure
{
"source_service": "user_service",
"event_type": "user_created",
"data": {
"user_id": 123,
"email": "user@example.com",
"name": "John Doe"
},
"metadata": {
"timestamp": "2023-01-01T12:00:00Z",
"correlation_id": "abc-123-def",
"event_id": "event-uuid-456"
}
}Failed event processing can be configured to route messages to dead letter queues for manual inspection.
# Configuration for dead letter handling
EVENT_HANDLER_CONFIG = {
'dead_letter_ttl': 86400000, # 24 hours in milliseconds
'max_delivery_attempts': 5,
'dead_letter_exchange': 'dlx'
}Best Practices:
class OptimizedEventHandler:
name = "optimized_service"
@event_handler('user_service', 'user_bulk_import',
handler_type='service_pool')
def process_bulk_import(self, payload):
# Process in batches for better performance
user_ids = payload['user_ids']
batch_size = 100
for i in range(0, len(user_ids), batch_size):
batch = user_ids[i:i + batch_size]
self._process_user_batch(batch)Install with Tessl CLI
npx tessl i tessl/pypi-nameko