An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.
—
Extensible subscriber system for handling transfer events including progress updates, completion notifications, error handling, and custom event processing throughout the transfer lifecycle.
Base class for implementing transfer event subscribers with standardized callback methods for different transfer lifecycle events.
class BaseSubscriber:
    """
    Base class for transfer event subscribers.

    Provides no-op callback methods that are called during different phases
    of transfer operations (queued -> progress -> done). Subclass this and
    override any callback to implement custom event handling; callbacks you
    do not override safely do nothing.
    """

    def on_queued(self, **kwargs):
        """
        Called when transfer is queued for execution.

        Args:
            **kwargs: Additional event information including:
                - transfer_id: Unique transfer identifier
                - call_args: Original call arguments
                - user_context: User-defined context
        """

    def on_progress(self, bytes_transferred: int, **kwargs):
        """
        Called when transfer progress is made.

        Args:
            bytes_transferred (int): Number of bytes transferred in this
                progress event
            **kwargs: Additional event information including:
                - total_bytes_transferred: Total bytes transferred so far
                - transfer_size: Total transfer size (if known)
                - transfer_id: Unique transfer identifier
        """

    def on_done(self, **kwargs):
        """
        Called when transfer completes (successfully or with error).

        Args:
            **kwargs: Additional event information including:
                - transfer_id: Unique transfer identifier
                - exception: Exception if transfer failed (None if successful)
                - result: Transfer result
        """
# Class constant defining valid subscriber callback types
VALID_SUBSCRIBER_TYPES: List[str]

from s3transfer.subscribers import BaseSubscriber
from s3transfer.manager import TransferManager
import time
class ProgressSubscriber(BaseSubscriber):
    """Simple progress tracking subscriber.

    Tracks elapsed time and cumulative bytes transferred, printing at most
    one progress line per second so large transfers don't flood stdout.
    """

    def __init__(self, description="Transfer"):
        # Human-readable label prefixed to every printed line.
        self.description = description
        self.start_time = None        # set in on_queued
        self.total_transferred = 0    # cumulative bytes seen so far
        self.last_update = None       # wall-clock time of last printed update

    def on_queued(self, **kwargs):
        """Record the start time and announce that the transfer is queued."""
        self.start_time = time.time()
        print(f"{self.description}: Queued (ID: {kwargs.get('transfer_id', 'unknown')})")

    def on_progress(self, bytes_transferred, **kwargs):
        """Accumulate bytes and print a throttled rate/percentage line."""
        self.total_transferred += bytes_transferred
        current_time = time.time()
        # Update progress every second to avoid spam
        if self.last_update is None or current_time - self.last_update >= 1.0:
            if self.start_time:
                elapsed = current_time - self.start_time
                rate = self.total_transferred / elapsed if elapsed > 0 else 0
                transfer_size = kwargs.get('transfer_size')
                if transfer_size:
                    percentage = (self.total_transferred / transfer_size) * 100
                    print(f"{self.description}: {percentage:.1f}% ({self.total_transferred}/{transfer_size} bytes) at {rate/1024:.1f} KB/s")
                else:
                    print(f"{self.description}: {self.total_transferred} bytes at {rate/1024:.1f} KB/s")
            self.last_update = current_time

    def on_done(self, **kwargs):
        """Print a completion or failure summary with elapsed time and average rate."""
        if self.start_time:
            elapsed = time.time() - self.start_time
            avg_rate = self.total_transferred / elapsed if elapsed > 0 else 0
            exception = kwargs.get('exception')
            if exception:
                print(f"{self.description}: Failed after {elapsed:.2f}s - {exception}")
            else:
                print(f"{self.description}: Completed in {elapsed:.2f}s (avg: {avg_rate/1024:.1f} KB/s)")
# Use the progress subscriber
import os
import boto3

client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    progress_sub = ProgressSubscriber("Upload large file")
    # Keep the file open for the full duration of the upload: result()
    # must be awaited before the `with` block closes the handle.
    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'large_file.dat',
            subscribers=[progress_sub]
        )
        # Provide size for accurate progress percentage
        file_size = os.path.getsize('/tmp/large_file.dat')
        future.meta.provide_transfer_size(file_size)
        result = future.result()
finally:
    transfer_manager.shutdown()

import logging
import json
from datetime import datetime
class TransferEventLogger(BaseSubscriber):
    """Comprehensive event logger for transfer operations.

    Keeps per-transfer stats in memory between on_queued and on_done and
    emits structured log records (via the `extra` dict) at each lifecycle
    event. Stats entries are deleted in on_done to avoid unbounded growth.
    """

    def __init__(self, logger_name="s3transfer"):
        self.logger = logging.getLogger(logger_name)
        # transfer_id -> stats dict; entries removed when the transfer ends.
        self.transfer_stats = {}

    def on_queued(self, **kwargs):
        """Register the transfer and log a 'queued' event."""
        transfer_id = kwargs.get('transfer_id', 'unknown')
        call_args = kwargs.get('call_args', {})
        self.transfer_stats[transfer_id] = {
            'queued_time': datetime.now().isoformat(),
            'total_bytes': 0,
            'progress_events': 0,
            # call_args is attribute-style; getattr falls back when absent.
            'bucket': getattr(call_args, 'bucket', 'unknown'),
            'key': getattr(call_args, 'key', 'unknown')
        }
        self.logger.info(f"Transfer queued: {transfer_id}", extra={
            'transfer_id': transfer_id,
            'bucket': self.transfer_stats[transfer_id]['bucket'],
            'key': self.transfer_stats[transfer_id]['key'],
            'event': 'queued'
        })

    def on_progress(self, bytes_transferred, **kwargs):
        """Accumulate byte/event counters; log every 100th progress event."""
        transfer_id = kwargs.get('transfer_id', 'unknown')
        if transfer_id in self.transfer_stats:
            stats = self.transfer_stats[transfer_id]
            stats['total_bytes'] += bytes_transferred
            stats['progress_events'] += 1
            stats['last_progress'] = datetime.now().isoformat()
            # Log significant progress milestones
            if stats['progress_events'] % 100 == 0:  # Every 100th progress event
                self.logger.debug(f"Progress milestone: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'bytes_transferred': bytes_transferred,
                    'total_bytes': stats['total_bytes'],
                    'progress_events': stats['progress_events'],
                    'event': 'progress_milestone'
                })

    def on_done(self, **kwargs):
        """Log success or failure, then drop the per-transfer stats entry."""
        transfer_id = kwargs.get('transfer_id', 'unknown')
        exception = kwargs.get('exception')
        if transfer_id in self.transfer_stats:
            stats = self.transfer_stats[transfer_id]
            stats['completed_time'] = datetime.now().isoformat()
            stats['success'] = exception is None
            if exception:
                stats['error'] = str(exception)
                self.logger.error(f"Transfer failed: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'error': str(exception),
                    'total_bytes': stats['total_bytes'],
                    'event': 'failed'
                })
            else:
                self.logger.info(f"Transfer completed: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'total_bytes': stats['total_bytes'],
                    'progress_events': stats['progress_events'],
                    'event': 'completed'
                })
            # Clean up stats to prevent memory leaks
            del self.transfer_stats[transfer_id]
# Configure logging
import os
from contextlib import ExitStack

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Use the logger
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    event_logger = TransferEventLogger()
    # Multiple transfers with comprehensive logging
    files_to_upload = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']
    futures = []
    # ExitStack keeps every file open until all transfers have finished;
    # closing a file before its future resolves could break the upload.
    with ExitStack() as stack:
        for filename in files_to_upload:
            f = stack.enter_context(open(filename, 'rb'))
            future = transfer_manager.upload(
                f, 'my-bucket', os.path.basename(filename),
                subscribers=[event_logger]
            )
            futures.append(future)
        # Wait for all transfers
        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"Transfer failed: {e}")
finally:
    transfer_manager.shutdown()

import threading
from collections import defaultdict, deque
from datetime import datetime, timedelta
class MetricsCollector(BaseSubscriber):
    """Collects detailed metrics about transfer performance.

    Maintains a sliding time window (default 5 minutes) of throughput
    samples and completion times, plus running totals. Shared state is
    guarded by a lock so the collector can be attached to concurrent
    transfers.
    """

    def __init__(self, window_seconds=300):  # 5-minute window
        self.window_seconds = window_seconds
        self.lock = threading.Lock()
        # Metrics storage
        self.transfer_metrics = defaultdict(dict)  # per-in-flight-transfer state
        self.throughput_samples = deque()          # (timestamp, bytes) pairs
        self.error_counts = defaultdict(int)       # exception type name -> count
        self.completion_times = deque()            # (timestamp, duration seconds)
        # Running statistics
        self.total_transfers = 0
        self.successful_transfers = 0
        self.total_bytes_transferred = 0

    def on_queued(self, **kwargs):
        """Initialize per-transfer metrics and bump the transfer count."""
        transfer_id = kwargs.get('transfer_id', 'unknown')
        with self.lock:
            self.transfer_metrics[transfer_id] = {
                'queued_time': datetime.now(),
                'bytes_transferred': 0,
                'progress_count': 0
            }
            self.total_transfers += 1

    def on_progress(self, bytes_transferred, **kwargs):
        """Record a progress sample and expire samples outside the window."""
        transfer_id = kwargs.get('transfer_id', 'unknown')
        current_time = datetime.now()
        with self.lock:
            if transfer_id in self.transfer_metrics:
                metrics = self.transfer_metrics[transfer_id]
                metrics['bytes_transferred'] += bytes_transferred
                metrics['progress_count'] += 1
                metrics['last_progress'] = current_time
                # Add throughput sample
                self.throughput_samples.append((current_time, bytes_transferred))
                self._cleanup_old_samples(current_time)

    def on_done(self, **kwargs):
        """Finalize a transfer's metrics and fold them into running totals."""
        transfer_id = kwargs.get('transfer_id', 'unknown')
        exception = kwargs.get('exception')
        current_time = datetime.now()
        with self.lock:
            if transfer_id in self.transfer_metrics:
                metrics = self.transfer_metrics[transfer_id]
                metrics['completed_time'] = current_time
                if exception:
                    error_type = type(exception).__name__
                    self.error_counts[error_type] += 1
                    metrics['error'] = str(exception)
                else:
                    self.successful_transfers += 1
                    self.total_bytes_transferred += metrics['bytes_transferred']
                # Calculate transfer duration
                duration = current_time - metrics['queued_time']
                self.completion_times.append((current_time, duration.total_seconds()))
                # Clean up old completion times
                self._cleanup_old_completions(current_time)
                # Clean up transfer metrics
                del self.transfer_metrics[transfer_id]

    def _cleanup_old_samples(self, current_time):
        """Remove throughput samples older than window."""
        cutoff = current_time - timedelta(seconds=self.window_seconds)
        while self.throughput_samples and self.throughput_samples[0][0] < cutoff:
            self.throughput_samples.popleft()

    def _cleanup_old_completions(self, current_time):
        """Remove completion times older than window."""
        cutoff = current_time - timedelta(seconds=self.window_seconds)
        while self.completion_times and self.completion_times[0][0] < cutoff:
            self.completion_times.popleft()

    def get_current_throughput(self):
        """Get current throughput in bytes per second.

        Returns 0.0 when fewer than two samples exist (zero time span).
        """
        with self.lock:
            if not self.throughput_samples:
                return 0.0
            total_bytes = sum(sample[1] for sample in self.throughput_samples)
            time_span = (self.throughput_samples[-1][0] - self.throughput_samples[0][0]).total_seconds()
            return total_bytes / time_span if time_span > 0 else 0.0

    def get_average_completion_time(self):
        """Get average completion time in seconds."""
        with self.lock:
            if not self.completion_times:
                return 0.0
            return sum(ct[1] for ct in self.completion_times) / len(self.completion_times)

    def get_success_rate(self):
        """Get success rate as percentage."""
        with self.lock:
            if self.total_transfers == 0:
                return 0.0
            return (self.successful_transfers / self.total_transfers) * 100

    def get_error_summary(self):
        """Get error counts by type."""
        with self.lock:
            return dict(self.error_counts)

    def print_metrics(self):
        """Print current metrics summary."""
        throughput = self.get_current_throughput()
        avg_completion = self.get_average_completion_time()
        success_rate = self.get_success_rate()
        errors = self.get_error_summary()
        print("\n=== Transfer Metrics ===")
        print(f"Total transfers: {self.total_transfers}")
        print(f"Successful transfers: {self.successful_transfers}")
        print(f"Success rate: {success_rate:.1f}%")
        print(f"Current throughput: {throughput / 1024:.1f} KB/s")
        print(f"Average completion time: {avg_completion:.2f} seconds")
        print(f"Total bytes transferred: {self.total_bytes_transferred / (1024*1024):.1f} MB")
        if errors:
            print("Error summary:")
            for error_type, count in errors.items():
                print(f" {error_type}: {count}")
        print("========================\n")
# Use the metrics collector
import time

client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    metrics = MetricsCollector()
    # Start multiple transfers with metrics collection
    futures = []
    for i in range(10):
        filename = f'/tmp/test_file_{i}.txt'
        # Create test file
        with open(filename, 'w') as f:
            f.write('x' * (1024 * (i + 1)))  # Files of increasing size
        with open(filename, 'rb') as f:
            future = transfer_manager.upload(
                f, 'my-bucket', f'test_file_{i}.txt',
                subscribers=[metrics]
            )
            futures.append(future)
    # Monitor progress and print metrics periodically
    completed = 0
    while completed < len(futures):
        time.sleep(2)
        # Check for completed transfers; mark each future so it is only
        # counted once across polling iterations.
        for future in futures:
            if future.done() and not hasattr(future, '_processed'):
                completed += 1
                future._processed = True
        # Print current metrics
        metrics.print_metrics()
    # Final metrics
    print("Final metrics:")
    metrics.print_metrics()
finally:
    transfer_manager.shutdown()
"""Subscriber that handles events based on specific conditions."""
def __init__(self, conditions=None):
self.conditions = conditions or {}
self.matched_transfers = set()
def _matches_conditions(self, **kwargs):
"""Check if event matches specified conditions."""
if not self.conditions:
return True
call_args = kwargs.get('call_args')
if not call_args:
return False
# Check bucket condition
if 'bucket' in self.conditions:
if getattr(call_args, 'bucket', None) != self.conditions['bucket']:
return False
# Check key pattern condition
if 'key_pattern' in self.conditions:
key = getattr(call_args, 'key', '')
if self.conditions['key_pattern'] not in key:
return False
# Check minimum size condition
if 'min_size' in self.conditions:
transfer_size = kwargs.get('transfer_size', 0)
if transfer_size < self.conditions['min_size']:
return False
return True
def on_queued(self, **kwargs):
if self._matches_conditions(**kwargs):
transfer_id = kwargs.get('transfer_id')
self.matched_transfers.add(transfer_id)
print(f"Monitoring transfer: {transfer_id}")
def on_progress(self, bytes_transferred, **kwargs):
transfer_id = kwargs.get('transfer_id')
if transfer_id in self.matched_transfers:
total_transferred = kwargs.get('total_bytes_transferred', bytes_transferred)
print(f"Progress for {transfer_id}: {total_transferred} bytes")
def on_done(self, **kwargs):
transfer_id = kwargs.get('transfer_id')
if transfer_id in self.matched_transfers:
exception = kwargs.get('exception')
if exception:
print(f"Monitored transfer failed: {transfer_id} - {exception}")
else:
print(f"Monitored transfer completed: {transfer_id}")
self.matched_transfers.discard(transfer_id)
# Use conditional subscriber
conditions = {
    'bucket': 'important-bucket',
    'key_pattern': 'critical/',
    'min_size': 10 * 1024 * 1024  # Only files > 10MB
}
conditional_sub = ConditionalSubscriber(conditions)

# This transfer will be monitored (if it meets conditions)
with open('/tmp/critical_large_file.dat', 'rb') as f:
    future = transfer_manager.upload(
        f, 'important-bucket', 'critical/large_file.dat',
        subscribers=[conditional_sub]
    )
    # Wait inside the `with` so the file stays open while uploading.
    future.result()
"""Coordinates multiple subscribers for complex event handling."""
def __init__(self, subscribers=None):
self.subscribers = subscribers or []
self.global_stats = {
'total_queued': 0,
'total_completed': 0,
'total_failed': 0,
'total_bytes': 0
}
def add_subscriber(self, subscriber):
"""Add a subscriber to the coordination."""
self.subscribers.append(subscriber)
def create_coordinated_subscriber(self):
"""Create a subscriber that coordinates with all registered subscribers."""
class CoordinatedSubscriber(BaseSubscriber):
def __init__(self, coordinator):
self.coordinator = coordinator
def on_queued(self, **kwargs):
self.coordinator.global_stats['total_queued'] += 1
for sub in self.coordinator.subscribers:
try:
sub.on_queued(**kwargs)
except Exception as e:
print(f"Subscriber error in on_queued: {e}")
def on_progress(self, bytes_transferred, **kwargs):
self.coordinator.global_stats['total_bytes'] += bytes_transferred
for sub in self.coordinator.subscribers:
try:
sub.on_progress(bytes_transferred, **kwargs)
except Exception as e:
print(f"Subscriber error in on_progress: {e}")
def on_done(self, **kwargs):
exception = kwargs.get('exception')
if exception:
self.coordinator.global_stats['total_failed'] += 1
else:
self.coordinator.global_stats['total_completed'] += 1
for sub in self.coordinator.subscribers:
try:
sub.on_done(**kwargs)
except Exception as e:
print(f"Subscriber error in on_done: {e}")
return CoordinatedSubscriber(self)
def print_global_stats(self):
"""Print global statistics across all subscribers."""
stats = self.global_stats
print(f"\nGlobal Stats:")
print(f" Queued: {stats['total_queued']}")
print(f" Completed: {stats['total_completed']}")
print(f" Failed: {stats['total_failed']}")
print(f" Total bytes: {stats['total_bytes'] / (1024*1024):.1f} MB")
# Use subscriber coordination
coordinator = SubscriberCoordinator()

# Add multiple subscribers
coordinator.add_subscriber(ProgressSubscriber("Global Progress"))
coordinator.add_subscriber(TransferEventLogger())
coordinator.add_subscriber(MetricsCollector())

# Create coordinated subscriber
coordinated_sub = coordinator.create_coordinated_subscriber()

# Use with transfers
with open('/tmp/test_file.dat', 'rb') as f:
    future = transfer_manager.upload(
        f, 'my-bucket', 'test_file.dat',
        subscribers=[coordinated_sub]
    )
    # Wait inside the `with` so the file stays open while uploading.
    future.result()
coordinator.print_global_stats()

Event keyword arguments passed to subscriber callbacks:

- transfer_id: Unique identifier for the transfer
- call_args: Original method call arguments (bucket, key, etc.)
- user_context: User-defined context dictionary
- bytes_transferred: Bytes transferred in this progress event
- total_bytes_transferred: Total bytes transferred so far (optional)
- transfer_size: Total transfer size if known (optional)
- exception: Exception object if transfer failed (None if successful)
- result: Transfer result object

Install with Tessl CLI
npx tessl i tessl/pypi-s3transfer