tessl/pypi-botocore

Low-level, data-driven core of boto 3 providing foundational AWS service access.

Event System

An extensible event system for hooking into the request/response lifecycle, enabling custom authentication, logging, monitoring, and request modification. It is built on a hierarchical emitter that supports wildcard matching and event-name aliasing, so handlers can target a single operation, a whole service, or every call.

Capabilities

Hierarchical Event Emitter

Core event emission system with hierarchical event names and wildcard support.

class HierarchicalEmitter:
    def __init__(self):
        """
        Initialize hierarchical event emitter.
        
        Provides event registration, emission, and handler management
        with support for hierarchical event names and wildcard matching.
        """
    
    def emit(self, event_name: str, **kwargs) -> List[Tuple[callable, Any]]:
        """
        Emit an event by name with arguments passed as keyword args.
        
        Args:
            event_name: Dot-separated event name (e.g., 'before-call.s3.GetObject')
            **kwargs: Arguments to pass to event handlers
            
        Returns:
            List of (handler, response) tuples from all processed handlers
            
        Example:
            >>> responses = emitter.emit(
            ...     'my-event.service.operation', arg1='one', arg2='two')
        """
    
    def emit_until_response(
        self, 
        event_name: str, 
        **kwargs
    ) -> Tuple[callable, Any]:
        """
        Emit event until first non-None response is received.
        
        Prevents subsequent handlers from being invoked after receiving
        a non-None response, useful for short-circuiting event processing.
        
        Args:
            event_name: Dot-separated event name
            **kwargs: Arguments to pass to event handlers
            
        Returns:
            First (handler, response) tuple with non-None response,
            otherwise (None, None)
            
        Example:
            >>> handler, response = emitter.emit_until_response(
            ...     'my-event.service.operation', arg1='one', arg2='two')
        """
    
    def register(
        self,
        event_name: str,
        handler: callable,
        unique_id: str = None,
        unique_id_uses_count: bool = False
    ) -> None:
        """
        Register an event handler for a given event.
        
        Handlers are called in order: register_first() → register() → register_last()
        
        Args:
            event_name: Event name to register handler for
            handler: Callable event handler that accepts **kwargs
            unique_id: Unique identifier to prevent duplicate registrations
            unique_id_uses_count: Whether unique_id uses reference counting
            
        Raises:
            ValueError: If handler is not callable or doesn't accept **kwargs
        """
    
    def register_first(
        self,
        event_name: str,
        handler: callable,
        unique_id: str = None,
        unique_id_uses_count: bool = False
    ) -> None:
        """
        Register event handler to be called first for an event.
        
        All handlers registered with register_first() are called before
        handlers registered with register() and register_last().
        
        Args:
            event_name: Event name to register handler for
            handler: Callable event handler that accepts **kwargs
            unique_id: Unique identifier to prevent duplicate registrations
            unique_id_uses_count: Whether unique_id uses reference counting
        """
    
    def register_last(
        self,
        event_name: str,
        handler: callable,
        unique_id: str = None,
        unique_id_uses_count: bool = False
    ) -> None:
        """
        Register event handler to be called last for an event.
        
        All handlers registered with register_last() are called after
        handlers registered with register_first() and register().
        
        Args:
            event_name: Event name to register handler for
            handler: Callable event handler that accepts **kwargs
            unique_id: Unique identifier to prevent duplicate registrations
            unique_id_uses_count: Whether unique_id uses reference counting
        """
    
    def unregister(
        self,
        event_name: str,
        handler: callable = None,
        unique_id: str = None,
        unique_id_uses_count: bool = False
    ) -> None:
        """
        Unregister an event handler for a given event.
        
        Args:
            event_name: Event name to unregister handler from
            handler: Handler to unregister (if no unique_id specified)
            unique_id: Unique identifier of handler to unregister
            unique_id_uses_count: Whether unique_id uses reference counting
        """

Event Aliasing

Event name aliasing system for backward compatibility and event name transformation.

class EventAliaser:
    def __init__(
        self,
        event_emitter: HierarchicalEmitter,
        event_aliases: dict = None
    ):
        """
        Initialize event aliaser with underlying emitter and alias mappings.
        
        Args:
            event_emitter: Underlying hierarchical emitter
            event_aliases: Mapping of old event names to new event names
        """
    
    def emit(self, event_name: str, **kwargs) -> List[Tuple[callable, Any]]:
        """Emit event with automatic name aliasing."""
    
    def emit_until_response(
        self, 
        event_name: str, 
        **kwargs
    ) -> Tuple[callable, Any]:
        """Emit event until response with automatic name aliasing."""
    
    def register(
        self,
        event_name: str,
        handler: callable,
        unique_id: str = None,
        unique_id_uses_count: bool = False
    ) -> None:
        """Register handler with automatic event name aliasing."""
    
    def unregister(
        self,
        event_name: str,
        handler: callable = None,
        unique_id: str = None,
        unique_id_uses_count: bool = False
    ) -> None:
        """Unregister handler with automatic event name aliasing."""

Utility Functions

Helper functions for working with event responses.

def first_non_none_response(
    responses: List[Tuple[callable, Any]], 
    default: Any = None
) -> Any:
    """
    Find first non-None response in a list of handler response tuples.
    
    Args:
        responses: List of (handler, response) tuples from emit()
        default: Default value if no non-None responses found
        
    Returns:
        First non-None response or default value
        
    Example:
        >>> responses = [(func1, None), (func2, 'foo'), (func3, 'bar')]
        >>> first_non_none_response(responses)
        'foo'
    """

Event Lifecycle

AWS Client Request Lifecycle

The botocore event system provides hooks throughout the AWS API request lifecycle:

Event Flow Order:

  1. provide-client-params.*.* - Provide additional client parameters
  2. before-parameter-build.*.* - Modify parameters before request building
  3. before-call.*.* - Modify the serialized request before it is sent
  4. before-sign.*.* - Modify request before signing
  5. response-received.*.* - Process response after receiving
  6. needs-retry.*.* - Determine if request should be retried (emitted per attempt, before the call returns)
  7. after-call.*.* - Process final parsed response

Common Event Types

Parameter Building Events:

  • before-parameter-build.{service}.{operation} - Modify operation parameters
  • provide-client-params.{service}.{operation} - Add client-level parameters

Request Processing Events:

  • before-call.{service}.{operation} - Modify request before sending
  • before-sign.{service}.{operation} - Modify request before signing

Response Processing Events:

  • response-received.{service}.{operation} - Process HTTP response
  • after-call.{service}.{operation} - Process parsed response

Retry Events:

  • needs-retry.{service}.{operation} - Determine retry behavior

Global Events:

  • before-call.*.* - Called for all service operations
  • after-call.*.* - Called for all service operations
  • needs-retry.*.* - Called for all retry decisions

Usage Examples

Basic Event Registration

from botocore.session import get_session

# Create session and client
session = get_session()
client = session.create_client('s3', region_name='us-east-1')

# Access the event system
events = client.meta.events

# Register a simple event handler
def log_request(**kwargs):
    print(f"Making request: {kwargs.get('event_name')}")

events.register('before-call.s3.*', log_request)

# Make a request - handler will be called
response = client.list_buckets()

Custom Authentication Handler

import uuid

def custom_auth_handler(request, **kwargs):
    """Add custom authentication headers to requests."""
    request.headers['X-Custom-Auth'] = 'my-auth-token'
    request.headers['X-Request-ID'] = str(uuid.uuid4())

# Register for all S3 operations
events.register('before-sign.s3.*', custom_auth_handler)

# Or register for a specific operation only. Use one of the two:
# with both registered, the handler runs twice for GetObject.
events.register('before-sign.s3.GetObject', custom_auth_handler)

Request/Response Logging

import json
import logging

logger = logging.getLogger(__name__)

def log_request_params(params, **kwargs):
    """Log request parameters before API call."""
    operation = kwargs.get('event_name', '').split('.')[-1]
    logger.info(f"Calling {operation} with params: {json.dumps(params, default=str)}")

def log_response_data(parsed, **kwargs):
    """Log response data after API call."""
    operation = kwargs.get('event_name', '').split('.')[-1]
    logger.info(f"Response from {operation}: {json.dumps(parsed, default=str)}")

# Register logging handlers
events.register('before-parameter-build.*.*', log_request_params)
events.register('after-call.*.*', log_response_data)

Performance Monitoring

import time

class PerformanceMonitor:
    def __init__(self):
        self.timings = {}

    @staticmethod
    def _operation_key(event_name):
        # 'before-call.s3.ListBuckets' and 'after-call.s3.ListBuckets'
        # must share one key, so drop the event-type prefix
        return event_name.split('.', 1)[-1]

    def start_timer(self, event_name, **kwargs):
        """Record request start time."""
        self.timings[self._operation_key(event_name)] = time.perf_counter()

    def end_timer(self, event_name, **kwargs):
        """Calculate and log request duration."""
        key = self._operation_key(event_name)
        start = self.timings.pop(key, None)
        if start is not None:
            duration = time.perf_counter() - start
            print(f"Request {key} took {duration:.3f} seconds")

monitor = PerformanceMonitor()

# Register monitoring handlers
events.register('before-call.*.*', monitor.start_timer)
events.register('after-call.*.*', monitor.end_timer)

Response Modification

import time

def modify_s3_response(parsed, **kwargs):
    """Add custom metadata to S3 responses."""
    if 'ResponseMetadata' in parsed:
        parsed['ResponseMetadata']['CustomProcessed'] = True
        parsed['ResponseMetadata']['ProcessedAt'] = time.time()

events.register('after-call.s3.*', modify_s3_response)

Error Handling and Retry Logic

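def custom_retry_handler(response, **kwargs):
    """Implement custom retry logic.

    For needs-retry events, response is an (http_response, parsed) tuple,
    or None when the attempt raised an exception. Returning a number asks
    botocore to wait that many seconds and retry; returning None means
    this handler has no opinion.
    """
    if response is not None:
        _, parsed = response
        if parsed.get('Error', {}).get('Code') == 'SlowDown':
            # Custom backoff for S3 SlowDown errors
            return 5.0  # seconds to wait before retrying
    return None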

def error_notification_handler(parsed, **kwargs):
    """Send notifications for specific errors."""
    if 'Error' in parsed:
        error_code = parsed['Error']['Code']
        if error_code in ['AccessDenied', 'InvalidAccessKeyId']:
            # Send alert for authentication issues
            print(f"Authentication error detected: {error_code}")

events.register('needs-retry.s3.*', custom_retry_handler)
events.register('after-call.*.*', error_notification_handler)

Session-Level Event Handling

def global_request_logger(**kwargs):
    """Log all AWS API requests across all services."""
    event_parts = kwargs.get('event_name', '').split('.')
    if len(event_parts) >= 3:
        service = event_parts[1]
        operation = event_parts[2]
        print(f"AWS API Call: {service}.{operation}")

# Register at session level for all clients
session_events = session.get_component('event_emitter')
session_events.register('before-call.*.*', global_request_logger)

Event Handler Priorities

def first_handler(**kwargs):
    print("Called first")

def middle_handler(**kwargs):
    print("Called middle")

def last_handler(**kwargs):
    print("Called last")

# Register handlers with different priorities
events.register_first('before-call.s3.ListBuckets', first_handler)
events.register('before-call.s3.ListBuckets', middle_handler)
events.register_last('before-call.s3.ListBuckets', last_handler)

# When ListBuckets is called, output will be:
# Called first
# Called middle  
# Called last

Conditional Event Processing

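def mark_sensitive_bucket(params, context, **kwargs):
    """Flag requests that target sensitive buckets.

    before-sign handlers do not receive the operation parameters, so the
    check runs on before-parameter-build and the result travels in the
    per-request context dict.
    """
    bucket_name = params.get('Bucket', '')
    context['sensitive_bucket'] = bucket_name.startswith('sensitive-')

def conditional_handler(params, context, **kwargs):
    """Only process requests to specific buckets."""
    if context.get('sensitive_bucket'):
        # On before-call, params is the serialized request dict
        params['headers']['X-Extra-Security'] = 'enabled'
        print("Enhanced security applied")

events.register('before-parameter-build.s3.*', mark_sensitive_bucket)
events.register('before-call.s3.*', conditional_handler)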

Unique Event Handler Registration

def singleton_handler(**kwargs):
    """Handler that should only be registered once."""
    print("Singleton handler called")

# Register with unique ID to prevent duplicates
events.register(
    'before-call.s3.*', 
    singleton_handler,
    unique_id='singleton-handler'
)

# Subsequent registrations with same unique_id are ignored
events.register(
    'before-call.s3.*', 
    singleton_handler,
    unique_id='singleton-handler'  # This will be ignored
)

Event Unregistration

def temporary_handler(**kwargs):
    print("Temporary handler")

# Register handler
events.register('before-call.s3.ListBuckets', temporary_handler)

# Use it for some operations
client.list_buckets()  # Handler called

# Unregister when no longer needed
events.unregister('before-call.s3.ListBuckets', temporary_handler)

# Handler no longer called
client.list_buckets()  # Handler not called

Integration Patterns

Custom Client Configuration

from botocore.config import Config
from botocore.session import get_session

def setup_enhanced_s3_client():
    """Create S3 client with enhanced event handling."""

    # Create client with custom configuration
    config = Config(
        retries={'max_attempts': 3},
        read_timeout=30
    )

    session = get_session()
    client = session.create_client('s3', region_name='us-east-1', config=config)
    events = client.meta.events

    # Add custom event handlers
    events.register('before-call.s3.*', add_request_metadata)
    events.register('after-call.s3.*', log_response_metadata)
    events.register('needs-retry.s3.*', custom_retry_strategy)

    return client

def add_request_metadata(params, **kwargs):
    """Add metadata to all S3 requests.

    On before-call, params is the serialized request dict; there is no
    request object yet.
    """
    params['headers']['X-Client-Version'] = '1.0'
    params['headers']['X-Request-Source'] = 'enhanced-client'

def log_response_metadata(parsed, **kwargs):
    """Log S3 response metadata."""
    if 'ResponseMetadata' in parsed:
        request_id = parsed['ResponseMetadata'].get('RequestId')
        print(f"S3 Request ID: {request_id}")

def custom_retry_strategy(response, **kwargs):
    """Implement enhanced retry strategy.

    response is an (http_response, parsed) tuple, or None if the attempt
    raised an exception. The return value is the delay in seconds.
    """
    if response is not None:
        _, parsed = response
        if parsed.get('Error', {}).get('Code') == 'ServiceUnavailable':
            return 2.0  # seconds to wait before retrying
    return None

# Use enhanced client
s3_client = setup_enhanced_s3_client()

Multi-Service Event Coordination

import time

class AWSOperationTracker:
    """Track operations across multiple AWS services."""

    def __init__(self):
        self.active_operations = {}
        self.completed_operations = []

    @staticmethod
    def _operation_name(event_name):
        # Drop the event-type prefix so 'before-call.s3.GetObject' and
        # 'after-call.s3.GetObject' both map to 's3.GetObject'
        return event_name.split('.', 1)[-1]

    def start_operation(self, event_name, **kwargs):
        """Track when an operation starts."""
        operation = self._operation_name(event_name)
        operation_id = f"{operation}-{time.time()}"

        self.active_operations[operation_id] = {
            'operation': operation,
            'start_time': time.time(),
            # On before-call, params is the serialized request dict
            'params': kwargs.get('params', {})
        }

        print(f"Started operation: {operation_id}")

    def complete_operation(self, event_name, **kwargs):
        """Track when an operation completes."""
        operation = self._operation_name(event_name)

        # Find matching active operation
        for op_id, op_data in list(self.active_operations.items()):
            if op_data['operation'] == operation:
                duration = time.time() - op_data['start_time']

                self.completed_operations.append({
                    'id': op_id,
                    'event': event_name,
                    'duration': duration
                })

                del self.active_operations[op_id]
                print(f"Completed operation: {op_id} ({duration:.3f}s)")
                break

# Create tracker and register across multiple clients
tracker = AWSOperationTracker()

# Register for multiple services
for service in ['s3', 'ec2', 'dynamodb']:
    client = session.create_client(service, region_name='us-east-1')
    events = client.meta.events

    events.register(f'before-call.{service}.*', tracker.start_operation)
    events.register(f'after-call.{service}.*', tracker.complete_operation)

Best Practices

Event Handler Design

# Good: Handler accepts **kwargs and handles missing parameters gracefully
def robust_handler(**kwargs):
    event_name = kwargs.get('event_name', 'unknown')
    params = kwargs.get('params', {})
    
    # Process event safely
    if 'Bucket' in params:
        print(f"Processing bucket: {params['Bucket']}")

# Bad: Handler without **kwargs; register() rejects it with ValueError
def fragile_handler(event_name, params):  # Cannot absorb extra or changed arguments
    print(f"Event: {event_name}, Bucket: {params['Bucket']}")

# Good: Register with appropriate specificity
events.register('before-call.s3.GetObject', specific_handler)  # Specific operation
events.register('before-call.s3.*', service_handler)          # All S3 operations
events.register('before-call.*.*', global_handler)           # All operations

# Good: Use unique IDs for singleton handlers
events.register(
    'before-call.s3.*',
    auth_handler,
    unique_id='custom-auth'  # Prevents duplicate registration
)

Error Handling in Event Handlers

def safe_event_handler(**kwargs):
    """Event handler with proper error handling."""
    try:
        # Handler logic here
        request = kwargs.get('request')
        if request:
            request.headers['X-Custom-Header'] = 'value'
    except Exception as e:
        # Log error but don't raise to avoid breaking the request
        logger.error(f"Event handler error: {e}")
        # Optionally re-raise for critical handlers
        # raise

def critical_event_handler(**kwargs):
    """Handler where errors should stop request processing."""
    try:
        # Critical validation or security logic. validate_request_security
        # and SecurityError stand in for application-defined code.
        validate_request_security(kwargs.get('request'))
    except SecurityError:
        # Expected failure: re-raise to stop request processing
        raise
    except Exception as e:
        # Unexpected failure: log it, then re-raise as well
        logger.error(f"Critical handler error: {e}")
        raise

Performance Considerations

# Good: Efficient event handler
def efficient_handler(**kwargs):
    # Quick checks first
    if 'request' not in kwargs:
        return
    
    request = kwargs['request']
    
    # Avoid expensive operations in handlers
    if should_process_request(request):
        process_request_efficiently(request)

# Good: Conditional registration
if enable_detailed_logging:
    events.register('before-call.*.*', detailed_logging_handler)
else:
    events.register('before-call.*.*', basic_logging_handler)

# Good: Unregister when no longer needed
def temporary_debugging():
    def debug_handler(**kwargs):
        print(f"Debug: {kwargs}")
    
    # Register temporarily
    events.register('before-call.s3.*', debug_handler)
    
    try:
        # Perform operations with debugging
        client.list_buckets()
    finally:
        # Clean up
        events.unregister('before-call.s3.*', debug_handler)

The event system lets you customize AWS API interactions through registered handlers, keeping cross-cutting concerns such as logging, authentication, and retries separate from the code that makes the calls.

Install with Tessl CLI

npx tessl i tessl/pypi-botocore
