
tessl/pypi-s3transfer

An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.


docs/subscribers-callbacks.md

Event Subscribers and Callbacks

Extensible subscriber system for handling transfer events including progress updates, completion notifications, error handling, and custom event processing throughout the transfer lifecycle.

Capabilities

BaseSubscriber

Base class for implementing transfer event subscribers with standardized callback methods for different transfer lifecycle events.

from typing import List


class BaseSubscriber:
    """
    Base class for transfer event subscribers.
    
    Provides callback methods that are called during different phases of transfer operations.
    Subclass this to implement custom event handling.
    """
    def on_queued(self, **kwargs):
        """
        Called when transfer is queued for execution.
        
        Args:
            **kwargs: Additional event information including:
                - transfer_id: Unique transfer identifier
                - call_args: Original call arguments
                - user_context: User-defined context
        """
    
    def on_progress(self, bytes_transferred: int, **kwargs):
        """
        Called when transfer progress is made.
        
        Args:
            bytes_transferred (int): Number of bytes transferred in this progress event
            **kwargs: Additional event information including:
                - total_bytes_transferred: Total bytes transferred so far
                - transfer_size: Total transfer size (if known)
                - transfer_id: Unique transfer identifier
        """
    
    def on_done(self, **kwargs):
        """
        Called when transfer completes (successfully or with error).
        
        Args:
            **kwargs: Additional event information including:
                - transfer_id: Unique transfer identifier
                - exception: Exception if transfer failed (None if successful)
                - result: Transfer result
        """
    
    # Class constant defining the valid subscriber callback types:
    # ['queued', 'progress', 'done']
    VALID_SUBSCRIBER_TYPES: List[str]

Usage Examples

Basic Progress Subscriber

from s3transfer.subscribers import BaseSubscriber
from s3transfer.manager import TransferManager
import os
import time

class ProgressSubscriber(BaseSubscriber):
    """Simple progress tracking subscriber."""
    
    def __init__(self, description="Transfer"):
        self.description = description
        self.start_time = None
        self.total_transferred = 0
        self.last_update = None
    
    def on_queued(self, **kwargs):
        self.start_time = time.time()
        print(f"{self.description}: Queued (ID: {kwargs.get('transfer_id', 'unknown')})")
    
    def on_progress(self, bytes_transferred, **kwargs):
        self.total_transferred += bytes_transferred
        current_time = time.time()
        
        # Update progress every second to avoid spam
        if self.last_update is None or current_time - self.last_update >= 1.0:
            if self.start_time:
                elapsed = current_time - self.start_time
                rate = self.total_transferred / elapsed if elapsed > 0 else 0
                
                transfer_size = kwargs.get('transfer_size')
                if transfer_size:
                    percentage = (self.total_transferred / transfer_size) * 100
                    print(f"{self.description}: {percentage:.1f}% ({self.total_transferred}/{transfer_size} bytes) at {rate/1024:.1f} KB/s")
                else:
                    print(f"{self.description}: {self.total_transferred} bytes at {rate/1024:.1f} KB/s")
            
            self.last_update = current_time
    
    def on_done(self, **kwargs):
        if self.start_time:
            elapsed = time.time() - self.start_time
            avg_rate = self.total_transferred / elapsed if elapsed > 0 else 0
            
            exception = kwargs.get('exception')
            if exception:
                print(f"{self.description}: Failed after {elapsed:.2f}s - {exception}")
            else:
                print(f"{self.description}: Completed in {elapsed:.2f}s (avg: {avg_rate/1024:.1f} KB/s)")

# Use the progress subscriber
import boto3

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    progress_sub = ProgressSubscriber("Upload large file")
    
    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'large_file.dat',
            subscribers=[progress_sub]
        )
        
        # Provide size for accurate progress percentage
        file_size = os.path.getsize('/tmp/large_file.dat')
        future.meta.provide_transfer_size(file_size)
        
        result = future.result()

finally:
    transfer_manager.shutdown()

Comprehensive Event Logger

import logging
import os
from datetime import datetime

class TransferEventLogger(BaseSubscriber):
    """Comprehensive event logger for transfer operations."""
    
    def __init__(self, logger_name="s3transfer"):
        self.logger = logging.getLogger(logger_name)
        self.transfer_stats = {}
    
    def on_queued(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        call_args = kwargs.get('call_args', {})
        
        self.transfer_stats[transfer_id] = {
            'queued_time': datetime.now().isoformat(),
            'total_bytes': 0,
            'progress_events': 0,
            'bucket': getattr(call_args, 'bucket', 'unknown'),
            'key': getattr(call_args, 'key', 'unknown')
        }
        
        self.logger.info(f"Transfer queued: {transfer_id}", extra={
            'transfer_id': transfer_id,
            'bucket': self.transfer_stats[transfer_id]['bucket'],
            'key': self.transfer_stats[transfer_id]['key'],
            'event': 'queued'
        })
    
    def on_progress(self, bytes_transferred, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        
        if transfer_id in self.transfer_stats:
            stats = self.transfer_stats[transfer_id]
            stats['total_bytes'] += bytes_transferred
            stats['progress_events'] += 1
            stats['last_progress'] = datetime.now().isoformat()
            
            # Log significant progress milestones
            if stats['progress_events'] % 100 == 0:  # Every 100th progress event
                self.logger.debug(f"Progress milestone: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'bytes_transferred': bytes_transferred,
                    'total_bytes': stats['total_bytes'],
                    'progress_events': stats['progress_events'],
                    'event': 'progress_milestone'
                })
    
    def on_done(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        exception = kwargs.get('exception')
        
        if transfer_id in self.transfer_stats:
            stats = self.transfer_stats[transfer_id]
            stats['completed_time'] = datetime.now().isoformat()
            stats['success'] = exception is None
            
            if exception:
                stats['error'] = str(exception)
                self.logger.error(f"Transfer failed: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'error': str(exception),
                    'total_bytes': stats['total_bytes'],
                    'event': 'failed'
                })
            else:
                self.logger.info(f"Transfer completed: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'total_bytes': stats['total_bytes'],
                    'progress_events': stats['progress_events'],
                    'event': 'completed'
                })
            
            # Clean up stats to prevent memory leaks
            del self.transfer_stats[transfer_id]

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Use the logger
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    event_logger = TransferEventLogger()
    
    # Multiple transfers with comprehensive logging
    files_to_upload = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']
    futures = []
    
    for filename in files_to_upload:
        # Pass the filename so the transfer manager controls the file's
        # lifetime; a file object opened in a with-block here would be
        # closed before the asynchronous transfer finishes.
        future = transfer_manager.upload(
            filename, 'my-bucket', os.path.basename(filename),
            subscribers=[event_logger]
        )
        futures.append(future)
    
    # Wait for all transfers
    for future in futures:
        try:
            future.result()
        except Exception as e:
            print(f"Transfer failed: {e}")

finally:
    transfer_manager.shutdown()

Custom Metrics Collector

import threading
from collections import defaultdict, deque
from datetime import datetime, timedelta

class MetricsCollector(BaseSubscriber):
    """Collects detailed metrics about transfer performance."""
    
    def __init__(self, window_seconds=300):  # 5-minute window
        self.window_seconds = window_seconds
        self.lock = threading.Lock()
        
        # Metrics storage
        self.transfer_metrics = defaultdict(dict)
        self.throughput_samples = deque()
        self.error_counts = defaultdict(int)
        self.completion_times = deque()
        
        # Running statistics
        self.total_transfers = 0
        self.successful_transfers = 0
        self.total_bytes_transferred = 0
    
    def on_queued(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        
        with self.lock:
            self.transfer_metrics[transfer_id] = {
                'queued_time': datetime.now(),
                'bytes_transferred': 0,
                'progress_count': 0
            }
            self.total_transfers += 1
    
    def on_progress(self, bytes_transferred, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        current_time = datetime.now()
        
        with self.lock:
            if transfer_id in self.transfer_metrics:
                metrics = self.transfer_metrics[transfer_id]
                metrics['bytes_transferred'] += bytes_transferred
                metrics['progress_count'] += 1
                metrics['last_progress'] = current_time
                
                # Add throughput sample
                self.throughput_samples.append((current_time, bytes_transferred))
                self._cleanup_old_samples(current_time)
    
    def on_done(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        exception = kwargs.get('exception')
        current_time = datetime.now()
        
        with self.lock:
            if transfer_id in self.transfer_metrics:
                metrics = self.transfer_metrics[transfer_id]
                metrics['completed_time'] = current_time
                
                if exception:
                    error_type = type(exception).__name__
                    self.error_counts[error_type] += 1
                    metrics['error'] = str(exception)
                else:
                    self.successful_transfers += 1
                    self.total_bytes_transferred += metrics['bytes_transferred']
                    
                    # Calculate transfer duration
                    duration = current_time - metrics['queued_time']
                    self.completion_times.append((current_time, duration.total_seconds()))
                    
                    # Clean up old completion times
                    self._cleanup_old_completions(current_time)
                
                # Clean up transfer metrics
                del self.transfer_metrics[transfer_id]
    
    def _cleanup_old_samples(self, current_time):
        """Remove throughput samples older than window."""
        cutoff = current_time - timedelta(seconds=self.window_seconds)
        while self.throughput_samples and self.throughput_samples[0][0] < cutoff:
            self.throughput_samples.popleft()
    
    def _cleanup_old_completions(self, current_time):
        """Remove completion times older than window."""
        cutoff = current_time - timedelta(seconds=self.window_seconds)
        while self.completion_times and self.completion_times[0][0] < cutoff:
            self.completion_times.popleft()
    
    def get_current_throughput(self):
        """Get current throughput in bytes per second."""
        with self.lock:
            if not self.throughput_samples:
                return 0.0
            
            total_bytes = sum(sample[1] for sample in self.throughput_samples)
            time_span = (self.throughput_samples[-1][0] - self.throughput_samples[0][0]).total_seconds()
            
            return total_bytes / time_span if time_span > 0 else 0.0
    
    def get_average_completion_time(self):
        """Get average completion time in seconds."""
        with self.lock:
            if not self.completion_times:
                return 0.0
            
            return sum(ct[1] for ct in self.completion_times) / len(self.completion_times)
    
    def get_success_rate(self):
        """Get success rate as percentage."""
        with self.lock:
            if self.total_transfers == 0:
                return 0.0
            return (self.successful_transfers / self.total_transfers) * 100
    
    def get_error_summary(self):
        """Get error counts by type."""
        with self.lock:
            return dict(self.error_counts)
    
    def print_metrics(self):
        """Print current metrics summary."""
        throughput = self.get_current_throughput()
        avg_completion = self.get_average_completion_time()
        success_rate = self.get_success_rate()
        errors = self.get_error_summary()
        
        print("\n=== Transfer Metrics ===")
        print(f"Total transfers: {self.total_transfers}")
        print(f"Successful transfers: {self.successful_transfers}")
        print(f"Success rate: {success_rate:.1f}%")
        print(f"Current throughput: {throughput / 1024:.1f} KB/s")
        print(f"Average completion time: {avg_completion:.2f} seconds")
        print(f"Total bytes transferred: {self.total_bytes_transferred / (1024*1024):.1f} MB")
        
        if errors:
            print("Error summary:")
            for error_type, count in errors.items():
                print(f"  {error_type}: {count}")
        print("========================\n")

# Use the metrics collector
import time

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    metrics = MetricsCollector()
    
    # Start multiple transfers with metrics collection
    futures = []
    for i in range(10):
        filename = f'/tmp/test_file_{i}.txt'
        # Create test file
        with open(filename, 'w') as f:
            f.write('x' * (1024 * (i + 1)))  # Files of increasing size
        
        # Pass the filename so the file stays readable for the
        # asynchronous transfer (a with-block would close it too early)
        future = transfer_manager.upload(
            filename, 'my-bucket', f'test_file_{i}.txt',
            subscribers=[metrics]
        )
        futures.append(future)
    
    # Monitor progress and print metrics periodically
    # Track completed futures in a set instead of injecting attributes on them
    completed = set()
    while len(completed) < len(futures):
        time.sleep(2)

        # Check for newly completed transfers
        for future in futures:
            if future.done():
                completed.add(id(future))

        # Print current metrics
        metrics.print_metrics()
    
    # Final metrics
    print("Final metrics:")
    metrics.print_metrics()

finally:
    transfer_manager.shutdown()

Conditional Event Handling

class ConditionalSubscriber(BaseSubscriber):
    """Subscriber that handles events based on specific conditions."""
    
    def __init__(self, conditions=None):
        self.conditions = conditions or {}
        self.matched_transfers = set()
    
    def _matches_conditions(self, **kwargs):
        """Check if event matches specified conditions."""
        if not self.conditions:
            return True
        
        call_args = kwargs.get('call_args')
        if not call_args:
            return False
        
        # Check bucket condition
        if 'bucket' in self.conditions:
            if getattr(call_args, 'bucket', None) != self.conditions['bucket']:
                return False
        
        # Check key pattern condition
        if 'key_pattern' in self.conditions:
            key = getattr(call_args, 'key', '')
            if self.conditions['key_pattern'] not in key:
                return False
        
        # Check minimum size condition (transfer_size may be absent at
        # queue time, in which case this condition fails closed)
        if 'min_size' in self.conditions:
            transfer_size = kwargs.get('transfer_size', 0)
            if transfer_size < self.conditions['min_size']:
                return False
        
        return True
    
    def on_queued(self, **kwargs):
        if self._matches_conditions(**kwargs):
            transfer_id = kwargs.get('transfer_id')
            self.matched_transfers.add(transfer_id)
            print(f"Monitoring transfer: {transfer_id}")
    
    def on_progress(self, bytes_transferred, **kwargs):
        transfer_id = kwargs.get('transfer_id')
        if transfer_id in self.matched_transfers:
            total_transferred = kwargs.get('total_bytes_transferred', bytes_transferred)
            print(f"Progress for {transfer_id}: {total_transferred} bytes")
    
    def on_done(self, **kwargs):
        transfer_id = kwargs.get('transfer_id')
        if transfer_id in self.matched_transfers:
            exception = kwargs.get('exception')
            if exception:
                print(f"Monitored transfer failed: {transfer_id} - {exception}")
            else:
                print(f"Monitored transfer completed: {transfer_id}")
            
            self.matched_transfers.discard(transfer_id)

# Use conditional subscriber
conditions = {
    'bucket': 'important-bucket',
    'key_pattern': 'critical/',
    'min_size': 10 * 1024 * 1024  # Only files of at least 10 MB
}

conditional_sub = ConditionalSubscriber(conditions)

# This transfer will be monitored (if it meets conditions)
with open('/tmp/critical_large_file.dat', 'rb') as f:
    future = transfer_manager.upload(
        f, 'important-bucket', 'critical/large_file.dat',
        subscribers=[conditional_sub]
    )
    future.result()

Multi-Subscriber Coordination

class SubscriberCoordinator:
    """Coordinates multiple subscribers for complex event handling."""
    
    def __init__(self, subscribers=None):
        self.subscribers = subscribers or []
        self.global_stats = {
            'total_queued': 0,
            'total_completed': 0,
            'total_failed': 0,
            'total_bytes': 0
        }
    
    def add_subscriber(self, subscriber):
        """Add a subscriber to the coordination."""
        self.subscribers.append(subscriber)
    
    def create_coordinated_subscriber(self):
        """Create a subscriber that coordinates with all registered subscribers."""
        
        class CoordinatedSubscriber(BaseSubscriber):
            def __init__(self, coordinator):
                self.coordinator = coordinator
            
            def on_queued(self, **kwargs):
                self.coordinator.global_stats['total_queued'] += 1
                for sub in self.coordinator.subscribers:
                    try:
                        sub.on_queued(**kwargs)
                    except Exception as e:
                        print(f"Subscriber error in on_queued: {e}")
            
            def on_progress(self, bytes_transferred, **kwargs):
                self.coordinator.global_stats['total_bytes'] += bytes_transferred
                for sub in self.coordinator.subscribers:
                    try:
                        sub.on_progress(bytes_transferred, **kwargs)
                    except Exception as e:
                        print(f"Subscriber error in on_progress: {e}")
            
            def on_done(self, **kwargs):
                exception = kwargs.get('exception')
                if exception:
                    self.coordinator.global_stats['total_failed'] += 1
                else:
                    self.coordinator.global_stats['total_completed'] += 1
                
                for sub in self.coordinator.subscribers:
                    try:
                        sub.on_done(**kwargs)
                    except Exception as e:
                        print(f"Subscriber error in on_done: {e}")
        
        return CoordinatedSubscriber(self)
    
    def print_global_stats(self):
        """Print global statistics across all subscribers."""
        stats = self.global_stats
        print(f"\nGlobal Stats:")
        print(f"  Queued: {stats['total_queued']}")
        print(f"  Completed: {stats['total_completed']}")
        print(f"  Failed: {stats['total_failed']}")
        print(f"  Total bytes: {stats['total_bytes'] / (1024*1024):.1f} MB")

# Use subscriber coordination
coordinator = SubscriberCoordinator()

# Add multiple subscribers
coordinator.add_subscriber(ProgressSubscriber("Global Progress"))
coordinator.add_subscriber(TransferEventLogger())
coordinator.add_subscriber(MetricsCollector())

# Create coordinated subscriber
coordinated_sub = coordinator.create_coordinated_subscriber()

# Use with transfers
with open('/tmp/test_file.dat', 'rb') as f:
    future = transfer_manager.upload(
        f, 'my-bucket', 'test_file.dat',
        subscribers=[coordinated_sub]
    )
    future.result()

coordinator.print_global_stats()

Event Information Reference

on_queued Event Parameters

  • transfer_id: Unique identifier for the transfer
  • call_args: Original method call arguments (bucket, key, etc.)
  • user_context: User-defined context dictionary

on_progress Event Parameters

  • bytes_transferred: Bytes transferred in this progress event
  • total_bytes_transferred: Total bytes transferred so far (optional)
  • transfer_size: Total transfer size if known (optional)
  • transfer_id: Unique identifier for the transfer

on_done Event Parameters

  • transfer_id: Unique identifier for the transfer
  • exception: Exception object if transfer failed (None if successful)
  • result: Transfer result object
  • call_args: Original method call arguments
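The parameter lists above can be exercised without a live transfer by invoking the callbacks directly, which is also a convenient way to unit-test a subscriber. This sketch uses the keyword names from the reference above; the simulated values (`'t-1'`, the byte counts) are illustrative, not values the library guarantees:

```python
class EventCapture:
    """Records the name and keyword arguments of each lifecycle event."""

    def __init__(self):
        self.events = []

    def on_queued(self, **kwargs):
        self.events.append(('queued', kwargs))

    def on_progress(self, bytes_transferred, **kwargs):
        self.events.append(('progress', dict(kwargs, bytes_transferred=bytes_transferred)))

    def on_done(self, **kwargs):
        self.events.append(('done', kwargs))


# Drive the callbacks the way a transfer manager would
sub = EventCapture()
sub.on_queued(transfer_id='t-1', call_args={'bucket': 'my-bucket', 'key': 'a.txt'})
sub.on_progress(1024, transfer_id='t-1', total_bytes_transferred=1024, transfer_size=2048)
sub.on_progress(1024, transfer_id='t-1', total_bytes_transferred=2048, transfer_size=2048)
sub.on_done(transfer_id='t-1', exception=None, result='ok')

print([name for name, _ in sub.events])
```

Driving the callbacks by hand like this keeps subscriber tests fast and independent of S3 credentials.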

Best Practices

Subscriber Implementation

  1. Handle exceptions: Wrap subscriber code in try/except blocks
  2. Avoid blocking operations: Keep callback methods fast and non-blocking
  3. Use threading safely: Protect shared data with locks in multi-threaded environments
  4. Clean up resources: Remove references to completed transfers to prevent memory leaks
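The four practices above can be combined in one small subscriber. This is a minimal sketch, not part of the s3transfer API; the per-transfer dictionary is an assumed bookkeeping structure:

```python
import threading


class DefensiveSubscriber:
    """Applies all four practices: guarded callbacks, minimal per-event
    work, a lock around shared state, and cleanup on completion."""

    def __init__(self):
        self._lock = threading.Lock()  # practice 3: protect shared data
        self._active = {}              # per-transfer byte counts

    def on_queued(self, **kwargs):
        try:                           # practice 1: never let errors escape
            with self._lock:
                self._active[kwargs.get('transfer_id')] = 0
        except Exception:
            pass

    def on_progress(self, bytes_transferred, **kwargs):
        try:
            with self._lock:           # practice 2: O(1) work, no I/O here
                tid = kwargs.get('transfer_id')
                if tid in self._active:
                    self._active[tid] += bytes_transferred
        except Exception:
            pass

    def on_done(self, **kwargs):
        try:
            with self._lock:           # practice 4: drop finished transfers
                self._active.pop(kwargs.get('transfer_id'), None)
        except Exception:
            pass
```

Because `on_done` removes the entry, the subscriber's memory footprint stays proportional to the number of in-flight transfers, not the total ever started.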

Performance Considerations

  1. Limit logging frequency: Avoid logging every progress event for large transfers
  2. Use efficient data structures: Choose appropriate containers for metrics storage
  3. Batch operations: Group multiple events for processing efficiency
  4. Monitor memory usage: Clean up old data periodically
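Batching (point 3) can be sketched as a subscriber that accumulates progress events and flushes a summary every N callbacks. The `batch_size` parameter and the flush format are illustrative choices, not library behavior:

```python
class BatchedProgress:
    """Aggregates progress callbacks and flushes once per `batch_size`
    events, keeping per-event work to two integer additions."""

    def __init__(self, batch_size=100, flush=print):
        self.batch_size = batch_size
        self.flush = flush
        self._pending_bytes = 0
        self._pending_events = 0

    def on_progress(self, bytes_transferred, **kwargs):
        self._pending_bytes += bytes_transferred
        self._pending_events += 1
        if self._pending_events >= self.batch_size:
            self._flush()

    def on_done(self, **kwargs):
        self._flush()  # drain whatever is left when the transfer finishes

    def _flush(self):
        if self._pending_events:
            self.flush(f"{self._pending_events} events, {self._pending_bytes} bytes")
            self._pending_bytes = 0
            self._pending_events = 0


flushes = []
bp = BatchedProgress(batch_size=3, flush=flushes.append)
for _ in range(7):
    bp.on_progress(100)
bp.on_done()
print(flushes)  # three flushes: two full batches plus the drain on completion
```

Flushing from `on_done` ensures the final partial batch is never lost.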

Error Handling

  1. Graceful degradation: Subscriber failures shouldn't affect transfers
  2. Log subscriber errors: Report subscriber exceptions for debugging
  3. Validate event data: Check for required fields before processing
  4. Implement fallbacks: Provide default behavior when subscribers fail
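All four points can be implemented with a wrapper that isolates a possibly faulty subscriber. `SafeSubscriber` is a hypothetical helper, not part of s3transfer; it logs failures instead of letting them propagate and falls back to a no-op when a callback is missing:

```python
class SafeSubscriber:
    """Wraps any subscriber so its failures are reported, not raised."""

    def __init__(self, inner, on_error=print):
        self._inner = inner
        self._on_error = on_error
        self.errors = 0

    def _call(self, name, *args, **kwargs):
        method = getattr(self._inner, name, None)
        if method is None:
            return  # fallback: silently skip callbacks the inner lacks
        try:
            method(*args, **kwargs)
        except Exception as exc:
            self.errors += 1
            self._on_error(f"subscriber error in {name}: {exc}")

    def on_queued(self, **kwargs):
        self._call('on_queued', **kwargs)

    def on_progress(self, bytes_transferred, **kwargs):
        self._call('on_progress', bytes_transferred, **kwargs)

    def on_done(self, **kwargs):
        self._call('on_done', **kwargs)


class Broken:
    """A subscriber whose progress handler always fails."""

    def on_progress(self, bytes_transferred, **kwargs):
        raise RuntimeError('boom')


safe = SafeSubscriber(Broken(), on_error=lambda msg: None)
safe.on_queued(transfer_id='t-1')     # Broken has no on_queued: skipped
safe.on_progress(512, transfer_id='t-1')  # raises inside, caught here
```

Passing `SafeSubscriber(my_subscriber)` in the `subscribers` list keeps a buggy handler from failing an otherwise healthy transfer.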

