# Modern Transfer Manager

The TransferManager class is the recommended interface for S3 operations. It provides upload, download, copy, and delete operations with future-based asynchronous execution, managed resources, and comprehensive configuration options.

## Capabilities

### TransferManager Class

The main transfer management class supporting all S3 operations with asynchronous execution and comprehensive resource management.

```python { .api }
class TransferManager:
    """
    Modern S3 transfer manager with enhanced capabilities.

    Args:
        client: boto3 S3 client instance
        config: TransferConfig for controlling transfer behavior
        osutil: OSUtils instance for file operations
        executor_cls: Executor class for managing threads/processes
    """
    def __init__(self, client, config=None, osutil=None, executor_cls=None): ...

    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None) -> TransferFuture:
        """
        Upload a file to S3.

        Args:
            fileobj: File name or seekable file-like object to upload
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events

        Returns:
            TransferFuture: Future object for tracking transfer progress

        Raises:
            ValueError: If extra_args contains invalid keys
        """

    def download(self, bucket, key, fileobj, extra_args=None, subscribers=None) -> TransferFuture:
        """
        Download an S3 object to a file.

        Args:
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            fileobj: File name or seekable file-like object to write to
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events

        Returns:
            TransferFuture: Future object for tracking transfer progress

        Raises:
            ValueError: If extra_args contains invalid keys
        """

    def copy(self, copy_source, bucket, key, extra_args=None, subscribers=None, source_client=None) -> TransferFuture:
        """
        Copy an S3 object to another location.

        Args:
            copy_source (dict): Source object specification, e.g.
                {'Bucket': str, 'Key': str, 'VersionId': str}
            bucket (str): Destination bucket name
            key (str): Destination object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events
            source_client: Separate S3 client for source operations

        Returns:
            TransferFuture: Future object for tracking transfer progress
        """

    def delete(self, bucket, key, extra_args=None, subscribers=None) -> TransferFuture:
        """
        Delete an S3 object.

        Args:
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events

        Returns:
            TransferFuture: Future object for tracking transfer progress
        """

    def shutdown(self, cancel=False, cancel_msg=''):
        """
        Shut down the transfer manager.

        Args:
            cancel (bool): Whether to cancel ongoing transfers
            cancel_msg (str): Message to include with cancellation
        """

    @property
    def client(self):
        """The boto3 S3 client instance."""

    @property
    def config(self) -> TransferConfig:
        """The transfer configuration."""

    # Class constants for allowed operation arguments
    ALLOWED_DOWNLOAD_ARGS: List[str]
    ALLOWED_UPLOAD_ARGS: List[str]
    ALLOWED_COPY_ARGS: List[str]
    ALLOWED_DELETE_ARGS: List[str]
```

### TransferCoordinatorController

Controls and manages all transfer coordinators for a transfer manager, providing centralized coordination and lifecycle management.

```python { .api }
class TransferCoordinatorController:
    """
    Controls all transfer coordinators for a manager.
    """
    def add_transfer_coordinator(self, transfer_coordinator):
        """
        Add a transfer coordinator to track.

        Args:
            transfer_coordinator: TransferCoordinator to manage
        """

    def remove_transfer_coordinator(self, transfer_coordinator):
        """
        Remove a transfer coordinator from tracking.

        Args:
            transfer_coordinator: TransferCoordinator to remove
        """

    def cancel(self, msg='', exc_type=None):
        """
        Cancel all tracked transfers.

        Args:
            msg (str): Cancellation message
            exc_type: Exception type for cancellation
        """

    def wait(self):
        """Wait for all transfers to complete."""

    @property
    def tracked_transfer_coordinators(self) -> Set:
        """Set of currently tracked transfer coordinators."""
```
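
The tracking pattern this controller implements can be sketched with stand-in classes (`FakeCoordinator` and `FakeController` below are illustrative, not the s3transfer types): coordinators are registered in a set, and `cancel()` fans out to everything still tracked.

```python
# Stand-in sketch of the controller's tracking pattern (not the real classes):
# register coordinators in a set, cancel everything still tracked.
class FakeCoordinator:
    def __init__(self):
        self.cancelled_with = None

    def cancel(self, msg='', exc_type=RuntimeError):
        # Record how this coordinator was cancelled
        self.cancelled_with = (msg, exc_type)

class FakeController:
    def __init__(self):
        self._coordinators = set()

    def add_transfer_coordinator(self, coordinator):
        self._coordinators.add(coordinator)

    def remove_transfer_coordinator(self, coordinator):
        self._coordinators.discard(coordinator)

    def cancel(self, msg='', exc_type=RuntimeError):
        # Fan the cancellation out to every tracked coordinator
        for coordinator in self._coordinators:
            coordinator.cancel(msg, exc_type)

controller = FakeController()
a, b = FakeCoordinator(), FakeCoordinator()
controller.add_transfer_coordinator(a)
controller.add_transfer_coordinator(b)
controller.remove_transfer_coordinator(b)  # e.g. transfer b already finished
controller.cancel('shutting down')
# a is cancelled; b was removed earlier and is untouched
```

A completed transfer removes itself from tracking, which is why cancellation only touches transfers still in flight.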

## Usage Examples

### Basic Upload and Download

```python
import boto3

from s3transfer.manager import TransferManager

# Create transfer manager
client = boto3.client('s3', region_name='us-west-2')
transfer_manager = TransferManager(client)

try:
    # Upload from file
    with open('/tmp/data.csv', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'data.csv')
        future.result()  # Wait for completion while the file is still open

    # Download to file
    with open('/tmp/downloaded.csv', 'wb') as f:
        future = transfer_manager.download('my-bucket', 'data.csv', f)
        future.result()  # Wait for completion
finally:
    # Always shut down when done
    transfer_manager.shutdown()
```

### Upload with Progress Tracking

```python
import os

import boto3

from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber

class ProgressSubscriber(BaseSubscriber):
    def __init__(self, filename):
        self._filename = filename
        self._size = os.path.getsize(filename)
        self._seen_so_far = 0

    def on_progress(self, bytes_transferred, **kwargs):
        self._seen_so_far += bytes_transferred
        if self._size:
            percentage = (self._seen_so_far / self._size) * 100
            print(f"\r{self._filename}: {percentage:.2f}% complete", end='')

    def on_done(self, **kwargs):
        print(f"\n{self._filename}: Transfer complete!")

# Upload with progress tracking
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    progress_subscriber = ProgressSubscriber('/tmp/large_file.dat')

    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'large_file.dat',
            subscribers=[progress_subscriber]
        )
        # Provide the transfer size so the manager reports accurate progress
        future.meta.provide_transfer_size(os.path.getsize('/tmp/large_file.dat'))
        future.result()
finally:
    transfer_manager.shutdown()
```

### Concurrent Operations

```python
import os

import boto3

from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start multiple uploads concurrently. Passing file names (rather than
    # open file objects) lets the manager open and close each file itself,
    # so no file is closed while its upload is still in flight.
    upload_futures = []
    files_to_upload = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']

    for filename in files_to_upload:
        future = transfer_manager.upload(filename, 'my-bucket', os.path.basename(filename))
        upload_futures.append(future)

    # Wait for all uploads to complete
    for future in upload_futures:
        try:
            future.result()
            print(f"Upload completed for {future.meta.call_args.key}")
        except Exception as e:
            print(f"Upload failed: {e}")
finally:
    transfer_manager.shutdown()
```

254

255

### Copy Operations

256

257

```python
import boto3

from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Copy within the same account
    copy_source = {'Bucket': 'source-bucket', 'Key': 'source-file.txt'}
    future = transfer_manager.copy(
        copy_source, 'destination-bucket', 'destination-file.txt'
    )
    future.result()

    # Copy a specific object version
    copy_source = {
        'Bucket': 'source-bucket',
        'Key': 'source-file.txt',
        'VersionId': 'version-id-here'
    }
    future = transfer_manager.copy(
        copy_source, 'destination-bucket', 'versioned-copy.txt'
    )
    future.result()
finally:
    transfer_manager.shutdown()
```

### Advanced Configuration

```python
import time

import boto3

from s3transfer.manager import TransferManager, TransferConfig

# Create advanced configuration
config = TransferConfig(
    multipart_threshold=64 * 1024 * 1024,   # 64 MB threshold
    multipart_chunksize=64 * 1024 * 1024,   # 64 MB chunks
    max_request_concurrency=20,             # 20 concurrent requests
    max_submission_concurrency=10,          # 10 concurrent submissions
    max_bandwidth=100 * 1024 * 1024,        # 100 MB/s bandwidth limit
    num_download_attempts=10,               # 10 retry attempts
    max_in_memory_upload_chunks=5,          # 5 chunks in memory
    max_in_memory_download_chunks=5         # 5 chunks in memory
)

client = boto3.client('s3')
transfer_manager = TransferManager(client, config)

try:
    # Large file operations benefit from this configuration
    with open('/tmp/very_large_file.dat', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'very_large_file.dat')

        # Poll progress while the transfer runs
        while not future.done():
            time.sleep(1)
            if future.meta.size:
                print(f"Transfer size: {future.meta.size} bytes")

        future.result()
finally:
    transfer_manager.shutdown()
```

### Error Handling

```python
import boto3

from s3transfer.manager import TransferManager
from s3transfer.exceptions import S3UploadFailedError, TransferNotDoneError

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    with open('/tmp/test_file.txt', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')

        try:
            # Wait for completion
            future.result()
            print("Upload successful!")
        except S3UploadFailedError as e:
            print(f"Upload failed: {e}")
        except TransferNotDoneError as e:
            print(f"Transfer not complete: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")
finally:
    # Shut down, cancelling anything still in flight
    transfer_manager.shutdown(cancel=True, cancel_msg="Application shutting down")
```

### Delete Operations

```python
import boto3

from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Simple delete
    future = transfer_manager.delete('my-bucket', 'file-to-delete.txt')
    future.result()

    # Delete a specific version
    future = transfer_manager.delete(
        'my-bucket', 'versioned-file.txt',
        extra_args={'VersionId': 'version-id-here'}
    )
    future.result()

    # Batch delete
    files_to_delete = ['file1.txt', 'file2.txt', 'file3.txt']
    delete_futures = []

    for filename in files_to_delete:
        future = transfer_manager.delete('my-bucket', filename)
        delete_futures.append(future)

    # Wait for all deletions
    for future in delete_futures:
        try:
            future.result()
            print(f"Deleted: {future.meta.call_args.key}")
        except Exception as e:
            print(f"Delete failed: {e}")
finally:
    transfer_manager.shutdown()
```

## Allowed Extra Arguments

### Upload Arguments

- `ACL`: Access control list permissions
- `CacheControl`: Cache control directives
- `ContentDisposition`: Content disposition header
- `ContentEncoding`: Content encoding (e.g., 'gzip')
- `ContentLanguage`: Content language
- `ContentType`: MIME type of the content
- `Expires`: Expiration date
- `GrantFullControl`: Full control permissions
- `GrantRead`: Read permissions
- `GrantReadACP`: Read ACP permissions
- `GrantWriteACP`: Write ACP permissions
- `Metadata`: User-defined metadata dictionary
- `RequestPayer`: Request payer setting
- `ServerSideEncryption`: Server-side encryption method
- `StorageClass`: Storage class (STANDARD, REDUCED_REDUNDANCY, etc.)
- `SSECustomerAlgorithm`: Customer-provided encryption algorithm
- `SSECustomerKey`: Customer-provided encryption key
- `SSECustomerKeyMD5`: MD5 hash of customer encryption key
- `SSEKMSKeyId`: KMS key ID for encryption
- `SSEKMSEncryptionContext`: KMS encryption context
- `Tagging`: Object tags
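
Because `upload` raises `ValueError` for keys outside this list, it can be convenient to pre-check a caller-supplied dict before submitting the transfer. A minimal sketch (the `check_extra_args` helper and the abbreviated allowed list are illustrative, not part of s3transfer):

```python
# Hypothetical pre-flight check mirroring the validation TransferManager
# performs against ALLOWED_UPLOAD_ARGS; the allowed list here is abbreviated.
def check_extra_args(extra_args, allowed):
    invalid = [key for key in extra_args if key not in allowed]
    if invalid:
        raise ValueError(f"Invalid extra_args keys: {invalid}")
    return extra_args

ALLOWED = ['ACL', 'ContentType', 'Metadata', 'StorageClass']  # abbreviated

ok = check_extra_args({'ContentType': 'text/csv'}, ALLOWED)

try:
    check_extra_args({'ContentTyp': 'text/csv'}, ALLOWED)  # note the typo
except ValueError as e:
    rejected = str(e)
```

In real code you would pass `TransferManager.ALLOWED_UPLOAD_ARGS` as the allowed list rather than hard-coding one.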

### Download Arguments

- `VersionId`: Specific version of the object to download
- `SSECustomerAlgorithm`: Customer-provided encryption algorithm
- `SSECustomerKey`: Customer-provided encryption key
- `SSECustomerKeyMD5`: MD5 hash of customer encryption key
- `RequestPayer`: Request payer setting

### Copy Arguments

All upload arguments, plus:

- `MetadataDirective`: Whether to copy or replace metadata
- `TaggingDirective`: Whether to copy or replace tags
- `CopySourceIfMatch`: Copy only if ETag matches
- `CopySourceIfModifiedSince`: Copy only if modified since date
- `CopySourceIfNoneMatch`: Copy only if ETag doesn't match
- `CopySourceIfUnmodifiedSince`: Copy only if unmodified since date
- `CopySourceSSECustomerAlgorithm`: Source encryption algorithm
- `CopySourceSSECustomerKey`: Source encryption key
- `CopySourceSSECustomerKeyMD5`: Source encryption key MD5

### Delete Arguments

- `VersionId`: Specific version to delete
- `MFA`: MFA authentication for delete operations
- `RequestPayer`: Request payer setting

## Best Practices

1. **Always call shutdown()**: Use try/finally or context managers to ensure cleanup
2. **Handle futures properly**: Check future.done() and handle exceptions from future.result()
3. **Use appropriate configuration**: Tune settings based on file sizes and network conditions
4. **Monitor progress**: Use subscribers for long-running transfers
5. **Handle errors gracefully**: Catch specific exceptions and implement retry logic where appropriate
6. **Resource management**: Avoid creating multiple TransferManager instances unnecessarily
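
The guarantee behind practice 1 is illustrated below with a stand-in class (so the snippet runs without AWS): a context manager ensures `shutdown()` executes even when the body raises, cancelling work that is still in flight. The `FakeManager` class is illustrative, not the real TransferManager.

```python
# Stand-in (not the real TransferManager) showing why context-managed
# cleanup is safer than manual shutdown: __exit__ always runs.
class FakeManager:
    def __init__(self):
        self.shut_down = False
        self.cancelled = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Cancel outstanding transfers only when exiting due to an exception
        self.shutdown(cancel=exc_type is not None)
        return False  # propagate the exception to the caller

    def shutdown(self, cancel=False, cancel_msg=''):
        self.shut_down = True
        self.cancelled = cancel

mgr = FakeManager()
try:
    with mgr:
        raise RuntimeError('simulated failure mid-transfer')
except RuntimeError:
    pass
# shutdown ran despite the exception, with cancellation requested
```

With the real TransferManager, the equivalent structure is the try/finally shown throughout the usage examples above.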