# Modern Transfer Manager

The `TransferManager` class is the recommended interface for S3 operations. It provides upload, download, copy, and delete operations, future-based asynchronous execution, managed resource cleanup, and comprehensive configuration options.

## Capabilities

### TransferManager Class

The main transfer management class, supporting all S3 operations with asynchronous execution and comprehensive resource management.

```python { .api }
class TransferManager:
    """
    Modern S3 transfer manager with enhanced capabilities.

    Args:
        client: boto3 S3 client instance
        config: TransferConfig for controlling transfer behavior
        osutil: OSUtils instance for file operations
        executor_cls: Executor class for managing threads/processes
    """
    def __init__(self, client, config=None, osutil=None, executor_cls=None): ...

    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None) -> TransferFuture:
        """
        Upload a file to S3.

        Args:
            fileobj: File name or seekable file-like object to upload
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events

        Returns:
            TransferFuture: Future object for tracking transfer progress

        Raises:
            ValueError: If extra_args contains invalid keys
        """

    def download(self, bucket, key, fileobj, extra_args=None, subscribers=None) -> TransferFuture:
        """
        Download an S3 object.

        Args:
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            fileobj: File name or seekable file-like object to write to
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events

        Returns:
            TransferFuture: Future object for tracking transfer progress

        Raises:
            ValueError: If extra_args contains invalid keys
        """

    def copy(self, copy_source, bucket, key, extra_args=None, subscribers=None, source_client=None) -> TransferFuture:
        """
        Copy an S3 object to another location.

        Args:
            copy_source (dict): Source object specification, e.g.
                {'Bucket': str, 'Key': str, 'VersionId': str}
            bucket (str): Destination bucket name
            key (str): Destination object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events
            source_client: Separate S3 client for source operations

        Returns:
            TransferFuture: Future object for tracking transfer progress
        """

    def delete(self, bucket, key, extra_args=None, subscribers=None) -> TransferFuture:
        """
        Delete an S3 object.

        Args:
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events

        Returns:
            TransferFuture: Future object for tracking transfer progress
        """

    def shutdown(self, cancel=False, cancel_msg=''):
        """
        Shut down the transfer manager, waiting for (or cancelling) in-flight
        transfers and releasing all resources.

        Args:
            cancel (bool): Whether to cancel ongoing transfers
            cancel_msg (str): Message to include with cancellation
        """

    @property
    def client(self):
        """The boto3 S3 client instance."""

    @property
    def config(self) -> TransferConfig:
        """The transfer configuration."""

    # Class constants for allowed operation arguments
    ALLOWED_DOWNLOAD_ARGS: List[str]
    ALLOWED_UPLOAD_ARGS: List[str]
    ALLOWED_COPY_ARGS: List[str]
    ALLOWED_DELETE_ARGS: List[str]
```

### TransferCoordinatorController

Controls and manages all transfer coordinators for a transfer manager, providing centralized coordination and lifecycle management.

```python { .api }
class TransferCoordinatorController:
    """
    Controls all transfer coordinators for a manager.
    """
    def add_transfer_coordinator(self, transfer_coordinator):
        """
        Add a transfer coordinator to track.

        Args:
            transfer_coordinator: TransferCoordinator to manage
        """

    def remove_transfer_coordinator(self, transfer_coordinator):
        """
        Remove a transfer coordinator from tracking.

        Args:
            transfer_coordinator: TransferCoordinator to remove
        """

    def cancel(self, msg='', exc_type=None):
        """
        Cancel all tracked transfers.

        Args:
            msg (str): Cancellation message
            exc_type: Exception type for cancellation
        """

    def wait(self):
        """Wait for all tracked transfers to complete."""

    @property
    def tracked_transfer_coordinators(self) -> Set:
        """Set of currently tracked transfer coordinators."""
```

## Usage Examples

### Basic Upload and Download

```python
import boto3
from s3transfer.manager import TransferManager

# Create a transfer manager
client = boto3.client('s3', region_name='us-west-2')
transfer_manager = TransferManager(client)

try:
    # Upload from a file
    with open('/tmp/data.csv', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'data.csv')
        future.result()  # Wait for completion

    # Download to a file
    with open('/tmp/downloaded.csv', 'wb') as f:
        future = transfer_manager.download('my-bucket', 'data.csv', f)
        future.result()  # Wait for completion
finally:
    # Always shut down when done
    transfer_manager.shutdown()
```

### Upload with Progress Tracking

```python
import os

import boto3
from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber

class ProgressSubscriber(BaseSubscriber):
    def __init__(self, filename):
        self._filename = filename
        self._size = os.path.getsize(filename)
        self._seen_so_far = 0

    def on_progress(self, bytes_transferred, **kwargs):
        self._seen_so_far += bytes_transferred
        if self._size:
            percentage = (self._seen_so_far / self._size) * 100
            print(f"\r{self._filename}: {percentage:.2f}% complete", end='')

    def on_done(self, **kwargs):
        print(f"\n{self._filename}: Transfer complete!")

# Upload with progress tracking
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    progress_subscriber = ProgressSubscriber('/tmp/large_file.dat')

    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'large_file.dat',
            subscribers=[progress_subscriber]
        )
        future.result()
finally:
    transfer_manager.shutdown()
```

### Concurrent Operations

```python
import os

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start multiple uploads concurrently. Passing file names (rather than
    # open file objects) lets the manager open and close each file itself,
    # so no file is closed while its transfer is still in flight.
    upload_futures = []
    files_to_upload = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']

    for filename in files_to_upload:
        future = transfer_manager.upload(
            filename, 'my-bucket', os.path.basename(filename)
        )
        upload_futures.append(future)

    # Wait for all uploads to complete
    for future in upload_futures:
        try:
            future.result()
            print(f"Upload completed for {future.meta.call_args.key}")
        except Exception as e:
            print(f"Upload failed: {e}")
finally:
    transfer_manager.shutdown()
```

### Copy Operations

```python
import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Copy within the same account
    copy_source = {'Bucket': 'source-bucket', 'Key': 'source-file.txt'}
    future = transfer_manager.copy(
        copy_source, 'destination-bucket', 'destination-file.txt'
    )
    future.result()

    # Copy a specific object version
    copy_source = {
        'Bucket': 'source-bucket',
        'Key': 'source-file.txt',
        'VersionId': 'version-id-here'
    }
    future = transfer_manager.copy(
        copy_source, 'destination-bucket', 'versioned-copy.txt'
    )
    future.result()
finally:
    transfer_manager.shutdown()
```

### Advanced Configuration

```python
import time

import boto3
from s3transfer.manager import TransferConfig, TransferManager

# Tune the transfer configuration for large files
config = TransferConfig(
    multipart_threshold=64 * 1024 * 1024,   # use multipart above 64 MB
    multipart_chunksize=64 * 1024 * 1024,   # 64 MB parts
    max_request_concurrency=20,             # 20 concurrent requests
    max_submission_concurrency=10,          # 10 concurrent submissions
    max_bandwidth=100 * 1024 * 1024,        # 100 MB/s bandwidth limit
    num_download_attempts=10,               # 10 download attempts
    max_in_memory_upload_chunks=5,          # 5 upload chunks in memory
    max_in_memory_download_chunks=5         # 5 download chunks in memory
)

client = boto3.client('s3')
transfer_manager = TransferManager(client, config)

try:
    # Large file operations benefit from this configuration
    with open('/tmp/very_large_file.dat', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'very_large_file.dat')

        # Poll until the transfer finishes
        while not future.done():
            time.sleep(1)
            if future.meta.size:
                print(f"Transfer size: {future.meta.size} bytes")

        future.result()
finally:
    transfer_manager.shutdown()
```

### Error Handling

```python
import boto3
from s3transfer.manager import TransferManager
from s3transfer.exceptions import S3UploadFailedError, TransferNotDoneError

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    with open('/tmp/test_file.txt', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')

        try:
            # Wait for completion
            future.result()
            print("Upload successful!")
        except S3UploadFailedError as e:
            print(f"Upload failed: {e}")
        except TransferNotDoneError as e:
            print(f"Transfer not complete: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")
finally:
    # Shut down, cancelling anything still in flight
    transfer_manager.shutdown(cancel=True, cancel_msg="Application shutting down")
```

### Delete Operations

```python
import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Simple delete
    future = transfer_manager.delete('my-bucket', 'file-to-delete.txt')
    future.result()

    # Delete a specific object version
    future = transfer_manager.delete(
        'my-bucket', 'versioned-file.txt',
        extra_args={'VersionId': 'version-id-here'}
    )
    future.result()

    # Batch delete
    files_to_delete = ['file1.txt', 'file2.txt', 'file3.txt']
    delete_futures = []

    for filename in files_to_delete:
        future = transfer_manager.delete('my-bucket', filename)
        delete_futures.append(future)

    # Wait for all deletions
    for future in delete_futures:
        try:
            future.result()
            print(f"Deleted: {future.meta.call_args.key}")
        except Exception as e:
            print(f"Delete failed: {e}")
finally:
    transfer_manager.shutdown()
```

## Allowed Extra Arguments

### Upload Arguments

- `ACL`: Access control list permissions
- `CacheControl`: Cache control directives
- `ContentDisposition`: Content disposition header
- `ContentEncoding`: Content encoding (e.g., 'gzip')
- `ContentLanguage`: Content language
- `ContentType`: MIME type of the content
- `Expires`: Expiration date
- `GrantFullControl`: Full control permissions
- `GrantRead`: Read permissions
- `GrantReadACP`: Read ACP permissions
- `GrantWriteACP`: Write ACP permissions
- `Metadata`: User-defined metadata dictionary
- `RequestPayer`: Request payer setting
- `ServerSideEncryption`: Server-side encryption method
- `StorageClass`: Storage class (STANDARD, REDUCED_REDUNDANCY, etc.)
- `SSECustomerAlgorithm`: Customer-provided encryption algorithm
- `SSECustomerKey`: Customer-provided encryption key
- `SSECustomerKeyMD5`: MD5 hash of customer encryption key
- `SSEKMSKeyId`: KMS key ID for encryption
- `SSEKMSEncryptionContext`: KMS encryption context
- `Tagging`: Object tags

### Download Arguments

- `VersionId`: Specific version of the object to download
- `SSECustomerAlgorithm`: Customer-provided encryption algorithm
- `SSECustomerKey`: Customer-provided encryption key
- `SSECustomerKeyMD5`: MD5 hash of customer encryption key
- `RequestPayer`: Request payer setting

### Copy Arguments

All upload arguments, plus:

- `MetadataDirective`: Whether to copy or replace metadata
- `TaggingDirective`: Whether to copy or replace tags
- `CopySourceIfMatch`: Copy only if ETag matches
- `CopySourceIfModifiedSince`: Copy only if modified since date
- `CopySourceIfNoneMatch`: Copy only if ETag doesn't match
- `CopySourceIfUnmodifiedSince`: Copy only if unmodified since date
- `CopySourceSSECustomerAlgorithm`: Source encryption algorithm
- `CopySourceSSECustomerKey`: Source encryption key
- `CopySourceSSECustomerKeyMD5`: Source encryption key MD5

### Delete Arguments

- `VersionId`: Specific version to delete
- `MFA`: MFA authentication for delete operations
- `RequestPayer`: Request payer setting

## Best Practices

1. **Always call shutdown()**: Use try/finally or the context-manager protocol to guarantee cleanup
2. **Handle futures properly**: Check `future.done()` and handle exceptions raised by `future.result()`
3. **Use appropriate configuration**: Tune settings to your file sizes and network conditions
4. **Monitor progress**: Attach subscribers to long-running transfers
5. **Handle errors gracefully**: Catch specific exceptions and implement retry logic where appropriate
6. **Reuse the manager**: Avoid creating multiple TransferManager instances unnecessarily