# Bandwidth Management

Comprehensive bandwidth-limiting system using leaky bucket algorithms, consumption scheduling, and rate tracking to control S3 transfer rates and manage network resource utilization.

## Capabilities

### BandwidthLimiter

Main bandwidth-limiting coordinator that wraps streams with rate control backed by a leaky bucket. Note that the method name `get_bandwith_limited_stream` is spelled without the second "d", matching the library.

```python { .api }
class BandwidthLimiter:
    """
    Limits bandwidth for S3 transfers using a leaky bucket algorithm.

    Args:
        leaky_bucket: LeakyBucket instance for rate limiting
        time_utils: Time utility instance (optional)
    """
    def __init__(self, leaky_bucket, time_utils=None): ...

    def get_bandwith_limited_stream(self, stream, transfer_coordinator):
        """
        Create a bandwidth-limited wrapper around a stream.

        Args:
            stream: Stream to wrap with bandwidth limiting
            transfer_coordinator: TransferCoordinator for the transfer

        Returns:
            BandwidthLimitedStream: Bandwidth-limited stream wrapper
        """
```
### BandwidthLimitedStream

Stream wrapper that applies bandwidth limiting to read operations by consuming tokens from the leaky bucket.

```python { .api }
class BandwidthLimitedStream:
    """
    Stream wrapper with bandwidth limiting capabilities.

    Args:
        stream: Underlying stream to wrap
        leaky_bucket: LeakyBucket for rate limiting
        transfer_coordinator: TransferCoordinator for the transfer
        time_utils: Time utility instance
    """
    def __init__(self, stream, leaky_bucket, transfer_coordinator, time_utils=None): ...

    def read(self, amount=None) -> bytes:
        """
        Read from the stream with bandwidth limiting.

        Args:
            amount (int, optional): Number of bytes to read

        Returns:
            bytes: Data read from the stream (may be less than requested due to rate limiting)
        """

    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting for this stream."""

    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting for this stream."""

    def signal_transferring(self):
        """Signal that the transfer is currently active."""

    def signal_not_transferring(self):
        """Signal that the transfer is not currently active."""
```
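The signaling hooks exist so that a caller can tell the stream when the transfer is paused (for example, while waiting between requests), so that limiting only applies while data is actually flowing. A minimal sketch of the call pattern, using a hypothetical `PausableStream` stand-in so the snippet runs without s3transfer:

```python
class PausableStream:
    """Stand-in mimicking BandwidthLimitedStream's signaling surface."""
    def __init__(self, data):
        self.data = data
        self.pos = 0
        self.transferring = True  # limiting applies only while transferring

    def signal_transferring(self):
        self.transferring = True

    def signal_not_transferring(self):
        self.transferring = False

    def read(self, amount):
        # A real BandwidthLimitedStream would consume bucket tokens here
        # only when self.transferring is True.
        chunk = self.data[self.pos:self.pos + amount]
        self.pos += len(chunk)
        return chunk

stream = PausableStream(b"x" * 1024)
stream.signal_not_transferring()   # e.g. waiting on a retry slot
stream.signal_transferring()       # transfer active again; limiting applies
print(len(stream.read(256)))       # 256
```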
### LeakyBucket

Leaky bucket algorithm implementation for bandwidth control with token-based consumption and rate limiting.

```python { .api }
class LeakyBucket:
    """
    Leaky bucket algorithm implementation for bandwidth control.

    Args:
        max_rate (int): Maximum rate in bytes per second
        time_utils: Time utility instance (optional)
    """
    def __init__(self, max_rate: int, time_utils=None): ...

    def consume(self, amount: int, request_token):
        """
        Consume bandwidth tokens from the bucket.

        Args:
            amount (int): Number of bytes to consume
            request_token: RequestToken for this consumption request

        Raises:
            RequestExceededException: If the request exceeds available tokens
        """
```
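As a reference for how the algorithm behaves, here is a simplified, self-contained leaky-bucket sketch (not the library's implementation): consumption is allowed while the cumulative rate stays at or below `max_rate`; otherwise the caller is expected to wait and retry, which the library models by raising `RequestExceededException` with a `retry_time`.

```python
import time

class SimpleLeakyBucket:
    """Toy leaky bucket: allows max_rate bytes/s, measured from start."""
    def __init__(self, max_rate, clock=time.monotonic):
        self.max_rate = max_rate
        self.clock = clock
        self.start = clock()
        self.consumed = 0

    def consume(self, amount):
        # Allowed if total consumption stays within max_rate * elapsed time.
        elapsed = max(self.clock() - self.start, 1e-9)
        if (self.consumed + amount) / elapsed > self.max_rate:
            # Caller should wait and retry (the library raises
            # RequestExceededException with a retry_time instead).
            return False
        self.consumed += amount
        return True

# Deterministic demonstration with a fake clock
now = [0.0]
bucket = SimpleLeakyBucket(max_rate=1000, clock=lambda: now[0])
now[0] = 1.0
print(bucket.consume(500))   # True: 500 B in 1 s is under 1000 B/s
print(bucket.consume(600))   # False: 1100 B in 1 s would exceed the rate
```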
### RequestToken

Token representing a bandwidth consumption request, carrying timing and amount information.

```python { .api }
class RequestToken:
    """
    Token for bandwidth consumption requests.

    Args:
        amount (int): Number of bytes requested
        time_requested (float): Time when the request was made
    """
    def __init__(self, amount: int, time_requested: float): ...

    @property
    def amount(self) -> int:
        """Number of bytes requested."""

    @property
    def time_requested(self) -> float:
        """Time when the request was made."""
```
### TimeUtils

Time utility class providing consistent time operations for bandwidth calculations.

```python { .api }
class TimeUtils:
    """
    Time utilities for bandwidth management.
    """
    def time(self) -> float:
        """
        Get the current time.

        Returns:
            float: Current time in seconds since the epoch
        """

    def sleep(self, seconds: float):
        """
        Sleep for the specified duration.

        Args:
            seconds (float): Duration to sleep in seconds
        """
```
### ConsumptionScheduler

Scheduler for managing bandwidth consumption requests with timing and queuing support.

```python { .api }
class ConsumptionScheduler:
    """
    Schedules bandwidth consumption requests.

    Args:
        leaky_bucket: LeakyBucket for rate limiting
        time_utils: Time utility instance
    """
    def __init__(self, leaky_bucket, time_utils=None): ...

    def is_scheduled(self, request_token) -> bool:
        """
        Check whether a request is scheduled for future consumption.

        Args:
            request_token: RequestToken to check

        Returns:
            bool: True if scheduled, False otherwise
        """

    def schedule_consumption(self, request_token, retry_time: float):
        """
        Schedule a request for future consumption.

        Args:
            request_token: RequestToken to schedule
            retry_time (float): Time at which to retry consumption
        """

    def process_scheduled_consumption(self):
        """Process all scheduled consumption requests that are ready."""
```
### BandwidthRateTracker

Tracks bandwidth consumption rates and provides projections for rate-limiting decisions.

```python { .api }
class BandwidthRateTracker:
    """
    Tracks bandwidth consumption rate over time.

    Args:
        time_utils: Time utility instance
    """
    def __init__(self, time_utils=None): ...

    def get_projected_rate(self) -> float:
        """
        Get the projected bandwidth consumption rate.

        Returns:
            float: Projected rate in bytes per second
        """

    def record_consumption_rate(self, amount: int, time_to_consume: float):
        """
        Record a consumption event for rate tracking.

        Args:
            amount (int): Number of bytes consumed
            time_to_consume (float): Time taken for the consumption
        """

    @property
    def current_rate(self) -> float:
        """
        Current consumption rate.

        Returns:
            float: Current rate in bytes per second
        """
```
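One common way to implement such a tracker is an exponential moving average. The sketch below is illustrative only (the library's smoothing details may differ); it shows how a "projected" rate can fold a pending consumption into the running average before committing it:

```python
class EmaRateTracker:
    """Toy rate tracker using an exponential moving average (EMA)."""
    def __init__(self, alpha=0.8):
        self.alpha = alpha   # weight given to the existing average
        self.rate = None     # bytes per second

    def record(self, amount, seconds):
        """Commit a consumption event into the running average."""
        sample = amount / seconds
        if self.rate is None:
            self.rate = sample
        else:
            # Blend the new sample into the running average.
            self.rate = self.alpha * self.rate + (1 - self.alpha) * sample

    def projected_rate(self, amount, seconds):
        """Rate if a pending consumption of `amount` took `seconds`."""
        if self.rate is None:
            return amount / seconds
        return self.alpha * self.rate + (1 - self.alpha) * (amount / seconds)

tracker = EmaRateTracker(alpha=0.5)
tracker.record(1000, 1.0)                     # first sample: 1000 B/s
tracker.record(2000, 1.0)                     # average moves to 1500 B/s
print(tracker.projected_rate(3000, 1.0))      # 2250.0
```

A projection like this lets the limiter decide whether accepting a request would push the rate over the configured maximum before any tokens are consumed.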
### RequestExceededException

Exception raised when a bandwidth request exceeds the available capacity.

```python { .api }
class RequestExceededException(Exception):
    """
    Raised when a bandwidth request exceeds available capacity.

    Args:
        requested_amt (int): Number of bytes requested
        retry_time (float): Time when the request can be retried
    """
    def __init__(self, requested_amt: int, retry_time: float): ...

    @property
    def requested_amt(self) -> int:
        """Number of bytes that were requested."""

    @property
    def retry_time(self) -> float:
        """Time when the request can be retried."""
```
## Usage Examples

### Basic Bandwidth Limiting

```python
from s3transfer.manager import TransferManager, TransferConfig
import boto3

# Configure the transfer manager with a 1 MB/s bandwidth limit;
# TransferManager builds the LeakyBucket and BandwidthLimiter internally.
max_rate = 1 * 1024 * 1024  # 1MB/s
config = TransferConfig(max_bandwidth=max_rate)
client = boto3.client('s3')
transfer_manager = TransferManager(client, config)

try:
    # Transfers are automatically bandwidth limited
    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'large_file.dat')
        future.result()

    print("Upload completed with bandwidth limiting")

finally:
    transfer_manager.shutdown()
```
### Manual Stream Bandwidth Limiting

```python
from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.futures import TransferCoordinator
import boto3

# Create bandwidth-limiting components
max_rate = 512 * 1024  # 512KB/s
leaky_bucket = LeakyBucket(max_rate)
bandwidth_limiter = BandwidthLimiter(leaky_bucket)

# Create a transfer coordinator
coordinator = TransferCoordinator()

# Download with manual bandwidth limiting
client = boto3.client('s3')
response = client.get_object(Bucket='my-bucket', Key='large-file.dat')

# Wrap the response body with bandwidth limiting
limited_stream = bandwidth_limiter.get_bandwith_limited_stream(
    response['Body'], coordinator
)

# Read with automatic rate limiting
with open('/tmp/downloaded.dat', 'wb') as f:
    while True:
        # Read operations are automatically rate limited
        chunk = limited_stream.read(8192)
        if not chunk:
            break
        f.write(chunk)
        print(f"Downloaded chunk of {len(chunk)} bytes")

print("Download completed with rate limiting")
```
### Dynamic Bandwidth Control

```python
from s3transfer.bandwidth import BandwidthLimitedStream, LeakyBucket
from s3transfer.futures import TransferCoordinator
import time

# Create rate-limiting components
max_rate = 2 * 1024 * 1024  # 2MB/s
leaky_bucket = LeakyBucket(max_rate)

# Simulate a download stream
class MockStream:
    def __init__(self, data_size):
        self.data = b'x' * data_size
        self.position = 0

    def read(self, amount=None):
        if amount is None:
            amount = len(self.data) - self.position
        end = min(self.position + amount, len(self.data))
        result = self.data[self.position:end]
        self.position = end
        return result

# Create a bandwidth-limited stream
mock_stream = MockStream(10 * 1024 * 1024)  # 10MB of data
coordinator = TransferCoordinator()
limited_stream = BandwidthLimitedStream(mock_stream, leaky_bucket, coordinator)

start_time = time.time()

# Download with dynamic bandwidth control
total_bytes = 0
while True:
    # Dynamically enable/disable bandwidth limiting
    if total_bytes < 5 * 1024 * 1024:  # First 5MB unlimited
        limited_stream.disable_bandwidth_limiting()
    else:  # Remaining data rate limited
        limited_stream.enable_bandwidth_limiting()

    chunk = limited_stream.read(64 * 1024)  # 64KB chunks
    if not chunk:
        break

    total_bytes += len(chunk)
    elapsed = time.time() - start_time
    current_rate = total_bytes / elapsed if elapsed > 0 else 0

    print(f"Downloaded: {total_bytes} bytes, Rate: {current_rate / 1024:.1f} KB/s")

end_time = time.time()
print(f"Total time: {end_time - start_time:.2f} seconds")
print(f"Average rate: {total_bytes / (end_time - start_time) / 1024:.1f} KB/s")
```
### Bandwidth Rate Tracking

```python
from s3transfer.bandwidth import BandwidthRateTracker
import time
import random

# Create a rate tracker
rate_tracker = BandwidthRateTracker()

# Simulate transfer operations with varying rates
print("Simulating bandwidth consumption...")

for i in range(20):
    # Simulate varying chunk sizes and transfer times
    chunk_size = random.randint(1024, 64 * 1024)  # 1KB to 64KB
    transfer_time = random.uniform(0.1, 2.0)  # 0.1 to 2.0 seconds

    # Record the consumption
    rate_tracker.record_consumption_rate(chunk_size, transfer_time)

    # Get current and projected rates
    current_rate = rate_tracker.current_rate
    projected_rate = rate_tracker.get_projected_rate()

    print(f"Chunk {i+1}: {chunk_size} bytes in {transfer_time:.2f}s")
    print(f"  Current rate: {current_rate / 1024:.1f} KB/s")
    print(f"  Projected rate: {projected_rate / 1024:.1f} KB/s")

    time.sleep(0.1)  # Brief pause between operations
```
### Custom Time Utilities

```python
from s3transfer.bandwidth import TimeUtils, LeakyBucket, RequestToken
import time

class CustomTimeUtils(TimeUtils):
    """Custom time utilities with logging."""

    def time(self):
        current_time = time.time()
        print(f"Time check: {current_time}")
        return current_time

    def sleep(self, seconds):
        print(f"Sleeping for {seconds} seconds...")
        time.sleep(seconds)
        print("Sleep completed")

# Use the custom time utils with a leaky bucket
custom_time = CustomTimeUtils()
leaky_bucket = LeakyBucket(1024 * 1024, time_utils=custom_time)  # 1MB/s

# Time operations will now be logged
try:
    request_token = RequestToken(1024, custom_time.time())
    leaky_bucket.consume(1024, request_token)
    print("Consumption successful")
except Exception as e:
    print(f"Consumption failed: {e}")
```
### Advanced Scheduling

```python
from s3transfer.bandwidth import (
    ConsumptionScheduler, LeakyBucket, RequestExceededException, RequestToken
)
import time

# Create a scheduler backed by a leaky bucket
max_rate = 1024 * 1024  # 1MB/s
leaky_bucket = LeakyBucket(max_rate)
scheduler = ConsumptionScheduler(leaky_bucket)

# Create multiple requests
requests = []
current_time = time.time()

for i in range(5):
    token = RequestToken(512 * 1024, current_time + i * 0.1)  # 512KB requests
    requests.append(token)

print("Scheduling bandwidth requests...")

# Try to consume immediately; schedule if not possible
for i, token in enumerate(requests):
    try:
        leaky_bucket.consume(token.amount, token)
        print(f"Request {i+1}: Immediate consumption successful")
    except RequestExceededException as e:
        # Schedule for later consumption at the suggested retry time
        scheduler.schedule_consumption(token, e.retry_time)
        print(f"Request {i+1}: Scheduled for retry at {e.retry_time}")

# Process scheduled requests
print("\nProcessing scheduled requests...")
time.sleep(1.5)  # Wait for the scheduled time

scheduler.process_scheduled_consumption()
print("Scheduled consumption processing completed")
```
### Error Handling and Recovery

```python
from s3transfer.bandwidth import (
    BandwidthLimiter, LeakyBucket, RequestExceededException
)
from s3transfer.futures import TransferCoordinator
import time

# Create a bandwidth limiter with a low rate for demonstration
max_rate = 1024  # Very low 1KB/s for a quick demonstration
leaky_bucket = LeakyBucket(max_rate)
bandwidth_limiter = BandwidthLimiter(leaky_bucket)

class RetryableStream:
    def __init__(self, data):
        self.data = data
        self.position = 0

    def read(self, amount=None):
        if amount is None:
            amount = len(self.data) - self.position
        end = min(self.position + amount, len(self.data))
        result = self.data[self.position:end]
        self.position = end
        return result

# Create the test scenario
test_data = b'x' * 10240  # 10KB test data
stream = RetryableStream(test_data)
coordinator = TransferCoordinator()

limited_stream = bandwidth_limiter.get_bandwith_limited_stream(stream, coordinator)

print("Testing bandwidth limiting with error handling...")

total_read = 0
retries = 0
max_retries = 5

while total_read < len(test_data) and retries < max_retries:
    try:
        # Try to read a large chunk (will likely hit the rate limit)
        chunk = limited_stream.read(2048)  # 2KB chunk

        if chunk:
            total_read += len(chunk)
            print(f"Successfully read {len(chunk)} bytes (total: {total_read})")
        else:
            break

    except RequestExceededException as e:
        retries += 1
        print(f"Rate limit exceeded: {e.requested_amt} bytes")
        print(f"Retry after: {e.retry_time}")

        # Wait until the retry time
        sleep_time = e.retry_time - time.time()
        if sleep_time > 0:
            print(f"Waiting {sleep_time:.2f} seconds...")
            time.sleep(sleep_time)

    except Exception as e:
        print(f"Unexpected error: {e}")
        break

print(f"Transfer completed: {total_read}/{len(test_data)} bytes")
print(f"Retries: {retries}")
```
### Integration with TransferManager

```python
from s3transfer.manager import TransferManager, TransferConfig
from s3transfer.subscribers import BaseSubscriber
import boto3
import time

class BandwidthMonitorSubscriber(BaseSubscriber):
    """Subscriber that monitors bandwidth usage."""

    def __init__(self):
        self.start_time = None
        self.total_bytes = 0

    def on_queued(self, **kwargs):
        self.start_time = time.time()
        print("Transfer queued - monitoring bandwidth...")

    def on_progress(self, bytes_transferred, **kwargs):
        self.total_bytes += bytes_transferred
        if self.start_time:
            elapsed = time.time() - self.start_time
            if elapsed > 0:
                rate = self.total_bytes / elapsed
                print(f"Current rate: {rate / 1024:.1f} KB/s")

    def on_done(self, **kwargs):
        if self.start_time:
            elapsed = time.time() - self.start_time
            avg_rate = self.total_bytes / elapsed if elapsed > 0 else 0
            print(f"Transfer completed - Average rate: {avg_rate / 1024:.1f} KB/s")

# Configure the transfer manager with bandwidth limiting
config = TransferConfig(
    max_bandwidth=2 * 1024 * 1024,        # 2MB/s limit
    multipart_threshold=5 * 1024 * 1024,  # 5MB threshold
    multipart_chunksize=1 * 1024 * 1024   # 1MB chunks
)

client = boto3.client('s3')
transfer_manager = TransferManager(client, config)

try:
    # Create a bandwidth-monitoring subscriber
    bandwidth_monitor = BandwidthMonitorSubscriber()

    # Upload with bandwidth monitoring
    with open('/tmp/test_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'test_file.dat',
            subscribers=[bandwidth_monitor]
        )

        # Monitor progress
        while not future.done():
            time.sleep(1)

        result = future.result()
        print("Upload completed successfully!")

finally:
    transfer_manager.shutdown()
```
## Configuration Guidelines

### Rate Selection

```python
# Conservative rates for shared networks
conservative_rate = 1 * 1024 * 1024  # 1MB/s

# Moderate rates for dedicated connections
moderate_rate = 10 * 1024 * 1024  # 10MB/s

# High rates for high-bandwidth connections
high_rate = 100 * 1024 * 1024  # 100MB/s

# Adaptive rate based on connection testing
def test_connection_speed():
    # A real implementation would measure actual throughput
    return 50 * 1024 * 1024  # 50MB/s example

adaptive_rate = test_connection_speed()
```
### Bucket Configuration

```python
from s3transfer.bandwidth import LeakyBucket

# LeakyBucket takes only a sustained maximum rate; it does not expose
# burst parameters. Burst tolerance would require a custom implementation.
sustained_bucket = LeakyBucket(
    max_rate=5 * 1024 * 1024,  # 5MB/s sustained
)

# Strict rate limiting
strict_bucket = LeakyBucket(
    max_rate=1 * 1024 * 1024,  # 1MB/s strict limit
)
```
## Best Practices

### Bandwidth Management

1. **Monitor actual rates**: Use rate tracking to verify that bandwidth limiting is effective
2. **Handle exceptions**: Catch and handle `RequestExceededException` appropriately
3. **Use appropriate rates**: Set realistic limits based on network capacity
4. **Enable selectively**: Enable bandwidth limiting only when needed

### Performance Optimization

1. **Balance chunk sizes**: Larger chunks reduce overhead but may reduce responsiveness
2. **Consider latency**: Account for network latency in rate calculations
3. **Monitor resource usage**: Bandwidth limiting adds CPU overhead
4. **Test configurations**: Validate bandwidth settings with real workloads

### Error Recovery

1. **Implement retry logic**: Handle rate-limit exceptions with appropriate delays
2. **Use exponential backoff**: Increase delays for repeated failures
3. **Set maximum retries**: Prevent infinite retry loops
4. **Log bandwidth events**: Monitor bandwidth limiting for debugging
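The retry guidance above can be sketched as a small helper; `consume` stands for any call that may raise `RequestExceededException`, and the helper and its names are illustrative, not part of the library:

```python
import time

def consume_with_backoff(consume, amount, max_retries=5,
                         base_delay=0.1, sleep=time.sleep):
    """Retry consume(amount) with exponential backoff and a retry cap.

    Delays double on each failure (0.1s, 0.2s, 0.4s, ...); the last
    failure is re-raised so callers can log or abort.
    """
    delay = base_delay
    for attempt in range(max_retries):
        try:
            return consume(amount)
        except Exception:
            if attempt == max_retries - 1:
                raise  # maximum retries reached: propagate the failure
            sleep(delay)
            delay *= 2  # exponential backoff
```

In real use, the `except` clause would catch `RequestExceededException` specifically and could sleep until its `retry_time` instead of a fixed schedule.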