
# Event Subscribers and Callbacks

Extensible subscriber system for handling transfer events, including progress updates, completion notifications, error handling, and custom event processing throughout the transfer lifecycle.

## Capabilities

### BaseSubscriber

Base class for implementing transfer event subscribers, with standardized callback methods for the different transfer lifecycle events.

```python { .api }
class BaseSubscriber:
    """
    Base class for transfer event subscribers.

    Provides callback methods that are called during different phases
    of transfer operations. Subclass this to implement custom event
    handling.
    """

    def on_queued(self, **kwargs):
        """
        Called when a transfer is queued for execution.

        Args:
            **kwargs: Additional event information including:
                - transfer_id: Unique transfer identifier
                - call_args: Original call arguments
                - user_context: User-defined context
        """

    def on_progress(self, bytes_transferred: int, **kwargs):
        """
        Called when transfer progress is made.

        Args:
            bytes_transferred (int): Number of bytes transferred in this
                progress event
            **kwargs: Additional event information including:
                - total_bytes_transferred: Total bytes transferred so far
                - transfer_size: Total transfer size (if known)
                - transfer_id: Unique transfer identifier
        """

    def on_done(self, **kwargs):
        """
        Called when a transfer completes (successfully or with an error).

        Args:
            **kwargs: Additional event information including:
                - transfer_id: Unique transfer identifier
                - exception: Exception if the transfer failed (None if successful)
                - result: Transfer result
        """

    # Class constant defining valid subscriber callback types
    VALID_SUBSCRIBER_TYPES: List[str]
```

## Usage Examples

### Basic Progress Subscriber

```python
import os
import time

import boto3

from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber


class ProgressSubscriber(BaseSubscriber):
    """Simple progress-tracking subscriber."""

    def __init__(self, description="Transfer"):
        self.description = description
        self.start_time = None
        self.total_transferred = 0
        self.last_update = None

    def on_queued(self, **kwargs):
        self.start_time = time.time()
        print(f"{self.description}: Queued (ID: {kwargs.get('transfer_id', 'unknown')})")

    def on_progress(self, bytes_transferred, **kwargs):
        self.total_transferred += bytes_transferred
        current_time = time.time()

        # Update progress at most once per second to avoid spam
        if self.last_update is None or current_time - self.last_update >= 1.0:
            if self.start_time:
                elapsed = current_time - self.start_time
                rate = self.total_transferred / elapsed if elapsed > 0 else 0

                transfer_size = kwargs.get('transfer_size')
                if transfer_size:
                    percentage = (self.total_transferred / transfer_size) * 100
                    print(f"{self.description}: {percentage:.1f}% "
                          f"({self.total_transferred}/{transfer_size} bytes) "
                          f"at {rate / 1024:.1f} KB/s")
                else:
                    print(f"{self.description}: {self.total_transferred} bytes "
                          f"at {rate / 1024:.1f} KB/s")

            self.last_update = current_time

    def on_done(self, **kwargs):
        if self.start_time:
            elapsed = time.time() - self.start_time
            avg_rate = self.total_transferred / elapsed if elapsed > 0 else 0

            exception = kwargs.get('exception')
            if exception:
                print(f"{self.description}: Failed after {elapsed:.2f}s - {exception}")
            else:
                print(f"{self.description}: Completed in {elapsed:.2f}s "
                      f"(avg: {avg_rate / 1024:.1f} KB/s)")


# Use the progress subscriber
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    progress_sub = ProgressSubscriber("Upload large file")

    # Keep the file open until the transfer finishes
    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'large_file.dat',
            subscribers=[progress_sub]
        )

        # Provide the size for an accurate progress percentage
        file_size = os.path.getsize('/tmp/large_file.dat')
        future.meta.provide_transfer_size(file_size)

        result = future.result()
finally:
    transfer_manager.shutdown()
```

### Comprehensive Event Logger

```python
import logging
import os
from datetime import datetime

import boto3

from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber


class TransferEventLogger(BaseSubscriber):
    """Comprehensive event logger for transfer operations."""

    def __init__(self, logger_name="s3transfer"):
        self.logger = logging.getLogger(logger_name)
        self.transfer_stats = {}

    def on_queued(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        call_args = kwargs.get('call_args', {})

        self.transfer_stats[transfer_id] = {
            'queued_time': datetime.now().isoformat(),
            'total_bytes': 0,
            'progress_events': 0,
            'bucket': getattr(call_args, 'bucket', 'unknown'),
            'key': getattr(call_args, 'key', 'unknown')
        }

        self.logger.info(f"Transfer queued: {transfer_id}", extra={
            'transfer_id': transfer_id,
            'bucket': self.transfer_stats[transfer_id]['bucket'],
            'key': self.transfer_stats[transfer_id]['key'],
            'event': 'queued'
        })

    def on_progress(self, bytes_transferred, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')

        if transfer_id in self.transfer_stats:
            stats = self.transfer_stats[transfer_id]
            stats['total_bytes'] += bytes_transferred
            stats['progress_events'] += 1
            stats['last_progress'] = datetime.now().isoformat()

            # Log significant progress milestones
            if stats['progress_events'] % 100 == 0:  # Every 100th progress event
                self.logger.debug(f"Progress milestone: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'bytes_transferred': bytes_transferred,
                    'total_bytes': stats['total_bytes'],
                    'progress_events': stats['progress_events'],
                    'event': 'progress_milestone'
                })

    def on_done(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        exception = kwargs.get('exception')

        if transfer_id in self.transfer_stats:
            stats = self.transfer_stats[transfer_id]
            stats['completed_time'] = datetime.now().isoformat()
            stats['success'] = exception is None

            if exception:
                stats['error'] = str(exception)
                self.logger.error(f"Transfer failed: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'error': str(exception),
                    'total_bytes': stats['total_bytes'],
                    'event': 'failed'
                })
            else:
                self.logger.info(f"Transfer completed: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'total_bytes': stats['total_bytes'],
                    'progress_events': stats['progress_events'],
                    'event': 'completed'
                })

            # Clean up stats to prevent memory leaks
            del self.transfer_stats[transfer_id]


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Use the logger
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    event_logger = TransferEventLogger()

    # Multiple transfers with comprehensive logging; uploading by filename
    # lets the files stay readable for the duration of each transfer
    files_to_upload = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']
    futures = []

    for filename in files_to_upload:
        future = transfer_manager.upload(
            filename, 'my-bucket', os.path.basename(filename),
            subscribers=[event_logger]
        )
        futures.append(future)

    # Wait for all transfers
    for future in futures:
        try:
            future.result()
        except Exception as e:
            print(f"Transfer failed: {e}")
finally:
    transfer_manager.shutdown()
```

### Custom Metrics Collector

```python
import threading
import time
from collections import defaultdict, deque
from datetime import datetime, timedelta

import boto3

from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber


class MetricsCollector(BaseSubscriber):
    """Collects detailed metrics about transfer performance."""

    def __init__(self, window_seconds=300):  # 5-minute window
        self.window_seconds = window_seconds
        self.lock = threading.Lock()

        # Metrics storage
        self.transfer_metrics = defaultdict(dict)
        self.throughput_samples = deque()
        self.error_counts = defaultdict(int)
        self.completion_times = deque()

        # Running statistics
        self.total_transfers = 0
        self.successful_transfers = 0
        self.total_bytes_transferred = 0

    def on_queued(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')

        with self.lock:
            self.transfer_metrics[transfer_id] = {
                'queued_time': datetime.now(),
                'bytes_transferred': 0,
                'progress_count': 0
            }
            self.total_transfers += 1

    def on_progress(self, bytes_transferred, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        current_time = datetime.now()

        with self.lock:
            if transfer_id in self.transfer_metrics:
                metrics = self.transfer_metrics[transfer_id]
                metrics['bytes_transferred'] += bytes_transferred
                metrics['progress_count'] += 1
                metrics['last_progress'] = current_time

            # Add a throughput sample
            self.throughput_samples.append((current_time, bytes_transferred))
            self._cleanup_old_samples(current_time)

    def on_done(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        exception = kwargs.get('exception')
        current_time = datetime.now()

        with self.lock:
            if transfer_id in self.transfer_metrics:
                metrics = self.transfer_metrics[transfer_id]
                metrics['completed_time'] = current_time

                if exception:
                    error_type = type(exception).__name__
                    self.error_counts[error_type] += 1
                    metrics['error'] = str(exception)
                else:
                    self.successful_transfers += 1
                    self.total_bytes_transferred += metrics['bytes_transferred']

                # Record the transfer duration
                duration = current_time - metrics['queued_time']
                self.completion_times.append((current_time, duration.total_seconds()))

                # Clean up old completion times
                self._cleanup_old_completions(current_time)

                # Clean up per-transfer metrics
                del self.transfer_metrics[transfer_id]

    def _cleanup_old_samples(self, current_time):
        """Remove throughput samples older than the window."""
        cutoff = current_time - timedelta(seconds=self.window_seconds)
        while self.throughput_samples and self.throughput_samples[0][0] < cutoff:
            self.throughput_samples.popleft()

    def _cleanup_old_completions(self, current_time):
        """Remove completion times older than the window."""
        cutoff = current_time - timedelta(seconds=self.window_seconds)
        while self.completion_times and self.completion_times[0][0] < cutoff:
            self.completion_times.popleft()

    def get_current_throughput(self):
        """Get current throughput in bytes per second."""
        with self.lock:
            if not self.throughput_samples:
                return 0.0

            total_bytes = sum(sample[1] for sample in self.throughput_samples)
            time_span = (self.throughput_samples[-1][0] -
                         self.throughput_samples[0][0]).total_seconds()

            return total_bytes / time_span if time_span > 0 else 0.0

    def get_average_completion_time(self):
        """Get average completion time in seconds."""
        with self.lock:
            if not self.completion_times:
                return 0.0

            return sum(ct[1] for ct in self.completion_times) / len(self.completion_times)

    def get_success_rate(self):
        """Get success rate as a percentage."""
        with self.lock:
            if self.total_transfers == 0:
                return 0.0
            return (self.successful_transfers / self.total_transfers) * 100

    def get_error_summary(self):
        """Get error counts by type."""
        with self.lock:
            return dict(self.error_counts)

    def print_metrics(self):
        """Print a summary of the current metrics."""
        throughput = self.get_current_throughput()
        avg_completion = self.get_average_completion_time()
        success_rate = self.get_success_rate()
        errors = self.get_error_summary()

        print("\n=== Transfer Metrics ===")
        print(f"Total transfers: {self.total_transfers}")
        print(f"Successful transfers: {self.successful_transfers}")
        print(f"Success rate: {success_rate:.1f}%")
        print(f"Current throughput: {throughput / 1024:.1f} KB/s")
        print(f"Average completion time: {avg_completion:.2f} seconds")
        print(f"Total bytes transferred: {self.total_bytes_transferred / (1024 * 1024):.1f} MB")

        if errors:
            print("Error summary:")
            for error_type, count in errors.items():
                print(f"  {error_type}: {count}")
        print("========================\n")


# Use the metrics collector
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    metrics = MetricsCollector()

    # Start multiple transfers with metrics collection
    futures = []
    for i in range(10):
        filename = f'/tmp/test_file_{i}.txt'
        # Create a test file
        with open(filename, 'w') as f:
            f.write('x' * (1024 * (i + 1)))  # Files of increasing size

        # Upload by filename so the file stays readable for the
        # duration of the transfer
        future = transfer_manager.upload(
            filename, 'my-bucket', f'test_file_{i}.txt',
            subscribers=[metrics]
        )
        futures.append(future)

    # Monitor progress and print metrics periodically
    completed = 0
    while completed < len(futures):
        time.sleep(2)

        # Check for newly completed transfers
        for future in futures:
            if future.done() and not hasattr(future, '_processed'):
                completed += 1
                future._processed = True

        # Print current metrics
        metrics.print_metrics()

    # Final metrics
    print("Final metrics:")
    metrics.print_metrics()
finally:
    transfer_manager.shutdown()
```

### Conditional Event Handling

```python
from s3transfer.subscribers import BaseSubscriber


class ConditionalSubscriber(BaseSubscriber):
    """Subscriber that handles events based on specific conditions."""

    def __init__(self, conditions=None):
        self.conditions = conditions or {}
        self.matched_transfers = set()

    def _matches_conditions(self, **kwargs):
        """Check whether an event matches the configured conditions."""
        if not self.conditions:
            return True

        call_args = kwargs.get('call_args')
        if not call_args:
            return False

        # Check the bucket condition
        if 'bucket' in self.conditions:
            if getattr(call_args, 'bucket', None) != self.conditions['bucket']:
                return False

        # Check the key-pattern condition
        if 'key_pattern' in self.conditions:
            key = getattr(call_args, 'key', '')
            if self.conditions['key_pattern'] not in key:
                return False

        # Check the minimum-size condition
        if 'min_size' in self.conditions:
            transfer_size = kwargs.get('transfer_size', 0)
            if transfer_size < self.conditions['min_size']:
                return False

        return True

    def on_queued(self, **kwargs):
        if self._matches_conditions(**kwargs):
            transfer_id = kwargs.get('transfer_id')
            self.matched_transfers.add(transfer_id)
            print(f"Monitoring transfer: {transfer_id}")

    def on_progress(self, bytes_transferred, **kwargs):
        transfer_id = kwargs.get('transfer_id')
        if transfer_id in self.matched_transfers:
            total_transferred = kwargs.get('total_bytes_transferred', bytes_transferred)
            print(f"Progress for {transfer_id}: {total_transferred} bytes")

    def on_done(self, **kwargs):
        transfer_id = kwargs.get('transfer_id')
        if transfer_id in self.matched_transfers:
            exception = kwargs.get('exception')
            if exception:
                print(f"Monitored transfer failed: {transfer_id} - {exception}")
            else:
                print(f"Monitored transfer completed: {transfer_id}")

            self.matched_transfers.discard(transfer_id)


# Use the conditional subscriber (assumes the `transfer_manager`
# created in the earlier examples)
conditions = {
    'bucket': 'important-bucket',
    'key_pattern': 'critical/',
    'min_size': 10 * 1024 * 1024  # Only files larger than 10 MB
}

conditional_sub = ConditionalSubscriber(conditions)

# This transfer will be monitored (if it meets the conditions)
with open('/tmp/critical_large_file.dat', 'rb') as f:
    future = transfer_manager.upload(
        f, 'important-bucket', 'critical/large_file.dat',
        subscribers=[conditional_sub]
    )
    future.result()
```

### Multi-Subscriber Coordination

```python
from s3transfer.subscribers import BaseSubscriber


class SubscriberCoordinator:
    """Coordinates multiple subscribers for complex event handling."""

    def __init__(self, subscribers=None):
        self.subscribers = subscribers or []
        self.global_stats = {
            'total_queued': 0,
            'total_completed': 0,
            'total_failed': 0,
            'total_bytes': 0
        }

    def add_subscriber(self, subscriber):
        """Register a subscriber with the coordinator."""
        self.subscribers.append(subscriber)

    def create_coordinated_subscriber(self):
        """Create a subscriber that fans events out to all registered subscribers."""

        class CoordinatedSubscriber(BaseSubscriber):
            def __init__(self, coordinator):
                self.coordinator = coordinator

            def on_queued(self, **kwargs):
                self.coordinator.global_stats['total_queued'] += 1
                for sub in self.coordinator.subscribers:
                    try:
                        sub.on_queued(**kwargs)
                    except Exception as e:
                        print(f"Subscriber error in on_queued: {e}")

            def on_progress(self, bytes_transferred, **kwargs):
                self.coordinator.global_stats['total_bytes'] += bytes_transferred
                for sub in self.coordinator.subscribers:
                    try:
                        sub.on_progress(bytes_transferred, **kwargs)
                    except Exception as e:
                        print(f"Subscriber error in on_progress: {e}")

            def on_done(self, **kwargs):
                exception = kwargs.get('exception')
                if exception:
                    self.coordinator.global_stats['total_failed'] += 1
                else:
                    self.coordinator.global_stats['total_completed'] += 1

                for sub in self.coordinator.subscribers:
                    try:
                        sub.on_done(**kwargs)
                    except Exception as e:
                        print(f"Subscriber error in on_done: {e}")

        return CoordinatedSubscriber(self)

    def print_global_stats(self):
        """Print global statistics across all subscribers."""
        stats = self.global_stats
        print("\nGlobal Stats:")
        print(f"  Queued: {stats['total_queued']}")
        print(f"  Completed: {stats['total_completed']}")
        print(f"  Failed: {stats['total_failed']}")
        print(f"  Total bytes: {stats['total_bytes'] / (1024 * 1024):.1f} MB")


# Use subscriber coordination (assumes the `transfer_manager` and the
# subscriber classes defined in the earlier examples)
coordinator = SubscriberCoordinator()

# Add multiple subscribers
coordinator.add_subscriber(ProgressSubscriber("Global Progress"))
coordinator.add_subscriber(TransferEventLogger())
coordinator.add_subscriber(MetricsCollector())

# Create the coordinated subscriber
coordinated_sub = coordinator.create_coordinated_subscriber()

# Use it with transfers
with open('/tmp/test_file.dat', 'rb') as f:
    future = transfer_manager.upload(
        f, 'my-bucket', 'test_file.dat',
        subscribers=[coordinated_sub]
    )
    future.result()

coordinator.print_global_stats()
```

## Event Information Reference

### on_queued Event Parameters

- `transfer_id`: Unique identifier for the transfer
- `call_args`: Original method call arguments (bucket, key, etc.)
- `user_context`: User-defined context dictionary

### on_progress Event Parameters

- `bytes_transferred`: Bytes transferred in this progress event
- `total_bytes_transferred`: Total bytes transferred so far (optional)
- `transfer_size`: Total transfer size if known (optional)
- `transfer_id`: Unique identifier for the transfer

### on_done Event Parameters

- `transfer_id`: Unique identifier for the transfer
- `exception`: Exception object if the transfer failed (None if successful)
- `result`: Transfer result object
- `call_args`: Original method call arguments
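The parameter contracts above can be seen end to end with a minimal echo subscriber. For illustration this sketch uses a plain class that mirrors the callback signatures so it runs without s3transfer installed; in real code it would subclass `BaseSubscriber` from `s3transfer.subscribers`:

```python
class EchoSubscriber:
    """Standalone stand-in for BaseSubscriber that records every event."""

    def __init__(self):
        self.events = []

    def on_queued(self, **kwargs):
        self.events.append(('queued', kwargs.get('transfer_id')))

    def on_progress(self, bytes_transferred, **kwargs):
        self.events.append(('progress', bytes_transferred,
                            kwargs.get('total_bytes_transferred')))

    def on_done(self, **kwargs):
        status = 'failed' if kwargs.get('exception') else 'ok'
        self.events.append(('done', kwargs.get('transfer_id'), status))


# Simulate the event sequence a single transfer would produce
sub = EchoSubscriber()
sub.on_queued(transfer_id='t-1', user_context={})
sub.on_progress(1024, total_bytes_transferred=1024, transfer_id='t-1')
sub.on_done(transfer_id='t-1', exception=None, result=None)
print(sub.events)
```

Because every extra field arrives through `**kwargs`, a subscriber that only reads the keys it needs stays compatible if additional event information is added later.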

## Best Practices

### Subscriber Implementation

1. **Handle exceptions**: Wrap subscriber code in try/except blocks
2. **Avoid blocking operations**: Keep callback methods fast and non-blocking
3. **Use threading safely**: Protect shared data with locks in multi-threaded environments
4. **Clean up resources**: Remove references to completed transfers to prevent memory leaks
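The four guidelines can be combined in one defensive skeleton. This is a self-contained sketch (a plain class so it runs without s3transfer installed; a real implementation would subclass `BaseSubscriber`):

```python
import logging
import threading

logger = logging.getLogger("safe_subscriber")


class SafeStatsSubscriber:
    """Applies the guidelines: guarded callbacks, cheap non-blocking work,
    lock-protected shared state, and cleanup of finished transfers."""

    def __init__(self):
        self._lock = threading.Lock()  # guideline 3: thread safety
        self._bytes = {}               # per-transfer byte counts

    def on_queued(self, **kwargs):
        try:  # guideline 1: never let a subscriber exception escape
            with self._lock:
                self._bytes[kwargs.get('transfer_id')] = 0
        except Exception:
            logger.exception("on_queued failed")

    def on_progress(self, bytes_transferred, **kwargs):
        try:  # guideline 2: only a dict update, no blocking I/O
            with self._lock:
                tid = kwargs.get('transfer_id')
                if tid in self._bytes:
                    self._bytes[tid] += bytes_transferred
        except Exception:
            logger.exception("on_progress failed")

    def on_done(self, **kwargs):
        try:
            with self._lock:  # guideline 4: drop state for finished transfers
                total = self._bytes.pop(kwargs.get('transfer_id'), 0)
            logger.info("transfer finished after %d bytes", total)
        except Exception:
            logger.exception("on_done failed")


# Simulated event sequence
stats_sub = SafeStatsSubscriber()
stats_sub.on_queued(transfer_id='t-1')
stats_sub.on_progress(4096, transfer_id='t-1')
stats_sub.on_done(transfer_id='t-1')
```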

### Performance Considerations

1. **Limit logging frequency**: Avoid logging every progress event for large transfers
2. **Use efficient data structures**: Choose appropriate containers for metrics storage
3. **Batch operations**: Group multiple events for processing efficiency
4. **Monitor memory usage**: Clean up old data periodically

### Error Handling

1. **Graceful degradation**: Subscriber failures shouldn't affect transfers
2. **Log subscriber errors**: Report subscriber exceptions for debugging
3. **Validate event data**: Check for required fields before processing
4. **Implement fallbacks**: Provide default behavior when subscribers fail
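One way to get graceful degradation without modifying each subscriber is a wrapper that traps and logs anything a wrapped subscriber raises. A sketch (the `FailingSubscriber` below is a hypothetical stand-in used only to demonstrate the guard):

```python
import logging

logger = logging.getLogger("subscriber_guard")


class GuardedSubscriber:
    """Wraps any subscriber so its failures are logged, never propagated."""

    def __init__(self, inner):
        self._inner = inner
        self.errors = 0

    def _safe(self, method_name, *args, **kwargs):
        try:
            getattr(self._inner, method_name)(*args, **kwargs)
        except Exception:
            self.errors += 1
            logger.exception("subscriber %s failed in %s",
                             type(self._inner).__name__, method_name)

    def on_queued(self, **kwargs):
        self._safe('on_queued', **kwargs)

    def on_progress(self, bytes_transferred, **kwargs):
        self._safe('on_progress', bytes_transferred, **kwargs)

    def on_done(self, **kwargs):
        self._safe('on_done', **kwargs)


# Hypothetical subscriber that always raises, to show the guard in action
class FailingSubscriber:
    def on_queued(self, **kwargs):
        pass

    def on_progress(self, bytes_transferred, **kwargs):
        raise RuntimeError("bug in subscriber")

    def on_done(self, **kwargs):
        pass


guarded = GuardedSubscriber(FailingSubscriber())
guarded.on_progress(1024, transfer_id='t-1')  # logged, not raised
print(guarded.errors)  # prints 1
```

Passing `guarded` instead of the raw subscriber in a `subscribers=[...]` list keeps a buggy subscriber from interfering with the transfer itself.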