Low-level, data-driven core of boto 3 providing foundational AWS service access.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Extensible event system for hooking into request/response lifecycle, enabling custom authentication, logging, monitoring, and request modification. The event system provides a hierarchical emitter that supports wildcard matching and event aliasing for maximum flexibility in customizing AWS API interactions.
Core event emission system with hierarchical event names and wildcard support.
class HierarchicalEmitter:
def __init__(self):
"""
Initialize hierarchical event emitter.
Provides event registration, emission, and handler management
with support for hierarchical event names and wildcard matching.
"""
def emit(self, event_name: str, **kwargs) -> List[Tuple[callable, Any]]:
"""
Emit an event by name with arguments passed as keyword args.
Args:
event_name: Dot-separated event name (e.g., 'before-call.s3.GetObject')
**kwargs: Arguments to pass to event handlers
Returns:
List of (handler, response) tuples from all processed handlers
Example:
>>> responses = emitter.emit(
... 'my-event.service.operation', arg1='one', arg2='two')
"""
def emit_until_response(
self,
event_name: str,
**kwargs
) -> Tuple[callable, Any]:
"""
Emit event until first non-None response is received.
Prevents subsequent handlers from being invoked after receiving
a non-None response, useful for short-circuiting event processing.
Args:
event_name: Dot-separated event name
**kwargs: Arguments to pass to event handlers
Returns:
First (handler, response) tuple with non-None response,
otherwise (None, None)
Example:
>>> handler, response = emitter.emit_until_response(
... 'my-event.service.operation', arg1='one', arg2='two')
"""
def register(
self,
event_name: str,
handler: callable,
unique_id: str = None,
unique_id_uses_count: bool = False
) -> None:
"""
Register an event handler for a given event.
Handlers are called in order: register_first() → register() → register_last()
Args:
event_name: Event name to register handler for
handler: Callable event handler that accepts **kwargs
unique_id: Unique identifier to prevent duplicate registrations
unique_id_uses_count: Whether unique_id uses reference counting
Raises:
ValueError: If handler is not callable or doesn't accept **kwargs
"""
def register_first(
self,
event_name: str,
handler: callable,
unique_id: str = None,
unique_id_uses_count: bool = False
) -> None:
"""
Register event handler to be called first for an event.
All handlers registered with register_first() are called before
handlers registered with register() and register_last().
Args:
event_name: Event name to register handler for
handler: Callable event handler that accepts **kwargs
unique_id: Unique identifier to prevent duplicate registrations
unique_id_uses_count: Whether unique_id uses reference counting
"""
def register_last(
self,
event_name: str,
handler: callable,
unique_id: str = None,
unique_id_uses_count: bool = False
) -> None:
"""
Register event handler to be called last for an event.
All handlers registered with register_last() are called after
handlers registered with register_first() and register().
Args:
event_name: Event name to register handler for
handler: Callable event handler that accepts **kwargs
unique_id: Unique identifier to prevent duplicate registrations
unique_id_uses_count: Whether unique_id uses reference counting
"""
def unregister(
self,
event_name: str,
handler: callable = None,
unique_id: str = None,
unique_id_uses_count: bool = False
) -> None:
"""
Unregister an event handler for a given event.
Args:
event_name: Event name to unregister handler from
handler: Handler to unregister (if no unique_id specified)
unique_id: Unique identifier of handler to unregister
unique_id_uses_count: Whether unique_id uses reference counting
"""
Event name aliasing system for backward compatibility and event name transformation.
class EventAliaser:
def __init__(
self,
event_emitter: HierarchicalEmitter,
event_aliases: dict = None
):
"""
Initialize event aliaser with underlying emitter and alias mappings.
Args:
event_emitter: Underlying hierarchical emitter
event_aliases: Mapping of old event names to new event names
"""
def emit(self, event_name: str, **kwargs) -> List[Tuple[callable, Any]]:
"""Emit event with automatic name aliasing."""
def emit_until_response(
self,
event_name: str,
**kwargs
) -> Tuple[callable, Any]:
"""Emit event until response with automatic name aliasing."""
def register(
self,
event_name: str,
handler: callable,
unique_id: str = None,
unique_id_uses_count: bool = False
) -> None:
"""Register handler with automatic event name aliasing."""
def unregister(
self,
event_name: str,
handler: callable = None,
unique_id: str = None,
unique_id_uses_count: bool = False
) -> None:
"""Unregister handler with automatic event name aliasing."""
Helper functions for working with event responses.
def first_non_none_response(
responses: List[Tuple[callable, Any]],
default: Any = None
) -> Any:
"""
Find first non-None response in a list of handler response tuples.
Args:
responses: List of (handler, response) tuples from emit()
default: Default value if no non-None responses found
Returns:
First non-None response or default value
Example:
>>> responses = [(func1, None), (func2, 'foo'), (func3, 'bar')]
>>> first_non_none_response(responses)
'foo'
"""
The botocore event system provides hooks throughout the AWS API request lifecycle:
Event Flow Order:
- provide-client-params.*.* - Provide additional client parameters
- before-parameter-build.*.* - Modify parameters before request building
- before-call.*.* - Modify request before sending
- before-sign.*.* - Modify request before signing
- response-received.*.* - Process response after receiving
- after-call.*.* - Process final response
- needs-retry.*.* - Determine if request should be retried
Parameter Building Events:
- before-parameter-build.{service}.{operation} - Modify operation parameters
- provide-client-params.{service}.{operation} - Add client-level parameters
Request Processing Events:
- before-call.{service}.{operation} - Modify request before sending
- before-sign.{service}.{operation} - Modify request before signing
Response Processing Events:
- response-received.{service}.{operation} - Process HTTP response
- after-call.{service}.{operation} - Process parsed response
Retry Events:
- needs-retry.{service}.{operation} - Determine retry behavior
Global Events:
- before-call.*.* - Called for all service operations
- after-call.*.* - Called for all service operations
- needs-retry.*.* - Called for all retry decisions
from botocore.session import get_session
# Create session and client
session = get_session()
client = session.create_client('s3', region_name='us-east-1')
# Access the event system
events = client.meta.events
# Register a simple event handler
def log_request(**kwargs):
    """Print the name of the event that invoked this handler.

    Args:
        **kwargs: Event keyword arguments. Only ``event_name`` is read;
            ``None`` is printed when it is absent.
    """
    event_name = kwargs.get('event_name')
    print(f"Making request: {event_name}")
events.register('before-call.s3.*', log_request)
# Make a request - handler will be called
response = client.list_buckets()


def custom_auth_handler(request, **kwargs):
    """Add custom authentication headers to requests.

    Args:
        request: Request object whose ``headers`` mapping is modified in
            place.
        **kwargs: Remaining event arguments (ignored).
    """
    # Imported here because the example calls uuid.uuid4() without ever
    # importing the module, which would raise NameError on first use.
    import uuid

    request.headers['X-Custom-Auth'] = 'my-auth-token'
    # A fresh random identifier is generated for every invocation.
    request.headers['X-Request-ID'] = str(uuid.uuid4())


# Register for all S3 operations
events.register('before-sign.s3.*', custom_auth_handler)
# Register for specific operation only
events.register('before-sign.s3.GetObject', custom_auth_handler)
import json
import logging
logger = logging.getLogger(__name__)
def log_request_params(params, **kwargs):
    """Log request parameters before API call.

    Args:
        params: Operation parameters. Serialized with ``json.dumps``,
            falling back to ``str`` for values JSON cannot encode.
        **kwargs: Remaining event arguments. The last dot-separated segment
            of ``event_name`` is logged as the operation.
    """
    # Serializing the parameters can be costly, so skip the work entirely
    # when INFO records would be discarded anyway.
    if not logger.isEnabledFor(logging.INFO):
        return
    operation = kwargs.get('event_name', '').split('.')[-1]
    # Pass values as logging arguments so formatting is deferred to the
    # logging framework; the rendered message is unchanged.
    logger.info(
        "Calling %s with params: %s",
        operation,
        json.dumps(params, default=str),
    )
def log_response_data(parsed, **kwargs):
    """Log response data after API call.

    Args:
        parsed: Parsed response. Serialized with ``json.dumps``, falling
            back to ``str`` for values JSON cannot encode.
        **kwargs: Remaining event arguments. The last dot-separated segment
            of ``event_name`` is logged as the operation.
    """
    # Responses can be large; avoid serializing them when INFO records
    # would be discarded anyway.
    if not logger.isEnabledFor(logging.INFO):
        return
    operation = kwargs.get('event_name', '').split('.')[-1]
    # Pass values as logging arguments so formatting is deferred to the
    # logging framework; the rendered message is unchanged.
    logger.info(
        "Response from %s: %s",
        operation,
        json.dumps(parsed, default=str),
    )
# Register logging handlers
events.register('before-parameter-build.*.*', log_request_params)
events.register('after-call.*.*', log_response_data)import time
class PerformanceMonitor:
    """Measure the time elapsed between a start event and an end event.

    ``start_timer`` and ``end_timer`` are intended to be registered on two
    different events (for example ``before-call.*.*`` and
    ``after-call.*.*``). Those event names differ in their first segment,
    so timers are keyed by everything after the first dot; keying by the
    full event name would never produce a match.
    """

    def __init__(self):
        # Maps '<service>.<operation>' to the perf_counter() reading taken
        # when the operation started.
        self.timings = {}

    @staticmethod
    def _operation_key(event_name):
        """Return ``event_name`` without its leading event-type segment."""
        _, sep, remainder = (event_name or '').partition('.')
        # Names without a dot have no prefix to strip; use them unchanged.
        return remainder if sep else (event_name or '')

    def start_timer(self, **kwargs):
        """Record request start time.

        Args:
            **kwargs: Event arguments; only ``event_name`` is read.
        """
        key = self._operation_key(kwargs.get('event_name'))
        # perf_counter() is monotonic, so the measured duration cannot be
        # distorted by system clock adjustments.
        # Overlapping calls to the same operation share one key: the most
        # recent start wins.
        self.timings[key] = time.perf_counter()

    def end_timer(self, **kwargs):
        """Calculate and log request duration.

        Does nothing when no matching start was recorded.

        Args:
            **kwargs: Event arguments; only ``event_name`` is read.
        """
        event_name = kwargs.get('event_name')
        # pop() removes the entry as it is read, so finished timers never
        # accumulate in self.timings.
        started = self.timings.pop(self._operation_key(event_name), None)
        if started is None:
            return
        duration = time.perf_counter() - started
        print(f"Request {event_name} took {duration:.3f} seconds")
monitor = PerformanceMonitor()
# Register monitoring handlers
events.register('before-call.*.*', monitor.start_timer)
events.register('after-call.*.*', monitor.end_timer)def modify_s3_response(parsed, **kwargs):
"""Add custom metadata to S3 responses."""
if 'ResponseMetadata' in parsed:
parsed['ResponseMetadata']['CustomProcessed'] = True
parsed['ResponseMetadata']['ProcessedAt'] = time.time()
events.register('after-call.s3.*', modify_s3_response)from botocore.exceptions import ClientError
def custom_retry_handler(response, operation, **kwargs):
    """Implement custom retry logic.

    Args:
        response: ``None`` when no response was received, otherwise either
            a parsed response dict or an ``(http_response, parsed_response)``
            pair.
        operation: Operation being retried (unused).
        **kwargs: Remaining event arguments (ignored).

    Returns:
        ``5.0`` (seconds to wait before retrying) when the parsed response
        carries the ``SlowDown`` error code, otherwise ``None``.
    """
    # NOTE(review): botocore's endpoint emits needs-retry with the response
    # as an (http_response, parsed_response) tuple and sleeps for whatever
    # number the handler returns. A dict return value would be passed to
    # time.sleep() and fail, so the delay itself is returned. Confirm
    # against the botocore version in use.
    parsed = response[1] if isinstance(response, tuple) else response
    if not parsed:
        return None
    # Custom backoff for S3 SlowDown errors.
    if (parsed.get('Error') or {}).get('Code') == 'SlowDown':
        return 5.0  # 5 second delay
    return None
def error_notification_handler(parsed, **kwargs):
    """Send notifications for specific errors.

    Args:
        parsed: Parsed response dict; its optional ``Error`` entry is
            inspected for a ``Code``.
        **kwargs: Remaining event arguments (ignored).
    """
    # Tolerate a missing or empty 'Error' entry and an 'Error' without a
    # 'Code' instead of raising KeyError from inside an event handler.
    error_code = (parsed.get('Error') or {}).get('Code')
    if error_code in ('AccessDenied', 'InvalidAccessKeyId'):
        # Send alert for authentication issues
        print(f"Authentication error detected: {error_code}")
events.register('needs-retry.s3.*', custom_retry_handler)
events.register('after-call.*.*', error_notification_handler)def global_request_logger(**kwargs):
"""Log all AWS API requests across all services."""
event_parts = kwargs.get('event_name', '').split('.')
if len(event_parts) >= 3:
service = event_parts[1]
operation = event_parts[2]
print(f"AWS API Call: {service}.{operation}")
# Register at session level for all clients
session_events = session.get_component('event_emitter')
session_events.register('before-call.*.*', global_request_logger)def first_handler(**kwargs):
print("Called first")
def middle_handler(**kwargs):
print("Called middle")
def last_handler(**kwargs):
print("Called last")
# Register handlers with different priorities
events.register_first('before-call.s3.ListBuckets', first_handler)
events.register('before-call.s3.ListBuckets', middle_handler)
events.register_last('before-call.s3.ListBuckets', last_handler)
# When ListBuckets is called, output will be:
# Called first
# Called middle
# Called last


def conditional_handler(request, **kwargs):
    """Only process requests to specific buckets.

    Adds an extra header and prints a notice when the ``Bucket`` entry of
    the ``params`` keyword argument starts with ``sensitive-``.

    Args:
        request: Request object whose ``headers`` mapping may be modified.
        **kwargs: Event arguments; ``params`` is read when present.
    """
    # NOTE(review): the before-sign event may not supply a `params`
    # keyword, in which case the bucket name is always '' and this handler
    # never applies the header. Confirm against botocore's signer.
    bucket_name = kwargs.get('params', {}).get('Bucket', '')
    if not bucket_name.startswith('sensitive-'):
        return
    # Sensitive buckets get the extra security header.
    request.headers['X-Extra-Security'] = 'enabled'
    print(f"Enhanced security applied to {bucket_name}")
events.register('before-sign.s3.*', conditional_handler)def singleton_handler(**kwargs):
"""Handler that should only be registered once."""
print("Singleton handler called")
# Register with unique ID to prevent duplicates
events.register(
'before-call.s3.*',
singleton_handler,
unique_id='singleton-handler'
)
# Subsequent registrations with same unique_id are ignored
events.register(
'before-call.s3.*',
singleton_handler,
unique_id='singleton-handler' # This will be ignored
)def temporary_handler(**kwargs):
print("Temporary handler")
# Register handler
events.register('before-call.s3.ListBuckets', temporary_handler)
# Use it for some operations
client.list_buckets() # Handler called
# Unregister when no longer needed
events.unregister('before-call.s3.ListBuckets', temporary_handler)
# Handler no longer called
client.list_buckets() # Handler not calledfrom botocore.client import BaseClient
from botocore.config import Config
def setup_enhanced_s3_client():
    """Create S3 client with enhanced event handling.

    Returns:
        The S3 client created from the module-level ``session``, with the
        request-metadata, response-logging and retry handlers registered
        on its event emitter.
    """
    # Create client with custom configuration
    config = Config(
        retries={'max_attempts': 3},
        read_timeout=30,
    )
    client = session.create_client('s3', config=config)
    events = client.meta.events
    # Add custom event handlers
    # NOTE(review): add_request_metadata has a required `request`
    # parameter. botocore's before-call event is emitted without a
    # `request` keyword, so registering it there raises TypeError when the
    # event fires. before-sign does supply the request, so it is used
    # instead. Confirm against the botocore version in use.
    events.register('before-sign.s3.*', add_request_metadata)
    events.register('after-call.s3.*', log_response_metadata)
    events.register('needs-retry.s3.*', custom_retry_strategy)
    return client
def add_request_metadata(request, **kwargs):
    """Add metadata to all S3 requests.

    Args:
        request: Request object whose ``headers`` mapping is modified in
            place.
        **kwargs: Remaining event arguments (ignored).
    """
    metadata_headers = (
        ('X-Client-Version', '1.0'),
        ('X-Request-Source', 'enhanced-client'),
    )
    for header_name, header_value in metadata_headers:
        request.headers[header_name] = header_value
def log_response_metadata(parsed, **kwargs):
    """Log S3 response metadata.

    Prints the ``RequestId`` found under ``ResponseMetadata``; prints
    nothing when the response has no ``ResponseMetadata`` entry.

    Args:
        parsed: Parsed response dict.
        **kwargs: Remaining event arguments (ignored).
    """
    if 'ResponseMetadata' not in parsed:
        return
    request_id = parsed['ResponseMetadata'].get('RequestId')
    print(f"S3 Request ID: {request_id}")
def custom_retry_strategy(response, **kwargs):
    """Implement enhanced retry strategy.

    Args:
        response: ``None`` when no response was received, otherwise either
            a parsed response dict or an ``(http_response, parsed_response)``
            pair.
        **kwargs: Remaining event arguments (ignored).

    Returns:
        ``2.0`` (seconds to wait before retrying) when the parsed response
        carries the ``ServiceUnavailable`` error code, otherwise ``None``.
    """
    # NOTE(review): botocore's endpoint emits needs-retry with the response
    # as an (http_response, parsed_response) tuple and sleeps for whatever
    # number the handler returns. A dict return value would be passed to
    # time.sleep() and fail, so the delay itself is returned. Confirm
    # against the botocore version in use.
    parsed = response[1] if isinstance(response, tuple) else response
    if not parsed:
        return None
    # .get() avoids a KeyError when the 'Error' entry carries no 'Code'.
    error_code = (parsed.get('Error') or {}).get('Code')
    if error_code == 'ServiceUnavailable':
        return 2.0
    return None
# Use enhanced client
s3_client = setup_enhanced_s3_client()


class AWSOperationTracker:
    """Track operations across multiple AWS services.

    ``start_operation`` and ``complete_operation`` are intended to be
    registered on two different events (for example ``before-call`` and
    ``after-call``). Those event names differ in their first segment, so
    operations are paired on everything after the first dot; comparing
    full event names would never find a match.
    """

    def __init__(self):
        # operation_id -> {'event', 'operation', 'start_time', 'params'}
        self.active_operations = {}
        # One {'id', 'event', 'duration'} dict per finished operation, in
        # completion order.
        self.completed_operations = []

    @staticmethod
    def _operation_key(event_name):
        """Return ``event_name`` without its leading event-type segment."""
        _, sep, remainder = event_name.partition('.')
        # Names without a dot have no prefix to strip; use them unchanged.
        return remainder if sep else event_name

    def start_operation(self, **kwargs):
        """Track when an operation starts.

        Args:
            **kwargs: Event arguments; ``event_name`` and ``params`` are
                read when present.
        """
        event_name = kwargs.get('event_name', '')
        # Take one clock reading so the id and the recorded start agree.
        start_time = time.time()
        operation_id = f"{event_name}-{start_time}"
        self.active_operations[operation_id] = {
            'event': event_name,
            'operation': self._operation_key(event_name),
            'start_time': start_time,
            'params': kwargs.get('params', {}),
        }
        print(f"Started operation: {operation_id}")

    def complete_operation(self, **kwargs):
        """Track when an operation completes.

        Moves the oldest active operation with the same service and
        operation into ``completed_operations``. Does nothing when there
        is no such active operation.

        Args:
            **kwargs: Event arguments; only ``event_name`` is read.
        """
        event_name = kwargs.get('event_name', '')
        operation = self._operation_key(event_name)
        # Find matching active operation; dicts iterate in insertion
        # order, so the earliest started match is taken.
        for op_id, op_data in self.active_operations.items():
            if op_data['operation'] == operation:
                break
        else:
            return
        duration = time.time() - op_data['start_time']
        self.completed_operations.append({
            'id': op_id,
            'event': event_name,
            'duration': duration,
        })
        del self.active_operations[op_id]
        print(f"Completed operation: {op_id} ({duration:.3f}s)")
# Create tracker and register across multiple clients
tracker = AWSOperationTracker()
# Register for multiple services
for service in ['s3', 'ec2', 'dynamodb']:
client = session.create_client(service)
events = client.meta.events
events.register(f'before-call.{service}.*', tracker.start_operation)
events.register(f'after-call.{service}.*', tracker.complete_operation)# Good: Handler accepts **kwargs and handles missing parameters gracefully
def robust_handler(**kwargs):
event_name = kwargs.get('event_name', 'unknown')
params = kwargs.get('params', {})
# Process event safely
if 'Bucket' in params:
print(f"Processing bucket: {params['Bucket']}")
# Bad: Handler has specific parameter signature
def fragile_handler(event_name, params): # Will fail if parameters change
print(f"Event: {event_name}, Bucket: {params['Bucket']}")
# Good: Register with appropriate specificity
events.register('before-call.s3.GetObject', specific_handler) # Specific operation
events.register('before-call.s3.*', service_handler) # All S3 operations
events.register('before-call.*.*', global_handler) # All operations
# Good: Use unique IDs for singleton handlers
events.register(
'before-call.s3.*',
auth_handler,
unique_id='custom-auth' # Prevents duplicate registration
)def safe_event_handler(**kwargs):
"""Event handler with proper error handling."""
try:
# Handler logic here
request = kwargs.get('request')
if request:
request.headers['X-Custom-Header'] = 'value'
except Exception as e:
# Log error but don't raise to avoid breaking the request
logger.error(f"Event handler error: {e}")
# Optionally re-raise for critical handlers
# raise
def critical_event_handler(**kwargs):
"""Handler where errors should stop request processing."""
try:
# Critical validation or security logic
validate_request_security(kwargs.get('request'))
except SecurityError:
# Re-raise to stop request processing
raise
except Exception as e:
logger.error(f"Critical handler error: {e}")
raise # Re-raise all errors for critical handlers# Good: Efficient event handler
def efficient_handler(**kwargs):
# Quick checks first
if 'request' not in kwargs:
return
request = kwargs['request']
# Avoid expensive operations in handlers
if should_process_request(request):
process_request_efficiently(request)
# Good: Conditional registration
if enable_detailed_logging:
events.register('before-call.*.*', detailed_logging_handler)
else:
events.register('before-call.*.*', basic_logging_handler)
# Good: Unregister when no longer needed
def temporary_debugging():
def debug_handler(**kwargs):
print(f"Debug: {kwargs}")
# Register temporarily
events.register('before-call.s3.*', debug_handler)
try:
# Perform operations with debugging
client.list_buckets()
finally:
# Clean up
events.unregister('before-call.s3.*', debug_handler)
The event system provides powerful hooks for customizing AWS API interactions while maintaining clean separation of concerns and extensible architecture patterns.
Install with Tessl CLI
npx tessl i tessl/pypi-botocore