# Future-based Coordination

Asynchronous transfer execution using futures, coordinators, and metadata tracking for monitoring transfer progress, handling completion, and coordinating complex multi-part operations.

## Capabilities

### TransferFuture

Future object representing a transfer request with methods for monitoring progress and retrieving results.

```python { .api }
class TransferFuture:
    """
    Future representing a transfer request.

    Provides methods to check completion status, retrieve results, and cancel operations.
    """
    def done(self) -> bool:
        """
        Check if the transfer is complete.

        Returns:
            bool: True if transfer is complete (success or failure), False otherwise
        """

    def result(self):
        """
        Get the transfer result, blocking until complete.

        Returns:
            Any: Transfer result (usually None for successful transfers)

        Raises:
            Exception: Any exception that occurred during transfer
                (CancelledError if the transfer was cancelled)
        """

    def cancel(self):
        """
        Request cancellation of the transfer.

        Has no effect if the transfer is already complete. Returns None;
        a cancelled transfer raises CancelledError from result().
        """

    def set_exception(self, exception):
        """
        Set an exception on the future.

        Args:
            exception: Exception to set on the future

        Raises:
            TransferNotDoneError: If called before transfer completion
        """

    @property
    def meta(self) -> 'TransferMeta':
        """
        Transfer metadata object containing call arguments and status information.

        Returns:
            TransferMeta: Metadata object for this transfer
        """
```
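
The `done()` / `result()` contract above resembles the standard library's `concurrent.futures.Future`. The following is a minimal stdlib-only sketch of that contract, not s3transfer code: `wait_with_deadline` and `slow_copy` are hypothetical names introduced for illustration, and no S3 calls are made. It shows non-blocking polling with a deadline and how `result()` re-raises an exception from the worker.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait_with_deadline(future, timeout, poll_interval=0.01):
    """Poll future.done() until it finishes or `timeout` seconds pass.

    Returns True if the future finished in time. Only done() is used,
    so any future-like object with that method is accepted.
    """
    deadline = time.monotonic() + timeout
    while not future.done():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True


def slow_copy(fail):
    # Hypothetical stand-in for a transfer running on a worker thread.
    time.sleep(0.05)
    if fail:
        raise OSError("simulated transfer failure")
    return None


with ThreadPoolExecutor(max_workers=2) as pool:
    ok_future = pool.submit(slow_copy, False)
    bad_future = pool.submit(slow_copy, True)

    finished = wait_with_deadline(ok_future, timeout=5.0)
    ok_result = ok_future.result()  # None, like most successful transfers

    wait_with_deadline(bad_future, timeout=5.0)
    try:
        bad_future.result()  # re-raises the worker's exception
        error_message = None
    except OSError as exc:
        error_message = str(exc)
```

Because the helper relies only on `done()`, the same polling approach should carry over to a `TransferFuture`, whose `result()` takes no timeout argument.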

### TransferMeta

Metadata container providing information about a transfer including call arguments, transfer ID, size, and custom context.

```python { .api }
class TransferMeta:
    """
    Metadata about a TransferFuture containing call arguments and transfer information.
    """
    @property
    def call_args(self):
        """
        The original call arguments used for the transfer.

        Returns:
            CallArgs: Object containing method arguments (bucket, key, etc.)
        """

    @property
    def transfer_id(self) -> str:
        """
        Unique identifier for this transfer.

        Returns:
            str: Unique transfer ID
        """

    @property
    def size(self) -> Optional[int]:
        """
        Total size of the transfer in bytes (if known).

        Returns:
            int or None: Transfer size in bytes, None if unknown
        """

    @property
    def user_context(self) -> Dict[str, Any]:
        """
        User-defined context dictionary for storing custom data.

        Returns:
            dict: User context dictionary
        """

    @property
    def etag(self) -> Optional[str]:
        """
        ETag of the S3 object (if available).

        Returns:
            str or None: Object ETag, None if not available
        """

    def provide_transfer_size(self, size: int):
        """
        Provide the total transfer size for progress tracking.

        Args:
            size (int): Total size in bytes
        """

    def provide_object_etag(self, etag: str):
        """
        Provide the object ETag.

        Args:
            etag (str): Object ETag value
        """
```
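
Since `size` can be `None` until a size has been provided, progress code needs to handle the unknown case. The sketch below is one way to do that under stated assumptions: `percent_complete` and `FakeMeta` are hypothetical helpers (not part of s3transfer), and the running byte count is assumed to come from elsewhere, for example a progress callback.

```python
from typing import Optional


def percent_complete(bytes_transferred: int, total_size: Optional[int]) -> Optional[float]:
    """Return progress as a percentage, or None when the size is unknown."""
    if total_size is None:
        return None
    if total_size == 0:
        return 100.0  # an empty object is complete as soon as it starts
    return min(100.0, 100.0 * bytes_transferred / total_size)


class FakeMeta:
    """Hypothetical stand-in mimicking the `size` behaviour described above."""

    def __init__(self):
        self._size = None

    @property
    def size(self):
        return self._size

    def provide_transfer_size(self, size):
        self._size = size


meta = FakeMeta()
before = percent_complete(512, meta.size)  # size not provided yet
meta.provide_transfer_size(2048)
after = percent_complete(512, meta.size)  # 512 of 2048 bytes
```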

### TransferCoordinator

Central coordinator managing transfer execution, associated futures, and cleanup operations.

```python { .api }
class TransferCoordinator:
    """
    Coordinates transfer execution and manages associated futures.

    Handles task submission, result/exception setting, cancellation, and cleanup operations.

    Args:
        transfer_id: Optional transfer identifier (default: None)
    """
    def __init__(self, transfer_id=None): ...
    def set_result(self, result):
        """
        Set the transfer result.

        Args:
            result: Result value for the transfer
        """

    def set_exception(self, exception, override=False):
        """
        Set an exception for the transfer.

        Args:
            exception: Exception that occurred during transfer
            override (bool): If True, override any existing state (default: False)
        """

    def result(self):
        """
        Get the transfer result, blocking until complete.

        Returns:
            Any: Transfer result

        Raises:
            Exception: Any exception that occurred during transfer
        """

    def cancel(self, msg: str = '', exc_type=CancelledError):
        """
        Cancel the transfer.

        Args:
            msg (str): Cancellation message (default: '')
            exc_type: Type of exception to set for cancellation (default: CancelledError)
        """

    def submit(self, executor, task, tag=None):
        """
        Submit a task for execution.

        Args:
            executor: Executor to submit task to
            task: Callable task to execute
            tag: TaskTag to associate with the submitted task (optional)

        Returns:
            concurrent.futures.Future: Future object for the submitted task
        """

    def done(self) -> bool:
        """
        Check if the transfer is complete.

        Returns:
            bool: True if complete, False otherwise
        """

    def add_done_callback(self, function, *args, **kwargs):
        """
        Add a callback to be called when transfer completes.

        Args:
            function: Callback function to call on completion
            *args: Additional positional arguments to pass to callback
            **kwargs: Additional keyword arguments to pass to callback
        """

    def add_failure_cleanup(self, function, *args, **kwargs):
        """
        Add a cleanup function to be called if transfer fails.

        Args:
            function: Function to call for cleanup on failure
            *args: Additional positional arguments to pass to cleanup function
            **kwargs: Additional keyword arguments to pass to cleanup function
        """

    def announce_done(self):
        """
        Announce that the transfer is complete and trigger callbacks.
        """

    def set_status_to_queued(self):
        """
        Set the TransferFuture's status to queued.
        """

    def set_status_to_running(self):
        """
        Set the TransferFuture's status to running.
        """

    def add_associated_future(self, future):
        """
        Add a future to be associated with the TransferFuture.

        Args:
            future: Future object to associate with this coordinator
        """

    def remove_associated_future(self, future):
        """
        Remove a future's association to the TransferFuture.

        Args:
            future: Future object to disassociate from this coordinator
        """

    @property
    def exception(self):
        """
        Exception that occurred during transfer (if any).

        Returns:
            Exception or None: Transfer exception, None if no exception
        """

    @property
    def associated_futures(self) -> Set:
        """
        Set of futures associated with this coordinator.

        Returns:
            set: Set of associated Future objects
        """

    @property
    def failure_cleanups(self) -> List:
        """
        List of cleanup functions to call on failure.

        Returns:
            list: List of cleanup functions
        """

    @property
    def status(self) -> str:
        """
        Current status of the transfer with detailed state information.

        Returns:
            str: Status string with specific states:
                - 'not-started': Has yet to start, can be cancelled immediately
                - 'queued': SubmissionTask is about to submit tasks
                - 'running': Is in progress (SubmissionTask executing)
                - 'cancelled': Was cancelled
                - 'failed': An exception other than CancelledError was thrown
                - 'success': No exceptions were thrown and is done
        """
```
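
To make the interplay of status, done callbacks, and failure cleanups concrete, here is a heavily simplified stdlib-only sketch of the coordinator pattern. `MiniCoordinator` is a hypothetical illustration written for this page, not s3transfer's implementation; it assumes that the first terminal state wins and that failure cleanups run before done callbacks.

```python
import threading


class MiniCoordinator:
    """Simplified illustration of the coordinator pattern (not s3transfer code)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._done_event = threading.Event()
        self._status = 'not-started'
        self._result = None
        self._exception = None
        self._done_callbacks = []
        self._failure_cleanups = []

    @property
    def status(self):
        return self._status

    def done(self):
        return self._status in ('cancelled', 'failed', 'success')

    def set_status_to_running(self):
        with self._lock:
            if not self.done():
                self._status = 'running'

    def set_result(self, result):
        with self._lock:
            if not self.done():
                self._result = result
                self._status = 'success'

    def set_exception(self, exception):
        with self._lock:
            if not self.done():
                self._exception = exception
                self._status = 'failed'

    def add_done_callback(self, function, *args, **kwargs):
        self._done_callbacks.append((function, args, kwargs))

    def add_failure_cleanup(self, function, *args, **kwargs):
        self._failure_cleanups.append((function, args, kwargs))

    def announce_done(self):
        # Cleanups run only when the transfer did not succeed;
        # done callbacks run in every case.
        if self._status != 'success':
            for function, args, kwargs in self._failure_cleanups:
                function(*args, **kwargs)
        self._done_event.set()
        for function, args, kwargs in self._done_callbacks:
            function(*args, **kwargs)

    def result(self):
        self._done_event.wait()
        if self._exception is not None:
            raise self._exception
        return self._result


events = []
coordinator = MiniCoordinator()
coordinator.add_done_callback(events.append, 'done')
coordinator.add_failure_cleanup(events.append, 'cleanup')
coordinator.set_status_to_running()
coordinator.set_exception(ValueError('part 3 failed'))
coordinator.set_result('ignored')  # no effect: a terminal state is already set
coordinator.announce_done()
```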

### BoundedExecutor

Executor with bounded task submission queue to prevent unlimited memory growth during high-volume operations.

```python { .api }
class BoundedExecutor:
    """
    Executor with bounded task submission queue.

    Prevents unlimited memory growth by blocking task submission when queue is full.

    Args:
        max_size (int): Maximum number of in-flight futures
        max_num_threads (int): Maximum number of worker threads
        tag_semaphores: Optional mapping of task tags to semaphores (default: None)
        executor_cls: Optional executor class to use (default: None)
    """
    def __init__(self, max_size: int, max_num_threads: int, tag_semaphores=None, executor_cls=None): ...

    def submit(self, task, tag=None, block: bool = True):
        """
        Submit a task for execution, by default blocking if queue is full.

        Args:
            task: Task to execute
            tag: Optional tag selecting which semaphore bounds the task
            block (bool): Whether to wait for a free slot (default: True)

        Returns:
            ExecutorFuture: Future object for the submitted task
        """

    def shutdown(self, wait: bool = True):
        """
        Shutdown the executor.

        Args:
            wait (bool): Whether to wait for completion
        """
```
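
The bounding idea can be sketched with the standard library alone: a semaphore caps the number of unfinished tasks, and each task releases its slot when it completes. `SimpleBoundedExecutor` below is a hypothetical illustration under that assumption, not s3transfer's class, and its `submit` signature is chosen for the demo only.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SimpleBoundedExecutor:
    """Stdlib-only sketch of bounded submission (not s3transfer's class)."""

    def __init__(self, max_size, max_num_threads):
        self._semaphore = threading.Semaphore(max_size)
        self._executor = ThreadPoolExecutor(max_workers=max_num_threads)

    def submit(self, fn, *args, block=True, **kwargs):
        # Take a slot first; with block=False a full queue raises instead.
        if not self._semaphore.acquire(blocking=block):
            raise RuntimeError("submission queue is full")
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._semaphore.release()
            raise
        # Free the slot when the task finishes, whatever the outcome.
        future.add_done_callback(lambda _: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)


gate = threading.Event()
executor = SimpleBoundedExecutor(max_size=2, max_num_threads=1)
first = executor.submit(gate.wait)   # occupies the single worker
second = executor.submit(gate.wait)  # waits in the queue
try:
    executor.submit(gate.wait, block=False)  # both slots are taken
    rejected = False
except RuntimeError:
    rejected = True
gate.set()  # let the queued tasks finish
executor.shutdown(wait=True)
```

With `block=True` the third submission would simply wait until one of the first two tasks finished, which is the back-pressure behaviour described above.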

### ExecutorFuture

Wrapper around `concurrent.futures.Future` providing a consistent interface for transfer operations.

```python { .api }
class ExecutorFuture:
    """
    Wrapper around concurrent.futures.Future with additional functionality.
    """
    def __init__(self, future): ...

    def result(self):
        """
        Get result from the wrapped future.

        Returns:
            Any: Future result
        """

    def add_done_callback(self, fn):
        """
        Add callback to be called when future completes.

        Args:
            fn: Callback function
        """

    def done(self) -> bool:
        """
        Check if future is complete.

        Returns:
            bool: True if complete, False otherwise
        """
```

## Usage Examples

### Basic Future Handling

```python
import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start transfer and get future. Passing the path lets the manager
    # open the file itself; a file object would have to stay open until
    # the transfer finishes.
    future = transfer_manager.upload('/tmp/file.txt', 'my-bucket', 'file.txt')

    # Check if complete (non-blocking)
    if future.done():
        print("Transfer already complete!")
    else:
        print("Transfer in progress...")

    # Wait for completion and get result
    result = future.result()  # Blocks until complete
    print("Transfer completed successfully!")

    # Access metadata
    print(f"Transfer ID: {future.meta.transfer_id}")
    print(f"Bucket: {future.meta.call_args.bucket}")
    print(f"Key: {future.meta.call_args.key}")

finally:
    transfer_manager.shutdown()
```

### Progress Tracking with Size Information

```python
import os
import time

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    filename = '/tmp/large_file.dat'
    file_size = os.path.getsize(filename)

    # Pass the path so the file stays readable for the whole transfer
    future = transfer_manager.upload(filename, 'my-bucket', 'large_file.dat')

    # Provide size information for progress tracking
    future.meta.provide_transfer_size(file_size)

    # Monitor progress
    while not future.done():
        print(f"Transfer ID: {future.meta.transfer_id}")
        print(f"Size: {future.meta.size} bytes")
        print("Status: In progress...")
        time.sleep(1)

    # Get final result
    result = future.result()
    print("Upload completed!")

    # Check if ETag is available
    if future.meta.etag:
        print(f"Object ETag: {future.meta.etag}")

finally:
    transfer_manager.shutdown()
```

### Multiple Concurrent Operations

```python
import os
import time

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start multiple transfers
    upload_futures = []
    files = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']

    for filename in files:
        # Pass the path so each file stays readable until its upload ends
        future = transfer_manager.upload(filename, 'my-bucket', os.path.basename(filename))
        upload_futures.append(future)

    # Wait for all to complete
    print(f"Started {len(upload_futures)} uploads...")

    # Indexes of futures whose outcome has already been reported
    processed = set()
    while len(processed) < len(upload_futures):
        for i, future in enumerate(upload_futures):
            if future.done() and i not in processed:
                processed.add(i)
                try:
                    result = future.result()
                    print(f"Completed: {future.meta.call_args.key}")
                except Exception as e:
                    print(f"Failed: {future.meta.call_args.key} - {e}")
        time.sleep(0.1)

    print("All transfers completed!")

finally:
    transfer_manager.shutdown()
```

### Cancellation Handling

```python
import threading
import time

import boto3
from s3transfer.exceptions import CancelledError
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start a large transfer (path, so the file stays readable throughout)
    future = transfer_manager.upload('/tmp/very_large_file.dat', 'my-bucket', 'very_large_file.dat')

    # Cancel after 5 seconds (example)
    def cancel_transfer():
        time.sleep(5)
        print("Cancelling transfer...")
        # cancel() returns nothing; the outcome shows up in result()
        future.cancel()

    cancel_thread = threading.Thread(target=cancel_transfer)
    cancel_thread.start()

    try:
        # This will raise an exception if cancelled
        result = future.result()
        print("Transfer completed successfully!")
    except CancelledError as e:
        print(f"Transfer was cancelled: {e}")
    except Exception as e:
        print(f"Transfer failed: {e}")

    cancel_thread.join()

finally:
    transfer_manager.shutdown()
```

### Custom Context and Metadata

```python
import os
import time

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Pass the path so the file stays readable for the whole transfer
    future = transfer_manager.upload('/tmp/document.pdf', 'my-bucket', 'documents/document.pdf')

    # Add custom context information
    future.meta.user_context['upload_time'] = time.time()
    future.meta.user_context['user_id'] = 'user123'
    future.meta.user_context['department'] = 'engineering'

    # Provide additional metadata
    file_size = os.path.getsize('/tmp/document.pdf')
    future.meta.provide_transfer_size(file_size)

    # Wait for completion
    result = future.result()

    # Access custom context
    upload_time = future.meta.user_context['upload_time']
    user_id = future.meta.user_context['user_id']

    print(f"Upload completed for user {user_id} at {upload_time}")
    print(f"File size: {future.meta.size} bytes")

finally:
    transfer_manager.shutdown()
```

### Error Handling with Futures

```python
import os

import boto3
from s3transfer.exceptions import CancelledError, S3UploadFailedError
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start multiple transfers with error handling
    futures = []
    files = ['/tmp/file1.txt', '/tmp/nonexistent.txt', '/tmp/file3.txt']

    for filename in files:
        # The file is read later on a worker thread, so check for it up front
        if not os.path.isfile(filename):
            print(f"File not found: {filename}")
            continue
        future = transfer_manager.upload(filename, 'my-bucket', os.path.basename(filename))
        futures.append((future, filename))

    # Process results
    for future, filename in futures:
        try:
            result = future.result()
            print(f"✓ Successfully uploaded: {filename}")

        except S3UploadFailedError as e:
            print(f"✗ Upload failed for {filename}: {e}")

        except CancelledError as e:
            print(f"✗ Transfer cancelled for {filename}: {e}")

        except Exception as e:
            print(f"✗ Unexpected error for {filename}: {e}")

finally:
    transfer_manager.shutdown()
```

### Working with TransferCoordinator

```python
import boto3
from s3transfer.manager import TransferManager

# Example of accessing the underlying coordinator (advanced usage)
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Pass the path so the file stays readable for the whole transfer
    future = transfer_manager.upload('/tmp/file.txt', 'my-bucket', 'file.txt')

    # Access the underlying coordinator (advanced usage)
    coordinator = future._coordinator  # Note: private attribute, may change between releases

    # Add custom done callback
    def on_transfer_complete():
        print(f"Transfer {future.meta.transfer_id} completed!")
        print(f"Final status: {coordinator.status}")

    coordinator.add_done_callback(on_transfer_complete)

    # Add failure cleanup
    def cleanup_on_failure():
        print("Cleaning up after transfer failure...")

    coordinator.add_failure_cleanup(cleanup_on_failure)

    # Wait for completion
    result = future.result()

finally:
    transfer_manager.shutdown()
```

## Future States and Lifecycle

### Future States

1. **Created**: Future object created, transfer queued
2. **Running**: Transfer is actively executing
3. **Completed**: Transfer finished successfully
4. **Failed**: Transfer failed with an exception
5. **Cancelled**: Transfer was cancelled before completion

### State Transitions

```python
# Check current state
if not future.done():
    print("Transfer is running or queued")
else:
    try:
        result = future.result()
        print("Transfer completed successfully")
    except Exception as e:
        print(f"Transfer failed: {e}")
```
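
The five states in this section can be related to the six `TransferCoordinator.status` strings documented earlier. The mapping below is an interpretation made for illustration (in particular, treating both `'not-started'` and `'queued'` as "Created" is an assumption), and `lifecycle_state` and `is_terminal` are hypothetical helper names.

```python
# Assumed mapping from coordinator status strings to the states listed above.
STATUS_TO_STATE = {
    'not-started': 'Created',
    'queued': 'Created',
    'running': 'Running',
    'success': 'Completed',
    'failed': 'Failed',
    'cancelled': 'Cancelled',
}

TERMINAL_STATES = frozenset({'Completed', 'Failed', 'Cancelled'})


def lifecycle_state(status: str) -> str:
    """Translate a status string into one of the five lifecycle states."""
    try:
        return STATUS_TO_STATE[status]
    except KeyError:
        raise ValueError(f"unknown status: {status!r}") from None


def is_terminal(status: str) -> bool:
    """Return True once no further state transition is expected."""
    return lifecycle_state(status) in TERMINAL_STATES


pending = [s for s in STATUS_TO_STATE if not is_terminal(s)]
```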

## Best Practices

### Future Management

1. **Always call result()**: Even if you don't need the return value, call `future.result()` to ensure exceptions are raised
2. **Handle exceptions**: Wrap `future.result()` in `try`/`except` blocks
3. **Don't ignore futures**: Keep references to futures until completion
4. **Check done() for polling**: Use `future.done()` for non-blocking status checks
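
The first three points can be combined in a small helper that calls `result()` on every future and records failures instead of dropping them. This is a stdlib-only sketch: `collect_results` and `fake_transfer` are hypothetical names, and `concurrent.futures` futures stand in for transfer futures.

```python
from concurrent.futures import ThreadPoolExecutor


def collect_results(labelled_futures):
    """Call result() on every future; return (successes, failures)."""
    successes, failures = [], []
    for label, future in labelled_futures:
        try:
            successes.append((label, future.result()))
        except Exception as exc:  # keep going so no failure is silently lost
            failures.append((label, exc))
    return successes, failures


def fake_transfer(name):
    # Hypothetical stand-in for a transfer; fails for one specific name.
    if name == 'b.txt':
        raise OSError(f"could not send {name}")
    return None


with ThreadPoolExecutor(max_workers=2) as pool:
    pending = [
        (name, pool.submit(fake_transfer, name))
        for name in ('a.txt', 'b.txt', 'c.txt')
    ]
    successes, failures = collect_results(pending)
```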

### Resource Management

1. **Limit concurrent futures**: Don't create unlimited futures without waiting for completion
2. **Clean up on failure**: Use failure cleanup functions for resource cleanup
3. **Cancel when appropriate**: Cancel futures during shutdown or error conditions
4. **Monitor memory usage**: Large numbers of futures can consume significant memory
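
One way to limit concurrent futures is a sliding window: before submitting a new item, wait for the oldest outstanding future once the window is full. The sketch below assumes a caller-supplied `submit` callable that returns a future-like object; `submit_with_window` and `tracked_task` are hypothetical names, and the demo uses a thread pool rather than S3.

```python
import threading
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def submit_with_window(submit, items, max_in_flight):
    """Submit items while keeping at most `max_in_flight` unfinished futures."""
    window = deque()
    all_futures = []
    for item in items:
        if len(window) >= max_in_flight:
            oldest = window.popleft()
            try:
                oldest.result()  # wait for a slot to free up
            except Exception:
                pass  # the caller inspects failures on the returned futures
        future = submit(item)
        window.append(future)
        all_futures.append(future)
    return all_futures


# Demo: record the highest number of tasks running at the same time.
stats = {'active': 0, 'peak': 0}
lock = threading.Lock()


def tracked_task(_item):
    with lock:
        stats['active'] += 1
        stats['peak'] = max(stats['peak'], stats['active'])
    time.sleep(0.01)
    with lock:
        stats['active'] -= 1


with ThreadPoolExecutor(max_workers=8) as pool:
    futures = submit_with_window(
        lambda item: pool.submit(tracked_task, item),
        range(10),
        max_in_flight=3,
    )
```

Waiting on the oldest future is the simplest policy; it can stall behind one slow transfer, so a completion-order wait may suit workloads with very uneven sizes better.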

### Error Handling

1. **Catch specific exceptions**: Handle `S3UploadFailedError`, `CancelledError`, etc. specifically
2. **Use coordinator callbacks**: Add failure cleanup functions for automatic resource management
3. **Log transfer IDs**: Include transfer IDs in error messages for debugging
4. **Implement retry logic**: Use futures with retry logic for resilient applications
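
As a rough sketch of the retry point, the helper below restarts a whole transfer when `result()` raises a retryable error, with exponential backoff between attempts. It assumes a caller-supplied `start_transfer` callable that returns a fresh future each time; `result_with_retries` and `flaky_transfer` are hypothetical names, and which exceptions are worth retrying is application-specific.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def result_with_retries(start_transfer, max_attempts=3, base_delay=0.0,
                        retry_on=(OSError,)):
    """Start a transfer and wait for it, restarting on retryable errors."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        future = start_transfer()
        try:
            return future.result()
        except retry_on as exc:
            last_error = exc
            if attempt < max_attempts:
                # Exponential backoff: base_delay, 2x, 4x, ...
                time.sleep(base_delay * 2 ** (attempt - 1))
    raise last_error


attempts = []


def flaky_transfer():
    # Hypothetical transfer that fails twice before succeeding.
    attempts.append(1)
    if len(attempts) < 3:
        raise OSError("transient failure")
    return "uploaded"


with ThreadPoolExecutor(max_workers=1) as pool:
    outcome = result_with_retries(
        lambda: pool.submit(flaky_transfer),
        max_attempts=3,
    )
```

Exceptions outside `retry_on` propagate immediately, which keeps non-transient problems such as bad arguments from being retried.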