CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-nameko

A microservices framework for Python that lets service developers concentrate on application logic and encourages testability

Pending
Overview
Eval results
Files

event-system.mddocs/

Event System

Publish-subscribe event system for asynchronous inter-service communication with reliable message delivery, flexible routing patterns, and event-driven architecture support.

Capabilities

Event Handler Decorator

Decorator that subscribes service methods to specific events, enabling asynchronous event-driven processing.

def event_handler(source_service, event_type, **kwargs):
    """
    Decorator to handle events from other services.
    
    Parameters:
    - source_service: Name of the service that publishes the event
    - event_type: Type/name of the event to handle
    - handler_type: Optional handler type ('broadcast' or 'service_pool')
    - reliable_delivery: Whether to ensure reliable message delivery
    - requeue_on_error: Whether to requeue messages on handler errors
    
    Returns:
    Decorated method that will be called when matching events are received
    """

Usage Example:

from nameko.events import event_handler

class EmailService:
    name = "email_service"
    
    @event_handler('user_service', 'user_registered')
    def send_welcome_email(self, payload):
        user_email = payload['email']
        user_name = payload['name']
        # Send welcome email logic
        self._send_email(user_email, 'Welcome!', f'Hello {user_name}')
    
    @event_handler('order_service', 'order_completed', 
                   handler_type='broadcast', reliable_delivery=True)
    def send_order_confirmation(self, payload):
        order_id = payload['order_id']
        customer_email = payload['customer_email']
        # Send order confirmation
        self._send_email(customer_email, 'Order Confirmation', 
                        f'Your order {order_id} is confirmed')

Event Dispatcher

Dependency provider that enables services to publish events to the event bus for consumption by event handlers.

class EventDispatcher:
    """
    Dependency provider for dispatching events to other services.
    """
    
    def __call__(self, event_type, event_data):
        """
        Dispatch an event to the event bus.
        
        Parameters:
        - event_type: Type/name of the event
        - event_data: Dictionary containing event payload data
        """

Usage Example:

from nameko.rpc import rpc
from nameko.events import EventDispatcher

class UserService:
    name = "user_service"
    
    event_dispatcher = EventDispatcher()
    
    @rpc
    def create_user(self, user_data):
        # Create user in database
        user_id = self._save_user(user_data)
        
        # Dispatch event to notify other services
        self.event_dispatcher('user_created', {
            'user_id': user_id,
            'email': user_data['email'],
            'name': user_data['name'],
            'created_at': time.time()
        })
        
        return {'user_id': user_id, 'status': 'created'}
    
    @rpc
    def update_user_email(self, user_id, new_email):
        # Update email in database
        old_email = self._update_user_email(user_id, new_email)
        
        # Dispatch event about email change
        self.event_dispatcher('user_email_changed', {
            'user_id': user_id,
            'old_email': old_email,
            'new_email': new_email,
            'changed_at': time.time()
        })
        
        return {'status': 'updated'}

Event Handler Types

Different handler types provide different delivery semantics for event processing.

Service Pool Handler (Default):

  • Events are load-balanced across instances of the same service
  • Only one instance processes each event
  • Good for work distribution
@event_handler('user_service', 'user_created', handler_type='service_pool')
def process_user_created(self, payload):
    # Only one instance of this service will process this event
    pass

Broadcast Handler:

  • Events are delivered to all instances of the service
  • Every instance processes the event
  • Good for cache invalidation, logging
@event_handler('user_service', 'user_created', handler_type='broadcast')
def invalidate_user_cache(self, payload):
    # All instances will invalidate their user cache
    user_id = payload['user_id']
    self.cache.delete(f'user:{user_id}')

Reliable Delivery

Event handlers can be configured for reliable delivery to ensure events are not lost.

@event_handler('payment_service', 'payment_failed', 
               reliable_delivery=True, requeue_on_error=True)
def handle_payment_failure(self, payload):
    """
    Parameters:
    - reliable_delivery: Ensures message is acknowledged only after successful processing
    - requeue_on_error: Requeues message if handler raises an exception
    """

Event Filtering

Event handlers can include additional filtering criteria beyond service and event type.

from nameko.events import event_handler, BROADCAST

class NotificationService:
    name = "notification_service"
    
    @event_handler('order_service', 'order_status_changed',
                   handler_type=BROADCAST)
    def notify_status_change(self, payload):
        # Filter based on event content
        if payload.get('status') in ['shipped', 'delivered']:
            customer_id = payload['customer_id']
            self._send_notification(customer_id, payload)

Event Message Structure

Events follow a standard message structure for consistency across services.

# Event message structure
{
    "source_service": "user_service",
    "event_type": "user_created", 
    "data": {
        "user_id": 123,
        "email": "user@example.com",
        "name": "John Doe"
    },
    "metadata": {
        "timestamp": "2023-01-01T12:00:00Z",
        "correlation_id": "abc-123-def",
        "event_id": "event-uuid-456"
    }
}

Dead Letter Handling

Failed event processing can be configured to route messages to dead letter queues for manual inspection.

# Configuration for dead letter handling
EVENT_HANDLER_CONFIG = {
    'dead_letter_ttl': 86400000,  # 24 hours in milliseconds
    'max_delivery_attempts': 5,
    'dead_letter_exchange': 'dlx'
}

Performance Considerations

  • Event Ordering: Events are not guaranteed to be processed in order unless using single consumer
  • Event Size: Keep event payloads small; include only necessary data
  • Handler Performance: Long-running handlers should be avoided or processed asynchronously
  • Queue Management: Monitor queue depths to prevent memory issues

Best Practices:

class OptimizedEventHandler:
    name = "optimized_service"
    
    @event_handler('user_service', 'user_bulk_import', 
                   handler_type='service_pool')
    def process_bulk_import(self, payload):
        # Process in batches for better performance
        user_ids = payload['user_ids']
        batch_size = 100
        
        for i in range(0, len(user_ids), batch_size):
            batch = user_ids[i:i + batch_size]
            self._process_user_batch(batch)

Install with Tessl CLI

npx tessl i tessl/pypi-nameko

docs

cli-interface.md

dependency-injection.md

event-system.md

http-interface.md

index.md

rpc-communication.md

service-management.md

standalone-clients.md

testing-framework.md

timer-scheduling.md

tile.json