An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.
—
Comprehensive bandwidth limiting system using leaky bucket algorithms, consumption scheduling, and rate tracking for controlling S3 transfer rates and managing network resource utilization.
Main bandwidth limiting coordinator that creates bandwidth-limited streams using leaky bucket algorithms for rate control.
class BandwidthLimiter:
    """
    Limits bandwidth for S3 transfers using leaky bucket algorithm.

    Args:
        leaky_bucket: LeakyBucket instance for rate limiting
        time_utils: Time utility instance (optional)
    """

    def __init__(self, leaky_bucket, time_utils=None): ...

    # NOTE(review): "bandwith" (sic) is the method's actual published name in
    # s3transfer; do not "fix" the spelling — existing callers depend on it.
    def get_bandwith_limited_stream(self, stream, transfer_coordinator):
        """
        Create a bandwidth-limited wrapper around a stream.

        Args:
            stream: Stream to wrap with bandwidth limiting
            transfer_coordinator: TransferCoordinator for the transfer

        Returns:
            BandwidthLimitedStream: Bandwidth-limited stream wrapper
        """

Stream wrapper that applies bandwidth limiting to read operations using token bucket consumption.
class BandwidthLimitedStream:
    """
    Stream wrapper with bandwidth limiting capabilities.

    Args:
        stream: Underlying stream to wrap
        leaky_bucket: LeakyBucket for rate limiting
        transfer_coordinator: TransferCoordinator for the transfer
        time_utils: Time utility instance
    """

    def __init__(self, stream, leaky_bucket, transfer_coordinator, time_utils=None): ...

    def read(self, amount=None) -> bytes:
        """
        Read from stream with bandwidth limiting.

        Args:
            amount (int, optional): Number of bytes to read

        Returns:
            bytes: Data read from stream (may be less than requested due to rate limiting)
        """

    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting for this stream."""

    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting for this stream."""

    def signal_transferring(self):
        """Signal that transfer is currently active."""

    def signal_not_transferring(self):
        """Signal that transfer is not currently active."""

Leaky bucket algorithm implementation for bandwidth control with token-based consumption and rate limiting.
class LeakyBucket:
    """
    Leaky bucket algorithm implementation for bandwidth control.

    Args:
        max_rate (int): Maximum rate in bytes per second
        time_utils: Time utility instance (optional)
    """

    def __init__(self, max_rate: int, time_utils=None): ...

    def consume(self, amount: int, request_token):
        """
        Consume bandwidth tokens from the bucket.

        Args:
            amount (int): Number of bytes to consume
            request_token: RequestToken for this consumption request

        Raises:
            RequestExceededException: If request exceeds available tokens
        """

Token representing a bandwidth consumption request with timing and amount information.
class RequestToken:
    """
    Token for bandwidth consumption requests.

    Args:
        amount (int): Number of bytes requested
        time_requested (float): Time when request was made
    """

    def __init__(self, amount: int, time_requested: float): ...

    @property
    def amount(self) -> int:
        """Number of bytes requested."""

    @property
    def time_requested(self) -> float:
        """Time when request was made."""

Time utility class providing consistent time operations for bandwidth calculations.
class TimeUtils:
    """
    Time utilities for bandwidth management.

    Thin seam over the clock so tests and subclasses can substitute
    their own timing behavior (see CustomTimeUtils example below).
    """

    def time(self) -> float:
        """
        Get current time.

        Returns:
            float: Current time in seconds since epoch
        """

    def sleep(self, seconds: float):
        """
        Sleep for specified duration.

        Args:
            seconds (float): Duration to sleep in seconds
        """

Scheduler for managing bandwidth consumption requests with timing and queuing support.
class ConsumptionScheduler:
    """
    Schedules bandwidth consumption requests.

    Args:
        leaky_bucket: LeakyBucket for rate limiting
        time_utils: Time utility instance
    """

    def __init__(self, leaky_bucket, time_utils=None): ...

    def is_scheduled(self, request_token) -> bool:
        """
        Check if a request is scheduled for future consumption.

        Args:
            request_token: RequestToken to check

        Returns:
            bool: True if scheduled, False otherwise
        """

    def schedule_consumption(self, request_token, retry_time: float):
        """
        Schedule a request for future consumption.

        Args:
            request_token: RequestToken to schedule
            retry_time (float): Time when to retry consumption
        """

    def process_scheduled_consumption(self):
        """Process all scheduled consumption requests that are ready."""

Tracks bandwidth consumption rates and provides projections for rate limiting decisions.
class BandwidthRateTracker:
    """
    Tracks bandwidth consumption rate over time.

    Args:
        time_utils: Time utility instance
    """

    def __init__(self, time_utils=None): ...

    def get_projected_rate(self) -> float:
        """
        Get projected bandwidth consumption rate.

        Returns:
            float: Projected rate in bytes per second
        """

    def record_consumption_rate(self, amount: int, time_to_consume: float):
        """
        Record a consumption event for rate tracking.

        Args:
            amount (int): Number of bytes consumed
            time_to_consume (float): Time taken for consumption
        """

    @property
    def current_rate(self) -> float:
        """
        Current consumption rate.

        Returns:
            float: Current rate in bytes per second
        """

Exception raised when bandwidth requests exceed available capacity.
class RequestExceededException(Exception):
    """
    Raised when bandwidth request exceeds available capacity.

    Args:
        requested_amt (int): Number of bytes requested
        retry_time (float): Time when request can be retried
    """

    def __init__(self, requested_amt: int, retry_time: float): ...

    @property
    def requested_amt(self) -> int:
        """Number of bytes that were requested."""

    @property
    def retry_time(self) -> float:
        """Time when request can be retried."""

from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.manager import TransferManager, TransferConfig
import boto3
# Create bandwidth limiter with 1MB/s limit
max_rate = 1 * 1024 * 1024  # 1MB/s
leaky_bucket = LeakyBucket(max_rate)
bandwidth_limiter = BandwidthLimiter(leaky_bucket)

# Configure transfer manager with bandwidth limiting
config = TransferConfig(max_bandwidth=max_rate)
client = boto3.client('s3')
transfer_manager = TransferManager(client, config)

try:
    # Transfers will be automatically bandwidth limited
    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'large_file.dat')
        # Block until the upload finishes while the file is still open.
        future.result()
    print("Upload completed with bandwidth limiting")
finally:
    # Always release the manager's worker threads.
    transfer_manager.shutdown()

from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.futures import TransferCoordinator
import boto3
# Create bandwidth limiting components
max_rate = 512 * 1024  # 512KB/s
leaky_bucket = LeakyBucket(max_rate)
bandwidth_limiter = BandwidthLimiter(leaky_bucket)

# Create transfer coordinator
coordinator = TransferCoordinator()

# Download with manual bandwidth limiting
client = boto3.client('s3')
response = client.get_object(Bucket='my-bucket', Key='large-file.dat')

# Wrap response body with bandwidth limiting
limited_stream = bandwidth_limiter.get_bandwith_limited_stream(
    response['Body'], coordinator
)

# Read with automatic rate limiting
with open('/tmp/downloaded.dat', 'wb') as f:
    while True:
        # Read operations are automatically rate limited
        chunk = limited_stream.read(8192)
        if not chunk:
            break
        f.write(chunk)
        print(f"Downloaded chunk of {len(chunk)} bytes")
print("Download completed with rate limiting")

from s3transfer.bandwidth import BandwidthLimitedStream, LeakyBucket
import time

# Create rate-limited stream
max_rate = 2 * 1024 * 1024  # 2MB/s
# Single shared bucket: every stream built below draws from this allowance.
leaky_bucket = LeakyBucket(max_rate)
# Simulate a download stream
class MockStream:
    """In-memory stand-in for a download stream serving a fixed byte buffer."""

    def __init__(self, data_size):
        # Pre-fill the buffer with `data_size` placeholder bytes.
        self.data = b'x' * data_size
        self.position = 0

    def read(self, amount=None):
        """Return up to `amount` bytes; with amount=None, everything left."""
        remaining = len(self.data) - self.position
        if amount is None:
            amount = remaining
        stop = min(self.position + amount, len(self.data))
        chunk = self.data[self.position:stop]
        self.position = stop
        return chunk
# Create bandwidth-limited stream
mock_stream = MockStream(10 * 1024 * 1024)  # 10MB of data
# Fix: TransferCoordinator was used without being imported in this example.
from s3transfer.futures import TransferCoordinator
coordinator = TransferCoordinator()
limited_stream = BandwidthLimitedStream(mock_stream, leaky_bucket, coordinator)
start_time = time.time()

# Download with dynamic bandwidth control
total_bytes = 0
while True:
    # Dynamically enable/disable bandwidth limiting
    if total_bytes < 5 * 1024 * 1024:  # First 5MB unlimited
        limited_stream.disable_bandwidth_limiting()
    else:  # Remaining data rate limited
        limited_stream.enable_bandwidth_limiting()
    chunk = limited_stream.read(64 * 1024)  # 64KB chunks
    if not chunk:
        break
    total_bytes += len(chunk)
    elapsed = time.time() - start_time
    current_rate = total_bytes / elapsed if elapsed > 0 else 0
    print(f"Downloaded: {total_bytes} bytes, Rate: {current_rate / 1024:.1f} KB/s")

end_time = time.time()
print(f"Total time: {end_time - start_time:.2f} seconds")
print(f"Average rate: {total_bytes / (end_time - start_time) / 1024:.1f} KB/s")

from s3transfer.bandwidth import BandwidthRateTracker
import time
import random
# Create rate tracker
rate_tracker = BandwidthRateTracker()

# Simulate transfer operations with varying rates
print("Simulating bandwidth consumption...")
for i in range(20):
    # Simulate varying chunk sizes and transfer times
    chunk_size = random.randint(1024, 64 * 1024)  # 1KB to 64KB
    transfer_time = random.uniform(0.1, 2.0)  # 0.1 to 2.0 seconds

    # Record the consumption
    rate_tracker.record_consumption_rate(chunk_size, transfer_time)

    # Get current and projected rates
    current_rate = rate_tracker.current_rate
    projected_rate = rate_tracker.get_projected_rate()

    print(f"Chunk {i+1}: {chunk_size} bytes in {transfer_time:.2f}s")
    print(f" Current rate: {current_rate / 1024:.1f} KB/s")
    print(f" Projected rate: {projected_rate / 1024:.1f} KB/s")
    time.sleep(0.1)  # Brief pause between operations

from s3transfer.bandwidth import TimeUtils, LeakyBucket
import time
class CustomTimeUtils(TimeUtils):
    """Custom time utilities with logging."""

    def time(self):
        # Read the real clock, then announce the reading.
        current_time = time.time()
        print(f"Time check: {current_time}")
        return current_time

    def sleep(self, seconds):
        # Log before and after the actual blocking sleep.
        print(f"Sleeping for {seconds} seconds...")
        time.sleep(seconds)
        print("Sleep completed")
# Use custom time utils with leaky bucket
custom_time = CustomTimeUtils()
leaky_bucket = LeakyBucket(1024 * 1024, time_utils=custom_time)  # 1MB/s

# Time operations will now be logged
try:
    # Fix: RequestToken is a module-level class in s3transfer.bandwidth,
    # not an attribute of a LeakyBucket instance, and it was never
    # imported in this example.
    from s3transfer.bandwidth import RequestToken
    request_token = RequestToken(1024, custom_time.time())
    leaky_bucket.consume(1024, request_token)
    print("Consumption successful")
except Exception as e:
    print(f"Consumption failed: {e}")

from s3transfer.bandwidth import ConsumptionScheduler, LeakyBucket, RequestToken
import time
# Create scheduler with leaky bucket
max_rate = 1024 * 1024 # 1MB/s
leaky_bucket = LeakyBucket(max_rate)
scheduler = ConsumptionScheduler(leaky_bucket)
# Create multiple requests
requests = []
current_time = time.time()
for i in range(5):
token = RequestToken(512 * 1024, current_time + i * 0.1) # 512KB requests
requests.append(token)
print("Scheduling bandwidth requests...")
# Try to consume immediately, schedule if not possible
for i, token in enumerate(requests):
try:
leaky_bucket.consume(token.amount, token)
print(f"Request {i+1}: Immediate consumption successful")
except Exception as e:
# Schedule for later consumption
retry_time = current_time + 1.0 # Retry in 1 second
scheduler.schedule_consumption(token, retry_time)
print(f"Request {i+1}: Scheduled for later ({e})")
# Process scheduled requests
print("\nProcessing scheduled requests...")
time.sleep(1.5) # Wait for scheduled time
scheduler.process_scheduled_consumption()
print("Scheduled consumption processing completed")from s3transfer.bandwidth import (
BandwidthLimiter, LeakyBucket, RequestExceededException
)
import time
# Create bandwidth limiter with low rate for demonstration
max_rate = 1024  # Very low 1KB/s for quick demonstration
leaky_bucket = LeakyBucket(max_rate)
# Used below to wrap streams via get_bandwith_limited_stream (sic).
bandwidth_limiter = BandwidthLimiter(leaky_bucket)
class RetryableStream:
    """Simple in-memory stream over a fixed bytes payload."""

    def __init__(self, data):
        self.data = data
        self.position = 0

    def read(self, amount=None):
        """Return up to `amount` bytes; with amount=None, everything left."""
        if amount is None:
            amount = len(self.data) - self.position
        stop = min(self.position + amount, len(self.data))
        chunk = self.data[self.position:stop]
        self.position = stop
        return chunk
# Create test scenario
test_data = b'x' * 10240  # 10KB test data
stream = RetryableStream(test_data)
# Fix: TransferCoordinator was used without being imported in this example.
from s3transfer.futures import TransferCoordinator
coordinator = TransferCoordinator()
limited_stream = bandwidth_limiter.get_bandwith_limited_stream(stream, coordinator)

print("Testing bandwidth limiting with error handling...")
total_read = 0
retries = 0
max_retries = 5

while total_read < len(test_data) and retries < max_retries:
    try:
        # Try to read a large chunk (will likely hit rate limit)
        chunk = limited_stream.read(2048)  # 2KB chunk
        if chunk:
            total_read += len(chunk)
            print(f"Successfully read {len(chunk)} bytes (total: {total_read})")
        else:
            break
    except RequestExceededException as e:
        retries += 1
        print(f"Rate limit exceeded: {e.requested_amt} bytes")
        print(f"Retry after: {e.retry_time}")
        # Wait until retry time
        sleep_time = e.retry_time - time.time()
        if sleep_time > 0:
            print(f"Waiting {sleep_time:.2f} seconds...")
            time.sleep(sleep_time)
    except Exception as e:
        print(f"Unexpected error: {e}")
        break

print(f"Transfer completed: {total_read}/{len(test_data)} bytes")
print(f"Retries: {retries}")

from s3transfer.manager import TransferManager, TransferConfig
from s3transfer.subscribers import BaseSubscriber
import boto3
import time
class BandwidthMonitorSubscriber(BaseSubscriber):
    """Subscriber that monitors bandwidth usage."""

    def __init__(self):
        # Set on the first on_queued callback; None until the transfer starts.
        self.start_time = None
        # Running total of bytes reported via on_progress.
        self.total_bytes = 0

    def on_queued(self, **kwargs):
        self.start_time = time.time()
        print("Transfer queued - monitoring bandwidth...")

    def on_progress(self, bytes_transferred, **kwargs):
        self.total_bytes += bytes_transferred
        if self.start_time:
            elapsed = time.time() - self.start_time
            # Guard against a zero elapsed time right after queuing.
            if elapsed > 0:
                rate = self.total_bytes / elapsed
                print(f"Current rate: {rate / 1024:.1f} KB/s")

    def on_done(self, **kwargs):
        if self.start_time:
            elapsed = time.time() - self.start_time
            avg_rate = self.total_bytes / elapsed if elapsed > 0 else 0
            print(f"Transfer completed - Average rate: {avg_rate / 1024:.1f} KB/s")
# Configure transfer manager with bandwidth limiting
config = TransferConfig(
    max_bandwidth=2 * 1024 * 1024,  # 2MB/s limit
    multipart_threshold=5 * 1024 * 1024,  # 5MB threshold
    multipart_chunksize=1 * 1024 * 1024  # 1MB chunks
)
client = boto3.client('s3')
transfer_manager = TransferManager(client, config)

try:
    # Create bandwidth monitoring subscriber
    bandwidth_monitor = BandwidthMonitorSubscriber()

    # Upload with bandwidth monitoring
    with open('/tmp/test_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'test_file.dat',
            subscribers=[bandwidth_monitor]
        )
        # Monitor progress while the file is still open
        while not future.done():
            time.sleep(1)
        # result() re-raises any transfer error; the returned value is unused.
        future.result()
    print("Upload completed successfully!")
finally:
    transfer_manager.shutdown()

# Conservative rates for shared networks
conservative_rate = 1 * 1024 * 1024 # 1MB/s
# Moderate rates for dedicated connections
moderate_rate = 10 * 1024 * 1024 # 10MB/s
# High rates for high-bandwidth connections
high_rate = 100 * 1024 * 1024 # 100MB/s
# Adaptive rate based on connection testing
def test_connection_speed():
# Implementation would test actual throughput
return 50 * 1024 * 1024 # 50MB/s example
adaptive_rate = test_connection_speed()from s3transfer.bandwidth import LeakyBucket
# Burst-tolerant configuration
burst_bucket = LeakyBucket(
max_rate=5 * 1024 * 1024, # 5MB/s sustained
# Additional parameters for burst handling would go here
)
# Strict rate limiting
strict_bucket = LeakyBucket(
max_rate=1 * 1024 * 1024, # 1MB/s strict limit
)RequestExceededException appropriatelyInstall with Tessl CLI
npx tessl i tessl/pypi-s3transfer