# Future-based Coordination

Asynchronous transfer execution using futures, coordinators, and metadata tracking for monitoring transfer progress, handling completion, and coordinating complex multi-part operations.

## Capabilities

### TransferFuture

Future object representing a transfer request with methods for monitoring progress and retrieving results.

```python { .api }
class TransferFuture:
    """
    Future representing a transfer request.

    Provides methods to check completion status, retrieve results, and cancel operations.
    """
    def done(self) -> bool:
        """
        Check if the transfer is complete.

        Returns:
            bool: True if transfer is complete (success, failure, or cancellation), False otherwise
        """

    def result(self):
        """
        Get the transfer result, blocking until complete.

        Returns:
            Any: Transfer result (usually None for successful transfers)

        Raises:
            Exception: Any exception that occurred during transfer,
                including CancelledError if the transfer was cancelled
        """

    def cancel(self):
        """
        Cancel the transfer if possible.

        Returns:
            None. If the transfer is cancelled, result() raises CancelledError.
        """

    def set_exception(self, exception):
        """
        Set an exception on the future.

        Args:
            exception: Exception to set on the future

        Raises:
            TransferNotDoneError: If called before the transfer is complete
        """

    @property
    def meta(self) -> 'TransferMeta':
        """
        Transfer metadata object containing call arguments and status information.

        Returns:
            TransferMeta: Metadata object for this transfer
        """
```

### TransferMeta

Metadata container providing information about a transfer including call arguments, transfer ID, size, and custom context.

```python { .api }
class TransferMeta:
    """
    Metadata about a TransferFuture containing call arguments and transfer information.
    """
    @property
    def call_args(self):
        """
        The original call arguments used for the transfer.

        Returns:
            CallArgs: Object containing method arguments (bucket, key, etc.)
        """

    @property
    def transfer_id(self) -> str:
        """
        Unique identifier for this transfer.

        Returns:
            str: Unique transfer ID
        """

    @property
    def size(self) -> Optional[int]:
        """
        Total size of the transfer in bytes (if known).

        Returns:
            int or None: Transfer size in bytes, None if unknown
        """

    @property
    def user_context(self) -> Dict[str, Any]:
        """
        User-defined context dictionary for storing custom data.

        Returns:
            dict: User context dictionary
        """

    @property
    def etag(self) -> Optional[str]:
        """
        ETag of the S3 object (if available).

        Returns:
            str or None: Object ETag, None if not available
        """

    def provide_transfer_size(self, size: int):
        """
        Provide the total transfer size for progress tracking.

        Args:
            size (int): Total size in bytes
        """

    def provide_object_etag(self, etag: str):
        """
        Provide the object ETag.

        Args:
            etag (str): Object ETag value
        """
```

### TransferCoordinator

Central coordinator managing transfer execution, associated futures, and cleanup operations.

```python { .api }
class TransferCoordinator:
    """
    Coordinates transfer execution and manages associated futures.

    Handles task submission, result/exception setting, cancellation, and cleanup operations.

    Args:
        transfer_id: Optional transfer identifier (default: None)
    """
    def __init__(self, transfer_id=None): ...

    def set_result(self, result):
        """
        Set the transfer result.

        Args:
            result: Result value for the transfer
        """

    def set_exception(self, exception, override=False):
        """
        Set an exception for the transfer.

        Args:
            exception: Exception that occurred during transfer
            override (bool): If True, override any existing state (default: False)
        """

    def result(self):
        """
        Get the transfer result, blocking until complete.

        Returns:
            Any: Transfer result

        Raises:
            Exception: Any exception that occurred during transfer
        """

    def cancel(self, msg: str = '', exc_type=CancelledError):
        """
        Cancel the transfer.

        Args:
            msg (str): Cancellation message (default: '')
            exc_type: Type of exception to set for cancellation (default: CancelledError)
        """

    def submit(self, executor, task, tag=None):
        """
        Submit a task for execution.

        Args:
            executor: Executor to submit task to
            task: Callable task to execute
            tag: TaskTag to associate with the submitted task (optional)

        Returns:
            concurrent.futures.Future: Future object for the submitted task
        """

    def done(self) -> bool:
        """
        Check if the transfer is complete.

        Returns:
            bool: True if complete, False otherwise
        """

    def add_done_callback(self, function, *args, **kwargs):
        """
        Add a callback to be called when transfer completes.

        Args:
            function: Callback function to call on completion
            *args: Additional positional arguments to pass to callback
            **kwargs: Additional keyword arguments to pass to callback
        """

    def add_failure_cleanup(self, function, *args, **kwargs):
        """
        Add a cleanup function to be called if transfer fails.

        Args:
            function: Function to call for cleanup on failure
            *args: Additional positional arguments to pass to cleanup function
            **kwargs: Additional keyword arguments to pass to cleanup function
        """

    def announce_done(self):
        """
        Announce that the transfer is complete and trigger callbacks.
        """

    def set_status_to_queued(self):
        """
        Set the TransferFuture's status to queued.
        """

    def set_status_to_running(self):
        """
        Set the TransferFuture's status to running.
        """

    def add_associated_future(self, future):
        """
        Add a future to be associated with the TransferFuture.

        Args:
            future: Future object to associate with this coordinator
        """

    def remove_associated_future(self, future):
        """
        Remove a future's association to the TransferFuture.

        Args:
            future: Future object to disassociate from this coordinator
        """

    @property
    def exception(self):
        """
        Exception that occurred during transfer (if any).

        Returns:
            Exception or None: Transfer exception, None if no exception
        """

    @property
    def associated_futures(self) -> Set:
        """
        Set of futures associated with this coordinator.

        Returns:
            set: Set of associated Future objects
        """

    @property
    def failure_cleanups(self) -> List:
        """
        List of cleanup functions to call on failure.

        Returns:
            list: List of cleanup functions
        """

    @property
    def status(self) -> str:
        """
        Current status of the transfer with detailed state information.

        Returns:
            str: Status string with specific states:
                - 'not-started': Has yet to start, can be cancelled immediately
                - 'queued': SubmissionTask is about to submit tasks
                - 'running': Is in progress (SubmissionTask executing)
                - 'cancelled': Was cancelled
                - 'failed': An exception other than CancelledError was thrown
                - 'success': No exceptions were thrown and is done
        """
```
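
To make the interplay between results, exceptions, done callbacks, and failure cleanups easier to picture, here is a toy model of the lifecycle described above. It is a simplified sketch written for this page, not the library's implementation: `MiniCoordinator` is a hypothetical class, it omits cancellation, task submission, and associated futures, and it assumes the first result or exception wins.

```python
import threading


class MiniCoordinator:
    """Toy coordinator (hypothetical, not the s3transfer class)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._done_event = threading.Event()
        self._status = "not-started"
        self._result = None
        self._exception = None
        self._done_callbacks = []
        self._failure_cleanups = []

    @property
    def status(self):
        return self._status

    def done(self):
        return self._status in ("failed", "cancelled", "success")

    def set_result(self, result):
        with self._lock:
            if not self.done():  # First outcome wins in this sketch
                self._result = result
                self._status = "success"

    def set_exception(self, exception):
        with self._lock:
            if not self.done():
                self._exception = exception
                self._status = "failed"

    def add_done_callback(self, function, *args, **kwargs):
        self._done_callbacks.append((function, args, kwargs))

    def add_failure_cleanup(self, function, *args, **kwargs):
        self._failure_cleanups.append((function, args, kwargs))

    def announce_done(self):
        # Cleanups run only when the transfer did not succeed.
        if self._status != "success":
            for function, args, kwargs in self._failure_cleanups:
                function(*args, **kwargs)
        # Unblock anyone waiting in result(), then run done callbacks.
        self._done_event.set()
        for function, args, kwargs in self._done_callbacks:
            function(*args, **kwargs)

    def result(self):
        self._done_event.wait()
        if self._exception is not None:
            raise self._exception
        return self._result


if __name__ == "__main__":
    events = []
    coordinator = MiniCoordinator()
    coordinator.add_done_callback(events.append, "done")
    coordinator.add_failure_cleanup(events.append, "cleanup")
    coordinator.set_exception(ValueError("boom"))
    coordinator.announce_done()
    print(coordinator.status, events)  # failed ['cleanup', 'done']
```

In this sketch, `result()` blocks on an event that only `announce_done()` sets, which is why a transfer has to be announced as done before waiters are released.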

### BoundedExecutor

Executor with a bounded task submission queue to prevent unlimited memory growth during high-volume operations.

```python { .api }
class BoundedExecutor:
    """
    Executor with bounded task submission queue.

    Prevents unlimited memory growth by blocking task submission when queue is full.

    Args:
        max_size (int): Maximum number of submitted tasks that may be in flight at once
        max_num_threads (int): Maximum number of worker threads
        tag_semaphores: Optional mapping from a TaskTag to the semaphore used for tasks with that tag
        executor_cls: Optional executor class to use instead of the default thread pool
    """
    def __init__(self, max_size: int, max_num_threads: int, tag_semaphores=None, executor_cls=None): ...

    def submit(self, task, tag=None, block=True):
        """
        Submit a task for execution, blocking if queue is full.

        Args:
            task: Task to execute
            tag: Optional TaskTag selecting which semaphore to acquire
            block (bool): If True, wait for a free slot; if False, raise an error when no slot is available

        Returns:
            ExecutorFuture: Future object for the submitted task
        """

    def shutdown(self, wait: bool = True):
        """
        Shutdown the executor.

        Args:
            wait (bool): Whether to wait for completion
        """
```
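
The bounding idea can be illustrated with the standard library alone: acquire a semaphore slot before each submission and release it when the task finishes. The class below is a minimal sketch under that assumption. `SimpleBoundedExecutor` is a hypothetical name, it accepts a plain callable with arguments rather than s3transfer tasks and tags, and it should not be read as the library's implementation.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SimpleBoundedExecutor:
    """Toy executor (hypothetical): at most max_size unfinished tasks."""

    def __init__(self, max_size, max_num_threads):
        self._semaphore = threading.Semaphore(max_size)
        self._executor = ThreadPoolExecutor(max_workers=max_num_threads)

    def submit(self, fn, *args, **kwargs):
        # Blocks here while max_size tasks are already queued or running.
        self._semaphore.acquire()
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except Exception:
            # Submission failed, so give the slot back.
            self._semaphore.release()
            raise
        # Free the slot once the task finishes, whatever its outcome.
        future.add_done_callback(lambda _finished: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)


if __name__ == "__main__":
    executor = SimpleBoundedExecutor(max_size=2, max_num_threads=2)
    futures = [executor.submit(pow, 2, n) for n in range(6)]
    print([f.result() for f in futures])  # [1, 2, 4, 8, 16, 32]
    executor.shutdown()
```

Blocking in `submit()` pushes back on the producer, so the number of pending tasks held in memory never exceeds `max_size`.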

### ExecutorFuture

Wrapper around `concurrent.futures.Future` providing a consistent interface for transfer operations.

```python { .api }
class ExecutorFuture:
    """
    Wrapper around concurrent.futures.Future with additional functionality.
    """
    def __init__(self, future): ...

    def result(self):
        """
        Get result from the wrapped future.

        Returns:
            Any: Future result
        """

    def add_done_callback(self, fn):
        """
        Add callback to be called when future completes.

        Args:
            fn: Callback function, invoked with no arguments
        """

    def done(self) -> bool:
        """
        Check if future is complete.

        Returns:
            bool: True if complete, False otherwise
        """
```

## Usage Examples

### Basic Future Handling

```python
import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Keep the file open until the transfer finishes: the upload reads
    # from it asynchronously after upload() returns.
    with open('/tmp/file.txt', 'rb') as f:
        # Start transfer and get future
        future = transfer_manager.upload(f, 'my-bucket', 'file.txt')

        # Check if complete (non-blocking)
        if future.done():
            print("Transfer already complete!")
        else:
            print("Transfer in progress...")

        # Wait for completion and get result
        result = future.result()  # Blocks until complete
        print("Transfer completed successfully!")

    # Access metadata
    print(f"Transfer ID: {future.meta.transfer_id}")
    print(f"Bucket: {future.meta.call_args.bucket}")
    print(f"Key: {future.meta.call_args.key}")

finally:
    transfer_manager.shutdown()
```

### Progress Tracking with Size Information

```python
import os
import time

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    filename = '/tmp/large_file.dat'
    file_size = os.path.getsize(filename)

    # The file must stay open while the upload is in progress
    with open(filename, 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'large_file.dat')

        # Provide size information for progress tracking
        future.meta.provide_transfer_size(file_size)

        # Monitor progress
        while not future.done():
            print(f"Transfer ID: {future.meta.transfer_id}")
            print(f"Size: {future.meta.size} bytes")
            print("Status: In progress...")
            time.sleep(1)

        # Get final result
        result = future.result()
        print("Upload completed!")

    # Check if ETag is available
    if future.meta.etag:
        print(f"Object ETag: {future.meta.etag}")

finally:
    transfer_manager.shutdown()
```

### Multiple Concurrent Operations

```python
import os
import time

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start multiple transfers. Passing filenames (rather than open file
    # objects) lets the manager open and close each file itself.
    upload_futures = []
    files = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']

    for filename in files:
        future = transfer_manager.upload(filename, 'my-bucket', os.path.basename(filename))
        upload_futures.append(future)

    print(f"Started {len(upload_futures)} uploads...")

    # Wait for all to complete, reporting each future exactly once
    processed = set()
    while len(processed) < len(upload_futures):
        for i, future in enumerate(upload_futures):
            if future.done() and i not in processed:
                processed.add(i)
                try:
                    future.result()
                    print(f"Completed: {future.meta.call_args.key}")
                except Exception as e:
                    print(f"Failed: {future.meta.call_args.key} - {e}")
        time.sleep(0.1)

    print("All transfers completed!")

finally:
    transfer_manager.shutdown()
```

### Cancellation Handling

```python
import threading
import time

import boto3
from s3transfer.exceptions import CancelledError
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start a large transfer; keep the file open until it finishes
    with open('/tmp/very_large_file.dat', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'very_large_file.dat')

        # Cancel after 5 seconds (example)
        def cancel_transfer():
            time.sleep(5)
            print("Cancelling transfer...")
            # cancel() returns None and has no effect on a finished transfer
            future.cancel()

        cancel_thread = threading.Thread(target=cancel_transfer)
        cancel_thread.start()

        try:
            # Raises CancelledError if the transfer was cancelled
            result = future.result()
            print("Transfer completed successfully!")
        except CancelledError as e:
            print(f"Transfer was cancelled: {e}")
        except Exception as e:
            print(f"Transfer failed: {e}")

        cancel_thread.join()

finally:
    transfer_manager.shutdown()
```

### Custom Context and Metadata

```python
import os
import time

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    with open('/tmp/document.pdf', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'documents/document.pdf')

        # Add custom context information
        future.meta.user_context['upload_time'] = time.time()
        future.meta.user_context['user_id'] = 'user123'
        future.meta.user_context['department'] = 'engineering'

        # Provide additional metadata
        file_size = os.path.getsize('/tmp/document.pdf')
        future.meta.provide_transfer_size(file_size)

        # Wait for completion while the file is still open
        result = future.result()

    # Access custom context
    upload_time = future.meta.user_context['upload_time']
    user_id = future.meta.user_context['user_id']

    print(f"Upload completed for user {user_id} at {upload_time}")
    print(f"File size: {future.meta.size} bytes")

finally:
    transfer_manager.shutdown()
```

### Error Handling with Futures

```python
import os

import boto3
from s3transfer.exceptions import CancelledError, S3UploadFailedError
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start multiple transfers with error handling
    futures = []
    files = ['/tmp/file1.txt', '/tmp/nonexistent.txt', '/tmp/file3.txt']

    for filename in files:
        # Skip missing files up front. Passing the filename lets the
        # manager open and close the file itself.
        if not os.path.isfile(filename):
            print(f"File not found: {filename}")
            continue
        future = transfer_manager.upload(filename, 'my-bucket', os.path.basename(filename))
        futures.append((future, filename))

    # Process results
    for future, filename in futures:
        try:
            result = future.result()
            print(f"✓ Successfully uploaded: {filename}")

        except S3UploadFailedError as e:
            print(f"✗ Upload failed for {filename}: {e}")

        except CancelledError as e:
            print(f"✗ Transfer cancelled for {filename}: {e}")

        except Exception as e:
            print(f"✗ Unexpected error for {filename}: {e}")

finally:
    transfer_manager.shutdown()
```

### Working with TransferCoordinator

```python
import boto3
from s3transfer.manager import TransferManager

# Example of accessing the underlying coordinator (advanced usage)
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    with open('/tmp/file.txt', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'file.txt')

        # Access the underlying coordinator (advanced usage)
        coordinator = future._coordinator  # Note: private attribute

        # Add custom done callback
        def on_transfer_complete():
            print(f"Transfer {future.meta.transfer_id} completed!")
            print(f"Final status: {coordinator.status}")

        coordinator.add_done_callback(on_transfer_complete)

        # Add failure cleanup
        def cleanup_on_failure():
            print("Cleaning up after transfer failure...")

        coordinator.add_failure_cleanup(cleanup_on_failure)

        # Wait for completion while the file is still open
        result = future.result()

finally:
    transfer_manager.shutdown()
```

## Future States and Lifecycle

### Future States

1. **Created**: Future object created, transfer queued
2. **Running**: Transfer is actively executing
3. **Completed**: Transfer finished successfully
4. **Failed**: Transfer failed with an exception
5. **Cancelled**: Transfer was cancelled before completion
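
The five states above can be related to the status strings listed for `TransferCoordinator.status`. The mapping below is one reading of this page rather than something the library defines: `STATE_BY_STATUS`, `TERMINAL_STATUSES`, and `describe_status` are hypothetical names, and grouping both `'not-started'` and `'queued'` under "Created" is an assumption.

```python
# Hypothetical mapping from coordinator status strings to the lifecycle
# states listed above; not defined by s3transfer itself.
STATE_BY_STATUS = {
    "not-started": "Created",
    "queued": "Created",
    "running": "Running",
    "success": "Completed",
    "failed": "Failed",
    "cancelled": "Cancelled",
}

# Statuses after which the transfer will not change state again.
TERMINAL_STATUSES = {"success", "failed", "cancelled"}


def describe_status(status):
    """Return (lifecycle state, is_terminal) for a coordinator status string."""
    return STATE_BY_STATUS[status], status in TERMINAL_STATUSES


if __name__ == "__main__":
    print(describe_status("queued"))     # ('Created', False)
    print(describe_status("cancelled"))  # ('Cancelled', True)
```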

### State Transitions

```python
# Check current state
if not future.done():
    print("Transfer is running or queued")
else:
    try:
        result = future.result()
        print("Transfer completed successfully")
    except Exception as e:
        print(f"Transfer failed: {e}")
```

## Best Practices

### Future Management

1. **Always call result()**: Even if you don't need the return value, call `future.result()` so that any exception from the transfer is raised
2. **Handle exceptions**: Wrap `future.result()` in `try`/`except` blocks
3. **Don't ignore futures**: Keep references to futures until completion
4. **Check done() for polling**: Use `future.done()` for non-blocking status checks
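
The first two practices can be combined in a small helper that calls `result()` on every future and separates successes from failures. This is a sketch: `collect_results` is a hypothetical helper, not an s3transfer function, and it only assumes objects with a blocking `result()` method. The demonstration uses `concurrent.futures` futures so that it runs without S3 access.

```python
from concurrent.futures import ThreadPoolExecutor


def collect_results(futures):
    """Call result() on every future and return (results, failures).

    Hypothetical helper: `failures` is a list of (future, exception)
    pairs, so no exception goes unobserved.
    """
    results, failures = [], []
    for future in futures:
        try:
            results.append(future.result())
        except Exception as exc:
            failures.append((future, exc))
    return results, failures


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as pool:
        submitted = [pool.submit(int, text) for text in ("1", "x", "3")]
        results, failures = collect_results(submitted)
    print(results)        # [1, 3]
    print(len(failures))  # 1
```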

### Resource Management

1. **Limit concurrent futures**: Don't create unlimited futures without waiting for completion
2. **Clean up on failure**: Use failure cleanup functions for resource cleanup
3. **Cancel when appropriate**: Cancel futures during shutdown or error conditions
4. **Monitor memory usage**: Large numbers of futures can consume significant memory
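
One way to limit concurrent futures from the caller's side is a sliding window: start a new transfer only after the oldest outstanding one has finished. The function below sketches that approach. `run_windowed` and its `start_transfer` parameter are hypothetical names; `start_transfer(item)` stands in for any call that returns a future-like object with a blocking `result()`.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def run_windowed(start_transfer, items, max_in_flight):
    """Start one transfer per item with at most max_in_flight outstanding.

    Hypothetical helper. Results are returned in submission order; an
    exception from any result() call propagates to the caller.
    """
    if max_in_flight < 1:
        raise ValueError("max_in_flight must be at least 1")
    in_flight = deque()
    results = []
    for item in items:
        if len(in_flight) >= max_in_flight:
            # Wait for the oldest transfer before starting another one.
            results.append(in_flight.popleft().result())
        in_flight.append(start_transfer(item))
    # Drain whatever is still outstanding.
    while in_flight:
        results.append(in_flight.popleft().result())
    return results


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as pool:
        squares = run_windowed(lambda n: pool.submit(pow, n, 2), range(7), max_in_flight=3)
    print(squares)  # [0, 1, 4, 9, 16, 25, 36]
```

Waiting on the oldest future keeps the bookkeeping simple, at the cost of occasionally idling while a slow early transfer holds the window.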

### Error Handling

1. **Catch specific exceptions**: Handle `S3UploadFailedError`, `CancelledError`, etc. specifically
2. **Use coordinator callbacks**: Add failure cleanup functions for automatic resource management
3. **Log transfer IDs**: Include transfer IDs in error messages for debugging
4. **Implement retry logic**: Use futures with retry logic for resilient applications