# Bandwidth Management

A bandwidth limiting system that uses leaky bucket algorithms, consumption scheduling, and rate tracking to control S3 transfer rates and manage network resource utilization.

## Capabilities

### BandwidthLimiter

Main bandwidth limiting coordinator that creates bandwidth-limited streams using leaky bucket algorithms for rate control.

```python { .api }
class BandwidthLimiter:
    """
    Limits bandwidth for S3 transfers using leaky bucket algorithm.

    Args:
        leaky_bucket: LeakyBucket instance for rate limiting
        time_utils: Time utility instance (optional)
    """
    def __init__(self, leaky_bucket, time_utils=None): ...

    def get_bandwith_limited_stream(self, stream, transfer_coordinator):
        """
        Create a bandwidth-limited wrapper around a stream.

        Args:
            stream: Stream to wrap with bandwidth limiting
            transfer_coordinator: TransferCoordinator for the transfer

        Returns:
            BandwidthLimitedStream: Bandwidth-limited stream wrapper
        """
```

### BandwidthLimitedStream

Stream wrapper that applies bandwidth limiting to read operations using token bucket consumption.

```python { .api }
class BandwidthLimitedStream:
    """
    Stream wrapper with bandwidth limiting capabilities.

    Args:
        stream: Underlying stream to wrap
        leaky_bucket: LeakyBucket for rate limiting
        transfer_coordinator: TransferCoordinator for the transfer
        time_utils: Time utility instance
    """
    def __init__(self, stream, leaky_bucket, transfer_coordinator, time_utils=None): ...

    def read(self, amount=None) -> bytes:
        """
        Read from stream with bandwidth limiting.

        Args:
            amount (int, optional): Number of bytes to read

        Returns:
            bytes: Data read from stream (may be less than requested due to rate limiting)
        """

    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting for this stream."""

    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting for this stream."""

    def signal_transferring(self):
        """Signal that transfer is currently active."""

    def signal_not_transferring(self):
        """Signal that transfer is not currently active."""
```

### LeakyBucket

Leaky bucket algorithm implementation for bandwidth control with token-based consumption and rate limiting.

```python { .api }
class LeakyBucket:
    """
    Leaky bucket algorithm implementation for bandwidth control.

    Args:
        max_rate (int): Maximum rate in bytes per second
        time_utils: Time utility instance (optional)
    """
    def __init__(self, max_rate: int, time_utils=None): ...

    def consume(self, amount: int, request_token):
        """
        Consume bandwidth tokens from the bucket.

        Args:
            amount (int): Number of bytes to consume
            request_token: RequestToken for this consumption request

        Raises:
            RequestExceededException: If request exceeds available tokens
        """
```

### RequestToken

Token representing a bandwidth consumption request with timing and amount information.

```python { .api }
class RequestToken:
    """
    Token for bandwidth consumption requests.

    Args:
        amount (int): Number of bytes requested
        time_requested (float): Time when request was made
    """
    def __init__(self, amount: int, time_requested: float): ...

    @property
    def amount(self) -> int:
        """Number of bytes requested."""

    @property
    def time_requested(self) -> float:
        """Time when request was made."""
```

### TimeUtils

Time utility class providing consistent time operations for bandwidth calculations.

```python { .api }
class TimeUtils:
    """
    Time utilities for bandwidth management.
    """
    def time(self) -> float:
        """
        Get current time.

        Returns:
            float: Current time in seconds since epoch
        """

    def sleep(self, seconds: float):
        """
        Sleep for specified duration.

        Args:
            seconds (float): Duration to sleep in seconds
        """
```

### ConsumptionScheduler

Scheduler for managing bandwidth consumption requests with timing and queuing support.

```python { .api }
class ConsumptionScheduler:
    """
    Schedules bandwidth consumption requests.

    Args:
        leaky_bucket: LeakyBucket for rate limiting
        time_utils: Time utility instance
    """
    def __init__(self, leaky_bucket, time_utils=None): ...

    def is_scheduled(self, request_token) -> bool:
        """
        Check if a request is scheduled for future consumption.

        Args:
            request_token: RequestToken to check

        Returns:
            bool: True if scheduled, False otherwise
        """

    def schedule_consumption(self, request_token, retry_time: float):
        """
        Schedule a request for future consumption.

        Args:
            request_token: RequestToken to schedule
            retry_time (float): Time when to retry consumption
        """

    def process_scheduled_consumption(self):
        """Process all scheduled consumption requests that are ready."""
```

### BandwidthRateTracker

Tracks bandwidth consumption rates and provides projections for rate limiting decisions.

```python { .api }
class BandwidthRateTracker:
    """
    Tracks bandwidth consumption rate over time.

    Args:
        time_utils: Time utility instance
    """
    def __init__(self, time_utils=None): ...

    def get_projected_rate(self) -> float:
        """
        Get projected bandwidth consumption rate.

        Returns:
            float: Projected rate in bytes per second
        """

    def record_consumption_rate(self, amount: int, time_to_consume: float):
        """
        Record a consumption event for rate tracking.

        Args:
            amount (int): Number of bytes consumed
            time_to_consume (float): Time taken for consumption
        """

    @property
    def current_rate(self) -> float:
        """
        Current consumption rate.

        Returns:
            float: Current rate in bytes per second
        """
```

### RequestExceededException

Exception raised when bandwidth requests exceed available capacity.

```python { .api }
class RequestExceededException(Exception):
    """
    Raised when bandwidth request exceeds available capacity.

    Args:
        requested_amt (int): Number of bytes requested
        retry_time (float): Time when request can be retried
    """
    def __init__(self, requested_amt: int, retry_time: float): ...

    @property
    def requested_amt(self) -> int:
        """Number of bytes that were requested."""

    @property
    def retry_time(self) -> float:
        """Time when request can be retried."""
```

## Usage Examples

### Basic Bandwidth Limiting

```python
from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.manager import TransferManager, TransferConfig
import boto3

# Create bandwidth limiter with 1MB/s limit
max_rate = 1 * 1024 * 1024  # 1MB/s
leaky_bucket = LeakyBucket(max_rate)
bandwidth_limiter = BandwidthLimiter(leaky_bucket)

# Configure transfer manager with bandwidth limiting
config = TransferConfig(max_bandwidth=max_rate)
client = boto3.client('s3')
transfer_manager = TransferManager(client, config)

try:
    # Transfers will be automatically bandwidth limited
    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'large_file.dat')
        future.result()

    print("Upload completed with bandwidth limiting")
finally:
    transfer_manager.shutdown()
```

### Manual Stream Bandwidth Limiting

```python
from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.futures import TransferCoordinator
import boto3

# Create bandwidth limiting components
max_rate = 512 * 1024  # 512KB/s
leaky_bucket = LeakyBucket(max_rate)
bandwidth_limiter = BandwidthLimiter(leaky_bucket)

# Create transfer coordinator
coordinator = TransferCoordinator()

# Download with manual bandwidth limiting
client = boto3.client('s3')
response = client.get_object(Bucket='my-bucket', Key='large-file.dat')

# Wrap response body with bandwidth limiting
limited_stream = bandwidth_limiter.get_bandwith_limited_stream(
    response['Body'], coordinator
)

# Read with automatic rate limiting
with open('/tmp/downloaded.dat', 'wb') as f:
    while True:
        # Read operations are automatically rate limited
        chunk = limited_stream.read(8192)
        if not chunk:
            break
        f.write(chunk)
        print(f"Downloaded chunk of {len(chunk)} bytes")

print("Download completed with rate limiting")
```

### Dynamic Bandwidth Control

```python
from s3transfer.bandwidth import BandwidthLimitedStream, LeakyBucket
from s3transfer.futures import TransferCoordinator
import time

# Create rate-limited stream
max_rate = 2 * 1024 * 1024  # 2MB/s
leaky_bucket = LeakyBucket(max_rate)

# Simulate a download stream
class MockStream:
    def __init__(self, data_size):
        self.data = b'x' * data_size
        self.position = 0

    def read(self, amount=None):
        if amount is None:
            amount = len(self.data) - self.position
        end = min(self.position + amount, len(self.data))
        result = self.data[self.position:end]
        self.position = end
        return result

# Create bandwidth-limited stream
mock_stream = MockStream(10 * 1024 * 1024)  # 10MB of data
coordinator = TransferCoordinator()
limited_stream = BandwidthLimitedStream(mock_stream, leaky_bucket, coordinator)

start_time = time.time()

# Download with dynamic bandwidth control
total_bytes = 0
while True:
    # Dynamically enable/disable bandwidth limiting
    if total_bytes < 5 * 1024 * 1024:  # First 5MB unlimited
        limited_stream.disable_bandwidth_limiting()
    else:  # Remaining data rate limited
        limited_stream.enable_bandwidth_limiting()

    chunk = limited_stream.read(64 * 1024)  # 64KB chunks
    if not chunk:
        break

    total_bytes += len(chunk)
    elapsed = time.time() - start_time
    current_rate = total_bytes / elapsed if elapsed > 0 else 0

    print(f"Downloaded: {total_bytes} bytes, Rate: {current_rate / 1024:.1f} KB/s")

end_time = time.time()
print(f"Total time: {end_time - start_time:.2f} seconds")
print(f"Average rate: {total_bytes / (end_time - start_time) / 1024:.1f} KB/s")
```

### Bandwidth Rate Tracking

```python
from s3transfer.bandwidth import BandwidthRateTracker
import time
import random

# Create rate tracker
rate_tracker = BandwidthRateTracker()

# Simulate transfer operations with varying rates
print("Simulating bandwidth consumption...")

for i in range(20):
    # Simulate varying chunk sizes and transfer times
    chunk_size = random.randint(1024, 64 * 1024)  # 1KB to 64KB
    transfer_time = random.uniform(0.1, 2.0)  # 0.1 to 2.0 seconds

    # Record the consumption
    rate_tracker.record_consumption_rate(chunk_size, transfer_time)

    # Get current and projected rates
    current_rate = rate_tracker.current_rate
    projected_rate = rate_tracker.get_projected_rate()

    print(f"Chunk {i+1}: {chunk_size} bytes in {transfer_time:.2f}s")
    print(f"  Current rate: {current_rate / 1024:.1f} KB/s")
    print(f"  Projected rate: {projected_rate / 1024:.1f} KB/s")

    time.sleep(0.1)  # Brief pause between operations
```

### Custom Time Utilities

```python
from s3transfer.bandwidth import TimeUtils, LeakyBucket, RequestToken
import time

class CustomTimeUtils(TimeUtils):
    """Custom time utilities with logging."""

    def time(self):
        current_time = time.time()
        print(f"Time check: {current_time}")
        return current_time

    def sleep(self, seconds):
        print(f"Sleeping for {seconds} seconds...")
        time.sleep(seconds)
        print("Sleep completed")

# Use custom time utils with leaky bucket
custom_time = CustomTimeUtils()
leaky_bucket = LeakyBucket(1024 * 1024, time_utils=custom_time)  # 1MB/s

# Time operations will now be logged
try:
    request_token = RequestToken(1024, custom_time.time())
    leaky_bucket.consume(1024, request_token)
    print("Consumption successful")
except Exception as e:
    print(f"Consumption failed: {e}")
```

### Advanced Scheduling

```python
from s3transfer.bandwidth import ConsumptionScheduler, LeakyBucket, RequestToken
import time

# Create scheduler with leaky bucket
max_rate = 1024 * 1024  # 1MB/s
leaky_bucket = LeakyBucket(max_rate)
scheduler = ConsumptionScheduler(leaky_bucket)

# Create multiple requests
requests = []
current_time = time.time()

for i in range(5):
    token = RequestToken(512 * 1024, current_time + i * 0.1)  # 512KB requests
    requests.append(token)

print("Scheduling bandwidth requests...")

# Try to consume immediately, schedule if not possible
for i, token in enumerate(requests):
    try:
        leaky_bucket.consume(token.amount, token)
        print(f"Request {i+1}: Immediate consumption successful")
    except Exception as e:
        # Schedule for later consumption
        retry_time = current_time + 1.0  # Retry in 1 second
        scheduler.schedule_consumption(token, retry_time)
        print(f"Request {i+1}: Scheduled for later ({e})")

# Process scheduled requests
print("\nProcessing scheduled requests...")
time.sleep(1.5)  # Wait for scheduled time

scheduler.process_scheduled_consumption()
print("Scheduled consumption processing completed")
```

### Error Handling and Recovery

```python
from s3transfer.bandwidth import (
    BandwidthLimiter, LeakyBucket, RequestExceededException
)
from s3transfer.futures import TransferCoordinator
import time

# Create bandwidth limiter with low rate for demonstration
max_rate = 1024  # Very low 1KB/s for quick demonstration
leaky_bucket = LeakyBucket(max_rate)
bandwidth_limiter = BandwidthLimiter(leaky_bucket)

class RetryableStream:
    def __init__(self, data):
        self.data = data
        self.position = 0

    def read(self, amount=None):
        if amount is None:
            amount = len(self.data) - self.position
        end = min(self.position + amount, len(self.data))
        result = self.data[self.position:end]
        self.position = end
        return result

# Create test scenario
test_data = b'x' * 10240  # 10KB test data
stream = RetryableStream(test_data)
coordinator = TransferCoordinator()

limited_stream = bandwidth_limiter.get_bandwith_limited_stream(stream, coordinator)

print("Testing bandwidth limiting with error handling...")

total_read = 0
retries = 0
max_retries = 5

while total_read < len(test_data) and retries < max_retries:
    try:
        # Try to read a large chunk (will likely hit rate limit)
        chunk = limited_stream.read(2048)  # 2KB chunk

        if chunk:
            total_read += len(chunk)
            print(f"Successfully read {len(chunk)} bytes (total: {total_read})")
        else:
            break

    except RequestExceededException as e:
        retries += 1
        print(f"Rate limit exceeded: {e.requested_amt} bytes")
        print(f"Retry after: {e.retry_time}")

        # Wait until retry time
        sleep_time = e.retry_time - time.time()
        if sleep_time > 0:
            print(f"Waiting {sleep_time:.2f} seconds...")
            time.sleep(sleep_time)

    except Exception as e:
        print(f"Unexpected error: {e}")
        break

print(f"Transfer completed: {total_read}/{len(test_data)} bytes")
print(f"Retries: {retries}")
```

### Integration with TransferManager

```python
from s3transfer.manager import TransferManager, TransferConfig
from s3transfer.subscribers import BaseSubscriber
import boto3
import time

class BandwidthMonitorSubscriber(BaseSubscriber):
    """Subscriber that monitors bandwidth usage."""

    def __init__(self):
        self.start_time = None
        self.total_bytes = 0

    def on_queued(self, **kwargs):
        self.start_time = time.time()
        print("Transfer queued - monitoring bandwidth...")

    def on_progress(self, bytes_transferred, **kwargs):
        self.total_bytes += bytes_transferred
        if self.start_time:
            elapsed = time.time() - self.start_time
            if elapsed > 0:
                rate = self.total_bytes / elapsed
                print(f"Current rate: {rate / 1024:.1f} KB/s")

    def on_done(self, **kwargs):
        if self.start_time:
            elapsed = time.time() - self.start_time
            avg_rate = self.total_bytes / elapsed if elapsed > 0 else 0
            print(f"Transfer completed - Average rate: {avg_rate / 1024:.1f} KB/s")

# Configure transfer manager with bandwidth limiting
config = TransferConfig(
    max_bandwidth=2 * 1024 * 1024,  # 2MB/s limit
    multipart_threshold=5 * 1024 * 1024,  # 5MB threshold
    multipart_chunksize=1 * 1024 * 1024  # 1MB chunks
)

client = boto3.client('s3')
transfer_manager = TransferManager(client, config)

try:
    # Create bandwidth monitoring subscriber
    bandwidth_monitor = BandwidthMonitorSubscriber()

    # Upload with bandwidth monitoring
    with open('/tmp/test_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'test_file.dat',
            subscribers=[bandwidth_monitor]
        )

        # Monitor progress
        while not future.done():
            time.sleep(1)

        result = future.result()
        print("Upload completed successfully!")

finally:
    transfer_manager.shutdown()
```

## Configuration Guidelines

### Rate Selection

```python
# Conservative rates for shared networks
conservative_rate = 1 * 1024 * 1024  # 1MB/s

# Moderate rates for dedicated connections
moderate_rate = 10 * 1024 * 1024  # 10MB/s

# High rates for high-bandwidth connections
high_rate = 100 * 1024 * 1024  # 100MB/s

# Adaptive rate based on connection testing
def test_connection_speed():
    # Implementation would test actual throughput
    return 50 * 1024 * 1024  # 50MB/s example

adaptive_rate = test_connection_speed()
```

### Bucket Configuration

```python
from s3transfer.bandwidth import LeakyBucket

# Burst-tolerant configuration
burst_bucket = LeakyBucket(
    max_rate=5 * 1024 * 1024,  # 5MB/s sustained
    # Additional parameters for burst handling would go here
)

# Strict rate limiting
strict_bucket = LeakyBucket(
    max_rate=1 * 1024 * 1024,  # 1MB/s strict limit
)
```

## Best Practices

### Bandwidth Management

1. **Monitor actual rates**: Use rate tracking to verify bandwidth limiting effectiveness
2. **Handle exceptions**: Catch and handle `RequestExceededException` appropriately
3. **Use appropriate rates**: Set realistic limits based on network capacity
4. **Enable selectively**: Enable bandwidth limiting only when needed

### Performance Optimization

1. **Balance chunk sizes**: Larger chunks reduce overhead but may impact responsiveness
2. **Consider latency**: Account for network latency in rate calculations
3. **Monitor resource usage**: Bandwidth limiting adds CPU overhead
4. **Test configurations**: Validate bandwidth settings with real workloads
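
The chunk-size tradeoff above can be observed with a small timing sketch. This is plain Python with no s3transfer dependency; the in-memory stream and the chunk sizes are illustrative choices, not recommendations:

```python
import io
import time

def time_read_loop(chunk_size, total_size=8 * 1024 * 1024):
    """Time reading `total_size` bytes from an in-memory stream in
    `chunk_size` pieces; per-call overhead grows as chunks shrink."""
    stream = io.BytesIO(b'x' * total_size)
    start = time.perf_counter()
    while stream.read(chunk_size):
        pass
    return time.perf_counter() - start

for size in (4 * 1024, 64 * 1024, 1024 * 1024):
    print(f"{size // 1024:>5} KB chunks: {time_read_loop(size):.4f}s")
```

Smaller chunks mean more `read()` calls (and more rate-limit checks) per byte transferred, but they let bandwidth limiting react more quickly.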

672

673

### Error Recovery

674

675

1. **Implement retry logic**: Handle rate limit exceptions with appropriate delays

676

2. **Use exponential backoff**: Increase delays for repeated failures

677

3. **Set maximum retries**: Prevent infinite retry loops

678

4. **Log bandwidth events**: Monitor bandwidth limiting for debugging
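
The retry guidance above can be sketched as a small generic helper. The `read_chunk` callable, the parameter names, and the broad exception filter are illustrative assumptions, not part of the s3transfer API:

```python
import time

def read_with_backoff(read_chunk, max_retries=5, base_delay=0.1,
                      retryable=(Exception,)):
    """Retry a rate-limited operation with exponential backoff.

    read_chunk: zero-argument callable that returns data, or raises one
    of the `retryable` exceptions when the rate limit is exceeded.
    """
    delay = base_delay
    for attempt in range(max_retries):
        try:
            return read_chunk()
        except retryable:
            if attempt == max_retries - 1:
                raise  # Give up after the final attempt
            time.sleep(delay)  # Back off before retrying
            delay *= 2  # Double the delay for the next failure
```

In practice the `retryable` tuple would name `RequestExceededException`, and the delay could be seeded from the exception's `retry_time` instead of a fixed `base_delay`.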