tessl/pypi-s3transfer

An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.

Bandwidth Management

Comprehensive bandwidth limiting system using leaky bucket algorithms, consumption scheduling, and rate tracking for controlling S3 transfer rates and managing network resource utilization.

Capabilities

BandwidthLimiter

Main bandwidth limiting coordinator that creates bandwidth-limited streams using leaky bucket algorithms for rate control.

class BandwidthLimiter:
    """
    Limits bandwidth for S3 transfers using leaky bucket algorithm.
    
    Args:
        leaky_bucket: LeakyBucket instance for rate limiting
        time_utils: Time utility instance (optional)
    """
    def __init__(self, leaky_bucket, time_utils=None): ...
    
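    def get_bandwith_limited_stream(self, fileobj, transfer_coordinator, enabled=True):
        """
        Create a bandwidth-limited wrapper around a stream.
        
        Note: the method name is spelled "bandwith" in s3transfer itself.
        
        Args:
            fileobj: File-like object to wrap with bandwidth limiting
            transfer_coordinator: TransferCoordinator for the transfer
            enabled (bool): Whether limiting starts enabled on the returned stream
        
        Returns:
            BandwidthLimitedStream: Bandwidth-limited stream wrapper
        """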

BandwidthLimitedStream

Stream wrapper that applies bandwidth limiting to read operations by consuming from a shared leaky bucket.

class BandwidthLimitedStream:
    """
    Stream wrapper with bandwidth limiting capabilities.
    
    Args:
        fileobj: Underlying file-like object to wrap
        leaky_bucket: LeakyBucket for rate limiting
        transfer_coordinator: TransferCoordinator for the transfer
        time_utils: Time utility instance (optional)
        bytes_threshold (int): Bytes to accumulate across reads before
            consuming from the leaky bucket (default 256 KB)
    """
    def __init__(self, fileobj, leaky_bucket, transfer_coordinator,
                 time_utils=None, bytes_threshold=256 * 1024): ...
    
    def read(self, amount: int) -> bytes:
        """
        Read from stream with bandwidth limiting.
        
        When limiting is enabled, the call blocks (sleeps) until the leaky
        bucket allows the read; it does not shorten the read.
        
        Args:
            amount (int): Number of bytes to read
        
        Returns:
            bytes: Data read from the underlying stream
        """
    
    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting for this stream."""
    
    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting for this stream."""
    
    def signal_transferring(self):
        """Signal that transfer is currently active."""
    
    def signal_not_transferring(self):
        """Signal that transfer is not currently active."""
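The wrapper idea can be illustrated without s3transfer at all. The following is a minimal sketch, not the library's implementation: `ThrottledReader`, its `throttle` callback, and the small threshold used in the demo are hypothetical names and values chosen for illustration. It shows one plausible design in which small reads are accumulated and the rate limiter is only consulted once a byte threshold is crossed, which avoids paying the limiter's cost on every tiny read.

```python
import io


class ThrottledReader:
    """Wrap a file-like object and throttle once enough bytes were requested."""

    def __init__(self, fileobj, throttle, bytes_threshold=256 * 1024):
        self._fileobj = fileobj
        self._throttle = throttle              # called with the accumulated byte count
        self._bytes_threshold = bytes_threshold
        self._bytes_seen = 0
        self._enabled = True

    def enable(self):
        self._enabled = True

    def disable(self):
        self._enabled = False

    def read(self, amount):
        if self._enabled:
            self._bytes_seen += amount
            if self._bytes_seen >= self._bytes_threshold:
                # Charge every byte seen since the last charge, then reset.
                self._throttle(self._bytes_seen)
                self._bytes_seen = 0
        return self._fileobj.read(amount)


# Demo: record what would be charged to a rate limiter instead of sleeping.
charges = []
reader = ThrottledReader(io.BytesIO(b"x" * 1000), charges.append, bytes_threshold=300)
chunks = [reader.read(100) for _ in range(6)]
print(charges)
```

In a real limiter the `throttle` callback would block until the requested bytes fit within the configured rate; here it only records the amounts so the batching behavior is visible.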

LeakyBucket

Leaky bucket algorithm implementation for bandwidth control with token-based consumption and rate limiting.

class LeakyBucket:
    """
    Leaky bucket algorithm implementation for bandwidth control.
    
    Args:
        max_rate (int): Maximum rate in bytes per second
        time_utils: Time utility instance (optional)
        rate_tracker: BandwidthRateTracker instance (optional)
        consumption_scheduler: ConsumptionScheduler instance (optional)
    """
    def __init__(self, max_rate: int, time_utils=None, rate_tracker=None,
                 consumption_scheduler=None): ...
    
    def consume(self, amt: int, request_token) -> int:
        """
        Consume the requested amount from the bucket.
        
        Args:
            amt (int): Number of bytes to consume
            request_token: RequestToken identifying the caller
        
        Returns:
            int: The amount consumed
        
        Raises:
            RequestExceededException: If consuming now would exceed max_rate;
                the exception's retry_time gives the seconds to wait
        """
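To make the consume-or-wait contract concrete, here is a small self-contained sketch of a rate limiter in the same spirit. It is a simplified model and not s3transfer's implementation: `SketchBucket`, `RetryLater`, and the "next free time" bookkeeping are assumptions made for illustration, and a fake clock stands in for real time so the example runs instantly.

```python
class RetryLater(Exception):
    """Hypothetical stand-in for RequestExceededException."""

    def __init__(self, requested_amt, retry_time):
        super().__init__(f"retry {requested_amt} bytes in {retry_time:.2f}s")
        self.requested_amt = requested_amt
        self.retry_time = retry_time  # seconds to wait before retrying


class SketchBucket:
    """Allow at most max_rate bytes per second via a 'next free time' marker."""

    def __init__(self, max_rate, clock):
        self.max_rate = max_rate
        self.clock = clock        # callable returning the current time
        self.next_free = 0.0      # earliest time the next request may pass

    def consume(self, amount):
        now = self.clock()
        if now < self.next_free:
            raise RetryLater(amount, self.next_free - now)
        # Releasing `amount` bytes occupies amount / max_rate seconds of budget.
        self.next_free = now + amount / self.max_rate
        return amount


fake_now = [0.0]
bucket = SketchBucket(max_rate=1024, clock=lambda: fake_now[0])

first = bucket.consume(512)       # passes immediately
try:
    bucket.consume(512)           # too soon: the budget is still in use
    wait = 0.0
except RetryLater as exc:
    wait = exc.retry_time

fake_now[0] += wait               # pretend we slept for the suggested time
second = bucket.consume(512)      # now passes
print(first, wait, second)
```

The caller's side of the contract is the part worth noting: on failure it waits for the suggested number of seconds and then retries the same request.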

RequestToken

Opaque token passed as an identifier when consuming from the LeakyBucket. It carries no data of its own.

class RequestToken:
    """
    Token that identifies a caller of LeakyBucket.consume.
    
    Takes no constructor arguments. Reuse the same token when retrying
    a request so the bucket can recognize it as already scheduled.
    """

TimeUtils

Time utility class providing consistent time operations for bandwidth calculations.

class TimeUtils:
    """
    Time utilities for bandwidth management.
    """
    def time(self) -> float:
        """
        Get current time.
        
        Returns:
            float: Current time in seconds since epoch
        """
    
    def sleep(self, seconds: float):
        """
        Sleep for specified duration.
        
        Args:
            seconds (float): Duration to sleep in seconds
        """
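Because time access is funneled through one small object, rate-limiting logic can be tested without real waiting. The sketch below is a hypothetical `FakeTimeUtils` that exposes the same two methods, `time()` and `sleep()`, and advances a virtual clock instead of blocking. It assumes the consuming code only ever calls those two methods; it is duck-typed and deliberately does not import s3transfer.

```python
class FakeTimeUtils:
    """Virtual clock: sleep() advances time instantly instead of blocking."""

    def __init__(self, start=0.0):
        self._now = start
        self.sleeps = []          # every requested sleep, for assertions

    def time(self):
        return self._now

    def sleep(self, seconds):
        self.sleeps.append(seconds)
        self._now += seconds


clock = FakeTimeUtils()
clock.sleep(1.5)
clock.sleep(0.25)
print(clock.time(), clock.sleeps)
```

An object like this could be passed wherever a `time_utils` argument is accepted, which makes throttling behavior deterministic in unit tests.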

ConsumptionScheduler

Scheduler that tracks which requests are waiting for bandwidth and how long each must wait. LeakyBucket creates one by default.

class ConsumptionScheduler:
    """
    Schedules bandwidth consumption requests.
    
    Takes no constructor arguments.
    """
    def __init__(self): ...
    
    def is_scheduled(self, token) -> bool:
        """
        Check if a request is scheduled for future consumption.
        
        Args:
            token: RequestToken to check
        
        Returns:
            bool: True if scheduled, False otherwise
        """
    
    def schedule_consumption(self, amt: int, token, time_to_consume: float) -> float:
        """
        Schedule a request for future consumption.
        
        Args:
            amt (int): Number of bytes requested
            token: RequestToken to schedule
            time_to_consume (float): Seconds needed to consume the amount
        
        Returns:
            float: Total seconds to wait before the request can be consumed
        """
    
    def process_scheduled_consumption(self, token):
        """Mark a scheduled request as processed and remove it from the schedule."""
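One way to picture consumption scheduling is as a running total of wait time: each deferred request queues behind everything already waiting. The sketch below models that idea with a hypothetical `SketchScheduler`; the method names and the bookkeeping are assumptions for illustration, not the library's code.

```python
class SketchScheduler:
    """Track deferred requests and the cumulative wait they imply."""

    def __init__(self):
        self._scheduled = {}      # token -> seconds reserved for that token
        self._total_wait = 0.0

    def is_scheduled(self, token):
        return token in self._scheduled

    def schedule(self, token, time_to_consume):
        # A new request waits for all earlier ones plus its own share.
        self._total_wait += time_to_consume
        self._scheduled[token] = time_to_consume
        return self._total_wait

    def process(self, token):
        # Release the token's reservation once its request has been served.
        reserved = self._scheduled.pop(token)
        self._total_wait = max(self._total_wait - reserved, 0.0)


scheduler = SketchScheduler()
token_a, token_b = object(), object()
wait_a = scheduler.schedule(token_a, 0.5)
wait_b = scheduler.schedule(token_b, 0.25)
scheduler.process(token_a)
print(wait_a, wait_b)
```

Using plain `object()` instances as tokens reflects that a token only needs identity, not content.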

BandwidthRateTracker

Tracks bandwidth consumption rates and provides projections for rate limiting decisions.

class BandwidthRateTracker:
    """
    Tracks bandwidth consumption rate over time.
    
    Args:
        alpha (float): Smoothing constant for the exponential moving
            average of the rate (default 0.8)
    """
    def __init__(self, alpha=0.8): ...
    
    def get_projected_rate(self, amt: int, time_at_consumption: float) -> float:
        """
        Get the projected rate if an amount were consumed at a given time.
        
        Args:
            amt (int): Number of bytes that would be consumed
            time_at_consumption (float): Timestamp of the proposed consumption
        
        Returns:
            float: Projected rate in bytes per second
        """
    
    def record_consumption_rate(self, amt: int, time_at_consumption: float):
        """
        Record a consumption event for rate tracking.
        
        Args:
            amt (int): Number of bytes consumed
            time_at_consumption (float): Timestamp at which they were consumed
        """
    
    @property
    def current_rate(self) -> float:
        """
        Current consumption rate.
        
        Returns:
            float: Current rate in bytes per second
        """
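Rate tracking of this kind is commonly done with an exponential moving average, where each new sample is blended with the previous estimate. The sketch below assumes that approach; `EmaRateTracker` and its method names are hypothetical, and the update rule `alpha * new_rate + (1 - alpha) * old_rate` is the standard formula rather than a claim about the library's internals.

```python
class EmaRateTracker:
    """Exponential moving average of bytes per second, driven by timestamps."""

    def __init__(self, alpha=0.8):
        self.alpha = alpha
        self.rate = 0.0
        self.last_time = None

    def projected(self, amount, now):
        """Rate the tracker would report if `amount` were consumed at `now`."""
        if self.last_time is None:
            return 0.0
        delta = now - self.last_time
        new_rate = float("inf") if delta <= 0 else amount / delta
        return self.alpha * new_rate + (1 - self.alpha) * self.rate

    def record(self, amount, now):
        self.rate = self.projected(amount, now)
        self.last_time = now


tracker = EmaRateTracker(alpha=0.5)
tracker.record(1000, now=0.0)   # first sample only sets the reference time
tracker.record(1000, now=1.0)   # 1000 B/s sample: 0.5 * 1000 + 0.5 * 0
tracker.record(3000, now=2.0)   # 3000 B/s sample: 0.5 * 3000 + 0.5 * 500
print(tracker.rate)
```

A higher `alpha` makes the estimate follow recent samples more closely; a lower one smooths out short bursts.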

RequestExceededException

Exception raised when bandwidth requests exceed available capacity.

class RequestExceededException(Exception):
    """
    Raised when bandwidth request exceeds available capacity.
    
    Args:
        requested_amt (int): Number of bytes requested
        retry_time (float): Seconds to wait before retrying the request
    """
    def __init__(self, requested_amt: int, retry_time: float): ...
    
    @property
    def requested_amt(self) -> int:
        """Number of bytes that were requested."""
    
    @property
    def retry_time(self) -> float:
        """Seconds to wait before the request can be retried."""

Usage Examples

Basic Bandwidth Limiting

from s3transfer.manager import TransferManager, TransferConfig
import boto3

# 1MB/s limit, expressed in bytes per second
max_rate = 1 * 1024 * 1024

# Setting max_bandwidth is enough: TransferManager builds its own
# LeakyBucket and BandwidthLimiter from this value
config = TransferConfig(max_bandwidth=max_rate)
client = boto3.client('s3')
transfer_manager = TransferManager(client, config)

try:
    # Transfers will be automatically bandwidth limited
    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'large_file.dat')
        future.result()
    
    print("Upload completed with bandwidth limiting")

finally:
    transfer_manager.shutdown()

Manual Stream Bandwidth Limiting

from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.futures import TransferCoordinator
import boto3

# Create bandwidth limiting components
max_rate = 512 * 1024  # 512KB/s
leaky_bucket = LeakyBucket(max_rate)
bandwidth_limiter = BandwidthLimiter(leaky_bucket)

# Create transfer coordinator
coordinator = TransferCoordinator()

# Download with manual bandwidth limiting
client = boto3.client('s3')
response = client.get_object(Bucket='my-bucket', Key='large-file.dat')

# Wrap response body with bandwidth limiting
# (the method name really is spelled "bandwith" in s3transfer)
limited_stream = bandwidth_limiter.get_bandwith_limited_stream(
    response['Body'], coordinator
)

# Read with automatic rate limiting
with open('/tmp/downloaded.dat', 'wb') as f:
    while True:
        # Read operations are automatically rate limited
        chunk = limited_stream.read(8192)
        if not chunk:
            break
        f.write(chunk)
        print(f"Downloaded chunk of {len(chunk)} bytes")

print("Download completed with rate limiting")

Dynamic Bandwidth Control

from s3transfer.bandwidth import BandwidthLimitedStream, LeakyBucket
from s3transfer.futures import TransferCoordinator
import time

# Create rate-limited stream
max_rate = 2 * 1024 * 1024  # 2MB/s
leaky_bucket = LeakyBucket(max_rate)

# Simulate a download stream
class MockStream:
    def __init__(self, data_size):
        self.data = b'x' * data_size
        self.position = 0
    
    def read(self, amount=None):
        if amount is None:
            amount = len(self.data) - self.position
        end = min(self.position + amount, len(self.data))
        result = self.data[self.position:end]
        self.position = end
        return result

# Create bandwidth-limited stream
mock_stream = MockStream(10 * 1024 * 1024)  # 10MB of data
coordinator = TransferCoordinator()
limited_stream = BandwidthLimitedStream(mock_stream, leaky_bucket, coordinator)

start_time = time.time()

# Download with dynamic bandwidth control
total_bytes = 0
while True:
    # Dynamically enable/disable bandwidth limiting
    if total_bytes < 5 * 1024 * 1024:  # First 5MB unlimited
        limited_stream.disable_bandwidth_limiting()
    else:  # Remaining data rate limited
        limited_stream.enable_bandwidth_limiting()
    
    chunk = limited_stream.read(64 * 1024)  # 64KB chunks
    if not chunk:
        break
    
    total_bytes += len(chunk)
    elapsed = time.time() - start_time
    current_rate = total_bytes / elapsed if elapsed > 0 else 0
    
    print(f"Downloaded: {total_bytes} bytes, Rate: {current_rate / 1024:.1f} KB/s")

end_time = time.time()
print(f"Total time: {end_time - start_time:.2f} seconds")
print(f"Average rate: {total_bytes / (end_time - start_time) / 1024:.1f} KB/s")

Bandwidth Rate Tracking

from s3transfer.bandwidth import BandwidthRateTracker
import random

# Create rate tracker
rate_tracker = BandwidthRateTracker()

# Simulate transfer operations with varying rates
print("Simulating bandwidth consumption...")

# The tracker works with timestamps, not durations, so keep a simulated clock
clock = 0.0

for i in range(20):
    # Simulate varying chunk sizes and transfer times
    chunk_size = random.randint(1024, 64 * 1024)  # 1KB to 64KB
    transfer_time = random.uniform(0.1, 2.0)      # 0.1 to 2.0 seconds
    clock += transfer_time
    
    # Rate the tracker would report if this chunk were consumed now
    projected_rate = rate_tracker.get_projected_rate(chunk_size, clock)
    
    # Record the consumption, then read back the smoothed rate
    rate_tracker.record_consumption_rate(chunk_size, clock)
    current_rate = rate_tracker.current_rate
    
    print(f"Chunk {i+1}: {chunk_size} bytes in {transfer_time:.2f}s")
    print(f"  Projected rate: {projected_rate / 1024:.1f} KB/s")
    print(f"  Current rate: {current_rate / 1024:.1f} KB/s")

Custom Time Utilities

from s3transfer.bandwidth import (
    LeakyBucket, RequestExceededException, RequestToken, TimeUtils
)
import time

class CustomTimeUtils(TimeUtils):
    """Custom time utilities with logging."""
    
    def time(self):
        current_time = time.time()
        print(f"Time check: {current_time}")
        return current_time
    
    def sleep(self, seconds):
        print(f"Sleeping for {seconds} seconds...")
        time.sleep(seconds)
        print("Sleep completed")

# Use custom time utils with leaky bucket
custom_time = CustomTimeUtils()
leaky_bucket = LeakyBucket(1024 * 1024, time_utils=custom_time)  # 1MB/s

# Time operations will now be logged
try:
    # RequestToken is a plain identifier and takes no arguments
    request_token = RequestToken()
    leaky_bucket.consume(1024, request_token)
    print("Consumption successful")
except RequestExceededException as e:
    print(f"Consumption failed: {e}")

Advanced Scheduling

from s3transfer.bandwidth import (
    ConsumptionScheduler, LeakyBucket, RequestExceededException, RequestToken
)
import time

# LeakyBucket owns the scheduler; pass one in only to inspect or replace it
max_rate = 1024 * 1024  # 1MB/s
scheduler = ConsumptionScheduler()
leaky_bucket = LeakyBucket(max_rate, consumption_scheduler=scheduler)

# Create multiple requests; each token only identifies its request
request_amount = 512 * 1024  # 512KB requests
requests = [RequestToken() for _ in range(5)]

print("Submitting bandwidth requests...")

# Try to consume immediately; the bucket schedules the request if it cannot
for i, token in enumerate(requests):
    try:
        leaky_bucket.consume(request_amount, token)
        print(f"Request {i+1}: Immediate consumption successful")
    except RequestExceededException as e:
        # The bucket has already scheduled this token; wait, then retry
        # with the same token so the scheduled amount is released
        print(f"Request {i+1}: Scheduled, retrying in {e.retry_time:.2f}s")
        print(f"  Scheduled: {scheduler.is_scheduled(token)}")
        time.sleep(e.retry_time)
        leaky_bucket.consume(request_amount, token)
        print(f"Request {i+1}: Scheduled consumption completed")

print("All requests processed")

Error Handling and Recovery

from s3transfer.bandwidth import (
    BandwidthLimiter, LeakyBucket, RequestExceededException
)
from s3transfer.futures import TransferCoordinator
import time

# Create bandwidth limiter with low rate for demonstration
max_rate = 1024  # Very low 1KB/s for quick demonstration
leaky_bucket = LeakyBucket(max_rate)
bandwidth_limiter = BandwidthLimiter(leaky_bucket)

class RetryableStream:
    def __init__(self, data):
        self.data = data
        self.position = 0
    
    def read(self, amount=None):
        if amount is None:
            amount = len(self.data) - self.position
        end = min(self.position + amount, len(self.data))
        result = self.data[self.position:end]
        self.position = end
        return result

# Create test scenario
test_data = b'x' * 10240  # 10KB test data
stream = RetryableStream(test_data)
coordinator = TransferCoordinator()

limited_stream = bandwidth_limiter.get_bandwith_limited_stream(stream, coordinator)

print("Testing bandwidth limiting with error handling...")

total_read = 0
retries = 0
max_retries = 5

while total_read < len(test_data) and retries < max_retries:
    try:
        # Try to read a large chunk (will likely hit rate limit)
        chunk = limited_stream.read(2048)  # 2KB chunk
        
        if chunk:
            total_read += len(chunk)
            print(f"Successfully read {len(chunk)} bytes (total: {total_read})")
        else:
            break
            
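    except RequestExceededException as e:
        # BandwidthLimitedStream normally sleeps and retries internally, so
        # this branch matters mainly when calling LeakyBucket.consume directly
        retries += 1
        print(f"Rate limit exceeded: {e.requested_amt} bytes")
        print(f"Retry after: {e.retry_time:.2f} seconds")
        
        # retry_time is the number of seconds to wait, not a timestamp
        if e.retry_time > 0:
            print(f"Waiting {e.retry_time:.2f} seconds...")
            time.sleep(e.retry_time)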
    
    except Exception as e:
        print(f"Unexpected error: {e}")
        break

print(f"Transfer completed: {total_read}/{len(test_data)} bytes")
print(f"Retries: {retries}")

Integration with TransferManager

from s3transfer.manager import TransferManager, TransferConfig
from s3transfer.subscribers import BaseSubscriber
import boto3
import time

class BandwidthMonitorSubscriber(BaseSubscriber):
    """Subscriber that monitors bandwidth usage."""
    
    def __init__(self):
        self.start_time = None
        self.total_bytes = 0
    
    def on_queued(self, **kwargs):
        self.start_time = time.time()
        print("Transfer queued - monitoring bandwidth...")
    
    def on_progress(self, bytes_transferred, **kwargs):
        self.total_bytes += bytes_transferred
        if self.start_time:
            elapsed = time.time() - self.start_time
            if elapsed > 0:
                rate = self.total_bytes / elapsed
                print(f"Current rate: {rate / 1024:.1f} KB/s")
    
    def on_done(self, **kwargs):
        if self.start_time:
            elapsed = time.time() - self.start_time
            avg_rate = self.total_bytes / elapsed if elapsed > 0 else 0
            print(f"Transfer completed - Average rate: {avg_rate / 1024:.1f} KB/s")

# Configure transfer manager with bandwidth limiting
config = TransferConfig(
    max_bandwidth=2 * 1024 * 1024,  # 2MB/s limit
    multipart_threshold=5 * 1024 * 1024,  # 5MB threshold
    multipart_chunksize=1 * 1024 * 1024   # 1MB chunks
)

client = boto3.client('s3')
transfer_manager = TransferManager(client, config)

try:
    # Create bandwidth monitoring subscriber
    bandwidth_monitor = BandwidthMonitorSubscriber()
    
    # Upload with bandwidth monitoring
    with open('/tmp/test_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'test_file.dat',
            subscribers=[bandwidth_monitor]
        )
        
        # result() blocks until the transfer finishes and re-raises any
        # error; the subscriber reports progress in the meantime
        future.result()
        print("Upload completed successfully!")

finally:
    transfer_manager.shutdown()

Configuration Guidelines

Rate Selection

# Conservative rates for shared networks
conservative_rate = 1 * 1024 * 1024  # 1MB/s

# Moderate rates for dedicated connections
moderate_rate = 10 * 1024 * 1024     # 10MB/s

# High rates for high-bandwidth connections
high_rate = 100 * 1024 * 1024        # 100MB/s

# Adaptive rate based on connection testing
def test_connection_speed():
    # Implementation would test actual throughput
    return 50 * 1024 * 1024  # 50MB/s example

adaptive_rate = test_connection_speed()
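Link speeds are usually quoted in megabits per second, while the rates above are in bytes per second, so a conversion step is easy to get wrong. The helper below is a hypothetical sketch (`max_bandwidth_from_mbps` and its `fraction` parameter are not part of s3transfer) that converts a link speed to a byte-per-second cap and reserves only a fraction of the link for transfers.

```python
def max_bandwidth_from_mbps(link_mbps, fraction=0.5):
    """Convert a link speed in megabits/s to a bytes/s cap.

    `fraction` is the share of the link that transfers may use.
    """
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in (0, 1]")
    bytes_per_second = link_mbps * 1_000_000 / 8   # 8 bits per byte
    return int(bytes_per_second * fraction)


# A quarter of a 100 Mbit/s shared office link
shared_office_cap = max_bandwidth_from_mbps(100, fraction=0.25)
print(shared_office_cap)
```

The resulting integer is in the same unit as the rate constants above and could be used wherever a bytes-per-second limit is expected.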

Bucket Configuration

from s3transfer.bandwidth import BandwidthRateTracker, LeakyBucket

# LeakyBucket has no dedicated burst parameter. A lower smoothing factor
# (alpha) makes the tracked rate react less to a single short burst.
burst_bucket = LeakyBucket(
    max_rate=5 * 1024 * 1024,  # 5MB/s sustained
    rate_tracker=BandwidthRateTracker(alpha=0.5),
)

# Strict rate limiting
strict_bucket = LeakyBucket(
    max_rate=1 * 1024 * 1024,  # 1MB/s strict limit
)

Best Practices

Bandwidth Management

  1. Monitor actual rates: Use rate tracking to verify bandwidth limiting effectiveness
  2. Handle exceptions: When calling LeakyBucket.consume directly, catch RequestExceededException and wait retry_time seconds before retrying
  3. Use appropriate rates: Set realistic limits based on network capacity
  4. Enable selectively: Enable bandwidth limiting only when needed

Performance Optimization

  1. Balance chunk sizes: Larger chunks reduce overhead but may impact responsiveness
  2. Consider latency: Account for network latency in rate calculations
  3. Monitor resource usage: Bandwidth limiting adds CPU overhead
  4. Test configurations: Validate bandwidth settings with real workloads
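The trade-off between chunk size and responsiveness comes down to simple arithmetic: at a fixed limit, a chunk of `chunk_size` bytes takes at least `chunk_size / max_rate` seconds. The helper below is an illustrative sketch (the function name is hypothetical) that estimates chunk count, time per chunk, and the minimum total time, ignoring latency and protocol overhead.

```python
import math


def estimate_transfer(file_size, chunk_size, max_rate):
    """Return (number of chunks, seconds per full chunk, minimum total seconds)."""
    chunks = math.ceil(file_size / chunk_size)
    seconds_per_chunk = chunk_size / max_rate
    total_seconds = file_size / max_rate
    return chunks, seconds_per_chunk, total_seconds


MB = 1024 * 1024
chunks, per_chunk, total = estimate_transfer(100 * MB, 8 * MB, 2 * MB)
print(chunks, per_chunk, total)
```

With these numbers each 8MB chunk occupies the limiter for about four seconds, which is also roughly how long progress updates or cancellation may lag if they only occur between chunks.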

Error Recovery

  1. Implement retry logic: Handle rate limit exceptions with appropriate delays
  2. Use exponential backoff: Increase delays for repeated failures
  3. Set maximum retries: Prevent infinite retry loops
  4. Log bandwidth events: Monitor bandwidth limiting for debugging
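The retry practices above can be combined in a small helper. This is a generic sketch rather than anything provided by s3transfer: `backoff_delays` and `call_with_retries` are hypothetical names, the default delays are arbitrary, and the `sleep` function is injected so the demo runs without real waiting.

```python
def backoff_delays(base=0.5, factor=2.0, max_delay=8.0, max_retries=5):
    """Yield one capped, exponentially growing delay per allowed retry."""
    delay = base
    for _ in range(max_retries):
        yield min(delay, max_delay)
        delay *= factor


def call_with_retries(operation, sleep, **backoff):
    """Run operation(); on failure wait and retry until delays run out."""
    last_error = None
    # The leading 0.0 is the first attempt, which happens without waiting.
    for delay in [0.0, *backoff_delays(**backoff)]:
        if delay:
            sleep(delay)
        try:
            return operation()
        except Exception as exc:  # narrow this to the errors you expect
            last_error = exc
    raise last_error


# Demo: an operation that fails twice, with sleeps recorded instead of taken.
attempts = []


def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("rate limited")
    return "ok"


slept = []
result = call_with_retries(flaky, slept.append)
print(result, slept)
```

In production code `sleep` would be `time.sleep`, and the `except` clause would catch only the specific exceptions that are safe to retry.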

Install with Tessl CLI

npx tessl i tessl/pypi-s3transfer
