# Event Subscribers and Callbacks

Extensible subscriber system for handling transfer events, including progress updates, completion notifications, error handling, and custom event processing throughout the transfer lifecycle.

## Capabilities

### BaseSubscriber

Base class for implementing transfer event subscribers, providing standardized callback methods for the different transfer lifecycle events.

```python { .api }
class BaseSubscriber:
    """
    Base class for transfer event subscribers.

    Provides callback methods that are called during different phases of
    transfer operations. Subclass this to implement custom event handling.
    """

    def on_queued(self, **kwargs):
        """
        Called when a transfer is queued for execution.

        Args:
            **kwargs: Additional event information, including:
                - transfer_id: Unique transfer identifier
                - call_args: Original call arguments
                - user_context: User-defined context
        """

    def on_progress(self, bytes_transferred: int, **kwargs):
        """
        Called when transfer progress is made.

        Args:
            bytes_transferred (int): Number of bytes transferred in this
                progress event
            **kwargs: Additional event information, including:
                - total_bytes_transferred: Total bytes transferred so far
                - transfer_size: Total transfer size (if known)
                - transfer_id: Unique transfer identifier
        """

    def on_done(self, **kwargs):
        """
        Called when the transfer completes, successfully or with an error.

        Args:
            **kwargs: Additional event information, including:
                - transfer_id: Unique transfer identifier
                - exception: Exception if the transfer failed (None on success)
                - result: Transfer result
        """

    # Class constant defining the valid subscriber callback types
    VALID_SUBSCRIBER_TYPES: List[str]
```

## Usage Examples

### Basic Progress Subscriber

```python
import os
import time

import boto3

from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber


class ProgressSubscriber(BaseSubscriber):
    """Simple progress-tracking subscriber."""

    def __init__(self, description="Transfer"):
        self.description = description
        self.start_time = None
        self.total_transferred = 0
        self.last_update = None

    def on_queued(self, **kwargs):
        self.start_time = time.time()
        print(f"{self.description}: Queued (ID: {kwargs.get('transfer_id', 'unknown')})")

    def on_progress(self, bytes_transferred, **kwargs):
        self.total_transferred += bytes_transferred
        current_time = time.time()

        # Update at most once per second to avoid flooding the console
        if self.last_update is None or current_time - self.last_update >= 1.0:
            if self.start_time:
                elapsed = current_time - self.start_time
                rate = self.total_transferred / elapsed if elapsed > 0 else 0

                transfer_size = kwargs.get('transfer_size')
                if transfer_size:
                    percentage = (self.total_transferred / transfer_size) * 100
                    print(f"{self.description}: {percentage:.1f}% "
                          f"({self.total_transferred}/{transfer_size} bytes) "
                          f"at {rate / 1024:.1f} KB/s")
                else:
                    print(f"{self.description}: {self.total_transferred} bytes "
                          f"at {rate / 1024:.1f} KB/s")

            self.last_update = current_time

    def on_done(self, **kwargs):
        if self.start_time:
            elapsed = time.time() - self.start_time
            avg_rate = self.total_transferred / elapsed if elapsed > 0 else 0

            exception = kwargs.get('exception')
            if exception:
                print(f"{self.description}: Failed after {elapsed:.2f}s - {exception}")
            else:
                print(f"{self.description}: Completed in {elapsed:.2f}s "
                      f"(avg: {avg_rate / 1024:.1f} KB/s)")


# Use the progress subscriber
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    progress_sub = ProgressSubscriber("Upload large file")

    # Determine the size up front so progress percentages are accurate
    file_size = os.path.getsize('/tmp/large_file.dat')

    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'large_file.dat',
            subscribers=[progress_sub]
        )
        future.meta.provide_transfer_size(file_size)

        # Wait inside the with block so the file stays open
        # for the duration of the transfer
        result = future.result()
finally:
    transfer_manager.shutdown()
```

### Comprehensive Event Logger

```python
import logging
import os
from datetime import datetime

import boto3

from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber


class TransferEventLogger(BaseSubscriber):
    """Comprehensive event logger for transfer operations."""

    def __init__(self, logger_name="s3transfer"):
        self.logger = logging.getLogger(logger_name)
        self.transfer_stats = {}

    def on_queued(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        call_args = kwargs.get('call_args')

        self.transfer_stats[transfer_id] = {
            'queued_time': datetime.now().isoformat(),
            'total_bytes': 0,
            'progress_events': 0,
            'bucket': getattr(call_args, 'bucket', 'unknown'),
            'key': getattr(call_args, 'key', 'unknown'),
        }

        self.logger.info(f"Transfer queued: {transfer_id}", extra={
            'transfer_id': transfer_id,
            'bucket': self.transfer_stats[transfer_id]['bucket'],
            'key': self.transfer_stats[transfer_id]['key'],
            'event': 'queued',
        })

    def on_progress(self, bytes_transferred, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')

        if transfer_id in self.transfer_stats:
            stats = self.transfer_stats[transfer_id]
            stats['total_bytes'] += bytes_transferred
            stats['progress_events'] += 1
            stats['last_progress'] = datetime.now().isoformat()

            # Log only every 100th progress event to keep log volume manageable
            if stats['progress_events'] % 100 == 0:
                self.logger.debug(f"Progress milestone: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'bytes_transferred': bytes_transferred,
                    'total_bytes': stats['total_bytes'],
                    'progress_events': stats['progress_events'],
                    'event': 'progress_milestone',
                })

    def on_done(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        exception = kwargs.get('exception')

        if transfer_id in self.transfer_stats:
            stats = self.transfer_stats[transfer_id]
            stats['completed_time'] = datetime.now().isoformat()
            stats['success'] = exception is None

            if exception:
                stats['error'] = str(exception)
                self.logger.error(f"Transfer failed: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'error': str(exception),
                    'total_bytes': stats['total_bytes'],
                    'event': 'failed',
                })
            else:
                self.logger.info(f"Transfer completed: {transfer_id}", extra={
                    'transfer_id': transfer_id,
                    'total_bytes': stats['total_bytes'],
                    'progress_events': stats['progress_events'],
                    'event': 'completed',
                })

            # Drop per-transfer state to prevent unbounded memory growth
            del self.transfer_stats[transfer_id]


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Use the logger
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    event_logger = TransferEventLogger()

    # Multiple transfers with comprehensive logging. Passing filenames
    # (rather than open file objects) lets the manager open and close the
    # files itself, so nothing is closed while a transfer is still pending.
    files_to_upload = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']
    futures = []

    for filename in files_to_upload:
        future = transfer_manager.upload(
            filename, 'my-bucket', os.path.basename(filename),
            subscribers=[event_logger]
        )
        futures.append(future)

    # Wait for all transfers
    for future in futures:
        try:
            future.result()
        except Exception as e:
            print(f"Transfer failed: {e}")
finally:
    transfer_manager.shutdown()
```

### Custom Metrics Collector

```python
import threading
import time
from collections import defaultdict, deque
from datetime import datetime, timedelta

import boto3

from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber


class MetricsCollector(BaseSubscriber):
    """Collects detailed metrics about transfer performance."""

    def __init__(self, window_seconds=300):  # 5-minute window
        self.window_seconds = window_seconds
        self.lock = threading.Lock()

        # Metrics storage
        self.transfer_metrics = defaultdict(dict)
        self.throughput_samples = deque()
        self.error_counts = defaultdict(int)
        self.completion_times = deque()

        # Running statistics
        self.total_transfers = 0
        self.successful_transfers = 0
        self.total_bytes_transferred = 0

    def on_queued(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')

        with self.lock:
            self.transfer_metrics[transfer_id] = {
                'queued_time': datetime.now(),
                'bytes_transferred': 0,
                'progress_count': 0,
            }
            self.total_transfers += 1

    def on_progress(self, bytes_transferred, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        current_time = datetime.now()

        with self.lock:
            if transfer_id in self.transfer_metrics:
                metrics = self.transfer_metrics[transfer_id]
                metrics['bytes_transferred'] += bytes_transferred
                metrics['progress_count'] += 1
                metrics['last_progress'] = current_time

            # Add a throughput sample and discard ones outside the window
            self.throughput_samples.append((current_time, bytes_transferred))
            self._cleanup_old_samples(current_time)

    def on_done(self, **kwargs):
        transfer_id = kwargs.get('transfer_id', 'unknown')
        exception = kwargs.get('exception')
        current_time = datetime.now()

        with self.lock:
            if transfer_id in self.transfer_metrics:
                metrics = self.transfer_metrics[transfer_id]
                metrics['completed_time'] = current_time

                if exception:
                    error_type = type(exception).__name__
                    self.error_counts[error_type] += 1
                    metrics['error'] = str(exception)
                else:
                    self.successful_transfers += 1
                    self.total_bytes_transferred += metrics['bytes_transferred']

                # Record the transfer duration
                duration = current_time - metrics['queued_time']
                self.completion_times.append((current_time, duration.total_seconds()))

                # Discard completion times outside the window
                self._cleanup_old_completions(current_time)

                # Drop per-transfer state
                del self.transfer_metrics[transfer_id]

    def _cleanup_old_samples(self, current_time):
        """Remove throughput samples older than the window."""
        cutoff = current_time - timedelta(seconds=self.window_seconds)
        while self.throughput_samples and self.throughput_samples[0][0] < cutoff:
            self.throughput_samples.popleft()

    def _cleanup_old_completions(self, current_time):
        """Remove completion times older than the window."""
        cutoff = current_time - timedelta(seconds=self.window_seconds)
        while self.completion_times and self.completion_times[0][0] < cutoff:
            self.completion_times.popleft()

    def get_current_throughput(self):
        """Get current throughput in bytes per second."""
        with self.lock:
            if not self.throughput_samples:
                return 0.0

            total_bytes = sum(sample[1] for sample in self.throughput_samples)
            time_span = (self.throughput_samples[-1][0]
                         - self.throughput_samples[0][0]).total_seconds()

            return total_bytes / time_span if time_span > 0 else 0.0

    def get_average_completion_time(self):
        """Get average completion time in seconds."""
        with self.lock:
            if not self.completion_times:
                return 0.0
            return sum(ct[1] for ct in self.completion_times) / len(self.completion_times)

    def get_success_rate(self):
        """Get success rate as a percentage."""
        with self.lock:
            if self.total_transfers == 0:
                return 0.0
            return (self.successful_transfers / self.total_transfers) * 100

    def get_error_summary(self):
        """Get error counts by exception type."""
        with self.lock:
            return dict(self.error_counts)

    def print_metrics(self):
        """Print a summary of the current metrics."""
        throughput = self.get_current_throughput()
        avg_completion = self.get_average_completion_time()
        success_rate = self.get_success_rate()
        errors = self.get_error_summary()

        print("\n=== Transfer Metrics ===")
        print(f"Total transfers: {self.total_transfers}")
        print(f"Successful transfers: {self.successful_transfers}")
        print(f"Success rate: {success_rate:.1f}%")
        print(f"Current throughput: {throughput / 1024:.1f} KB/s")
        print(f"Average completion time: {avg_completion:.2f} seconds")
        print(f"Total bytes transferred: {self.total_bytes_transferred / (1024 * 1024):.1f} MB")

        if errors:
            print("Error summary:")
            for error_type, count in errors.items():
                print(f"  {error_type}: {count}")
        print("========================\n")


# Use the metrics collector
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    metrics = MetricsCollector()

    # Start multiple transfers with metrics collection
    futures = []
    for i in range(10):
        filename = f'/tmp/test_file_{i}.txt'
        # Create test files of increasing size
        with open(filename, 'w') as f:
            f.write('x' * (1024 * (i + 1)))

        # Pass the filename so the manager handles opening/closing the file
        future = transfer_manager.upload(
            filename, 'my-bucket', f'test_file_{i}.txt',
            subscribers=[metrics]
        )
        futures.append(future)

    # Monitor progress and print metrics periodically
    completed = set()
    while len(completed) < len(futures):
        time.sleep(2)

        # Check for newly completed transfers
        for i, future in enumerate(futures):
            if i not in completed and future.done():
                completed.add(i)

        # Print current metrics
        metrics.print_metrics()

    # Final metrics
    print("Final metrics:")
    metrics.print_metrics()
finally:
    transfer_manager.shutdown()
```

### Conditional Event Handling

```python
from s3transfer.subscribers import BaseSubscriber


class ConditionalSubscriber(BaseSubscriber):
    """Subscriber that handles events only for transfers matching conditions."""

    def __init__(self, conditions=None):
        self.conditions = conditions or {}
        self.matched_transfers = set()

    def _matches_conditions(self, **kwargs):
        """Check whether an event matches the configured conditions."""
        if not self.conditions:
            return True

        call_args = kwargs.get('call_args')
        if not call_args:
            return False

        # Check bucket condition
        if 'bucket' in self.conditions:
            if getattr(call_args, 'bucket', None) != self.conditions['bucket']:
                return False

        # Check key pattern condition
        if 'key_pattern' in self.conditions:
            key = getattr(call_args, 'key', '')
            if self.conditions['key_pattern'] not in key:
                return False

        # Check minimum size condition
        if 'min_size' in self.conditions:
            transfer_size = kwargs.get('transfer_size', 0)
            if transfer_size < self.conditions['min_size']:
                return False

        return True

    def on_queued(self, **kwargs):
        if self._matches_conditions(**kwargs):
            transfer_id = kwargs.get('transfer_id')
            self.matched_transfers.add(transfer_id)
            print(f"Monitoring transfer: {transfer_id}")

    def on_progress(self, bytes_transferred, **kwargs):
        transfer_id = kwargs.get('transfer_id')
        if transfer_id in self.matched_transfers:
            total_transferred = kwargs.get('total_bytes_transferred', bytes_transferred)
            print(f"Progress for {transfer_id}: {total_transferred} bytes")

    def on_done(self, **kwargs):
        transfer_id = kwargs.get('transfer_id')
        if transfer_id in self.matched_transfers:
            exception = kwargs.get('exception')
            if exception:
                print(f"Monitored transfer failed: {transfer_id} - {exception}")
            else:
                print(f"Monitored transfer completed: {transfer_id}")

            self.matched_transfers.discard(transfer_id)


# Use the conditional subscriber (reusing the transfer_manager set up in
# the earlier examples)
conditions = {
    'bucket': 'important-bucket',
    'key_pattern': 'critical/',
    'min_size': 10 * 1024 * 1024,  # Only transfers of at least 10 MB
}

conditional_sub = ConditionalSubscriber(conditions)

# This transfer will be monitored if it meets the conditions
with open('/tmp/critical_large_file.dat', 'rb') as f:
    future = transfer_manager.upload(
        f, 'important-bucket', 'critical/large_file.dat',
        subscribers=[conditional_sub]
    )
    # Wait inside the with block so the file stays open during the transfer
    future.result()
```

### Multi-Subscriber Coordination

```python
from s3transfer.subscribers import BaseSubscriber


class SubscriberCoordinator:
    """Coordinates multiple subscribers for complex event handling."""

    def __init__(self, subscribers=None):
        self.subscribers = subscribers or []
        self.global_stats = {
            'total_queued': 0,
            'total_completed': 0,
            'total_failed': 0,
            'total_bytes': 0,
        }

    def add_subscriber(self, subscriber):
        """Register a subscriber with the coordinator."""
        self.subscribers.append(subscriber)

    def create_coordinated_subscriber(self):
        """Create a subscriber that fans events out to all registered subscribers."""

        class CoordinatedSubscriber(BaseSubscriber):
            def __init__(self, coordinator):
                self.coordinator = coordinator

            def on_queued(self, **kwargs):
                self.coordinator.global_stats['total_queued'] += 1
                for sub in self.coordinator.subscribers:
                    try:
                        sub.on_queued(**kwargs)
                    except Exception as e:
                        print(f"Subscriber error in on_queued: {e}")

            def on_progress(self, bytes_transferred, **kwargs):
                self.coordinator.global_stats['total_bytes'] += bytes_transferred
                for sub in self.coordinator.subscribers:
                    try:
                        sub.on_progress(bytes_transferred, **kwargs)
                    except Exception as e:
                        print(f"Subscriber error in on_progress: {e}")

            def on_done(self, **kwargs):
                exception = kwargs.get('exception')
                if exception:
                    self.coordinator.global_stats['total_failed'] += 1
                else:
                    self.coordinator.global_stats['total_completed'] += 1

                for sub in self.coordinator.subscribers:
                    try:
                        sub.on_done(**kwargs)
                    except Exception as e:
                        print(f"Subscriber error in on_done: {e}")

        return CoordinatedSubscriber(self)

    def print_global_stats(self):
        """Print global statistics across all subscribers."""
        stats = self.global_stats
        print("\nGlobal Stats:")
        print(f"  Queued: {stats['total_queued']}")
        print(f"  Completed: {stats['total_completed']}")
        print(f"  Failed: {stats['total_failed']}")
        print(f"  Total bytes: {stats['total_bytes'] / (1024 * 1024):.1f} MB")


# Use subscriber coordination (reusing the transfer_manager and the
# subscriber classes defined in the earlier examples)
coordinator = SubscriberCoordinator()

# Add multiple subscribers
coordinator.add_subscriber(ProgressSubscriber("Global Progress"))
coordinator.add_subscriber(TransferEventLogger())
coordinator.add_subscriber(MetricsCollector())

# Create the coordinated subscriber
coordinated_sub = coordinator.create_coordinated_subscriber()

# Use it with transfers
with open('/tmp/test_file.dat', 'rb') as f:
    future = transfer_manager.upload(
        f, 'my-bucket', 'test_file.dat',
        subscribers=[coordinated_sub]
    )
    # Wait inside the with block so the file stays open during the transfer
    future.result()

coordinator.print_global_stats()
```

## Event Information Reference

### on_queued Event Parameters

- `transfer_id`: Unique identifier for the transfer
- `call_args`: Original method call arguments (bucket, key, etc.)
- `user_context`: User-defined context dictionary

### on_progress Event Parameters

- `bytes_transferred`: Bytes transferred in this progress event
- `total_bytes_transferred`: Total bytes transferred so far (optional)
- `transfer_size`: Total transfer size, if known (optional)
- `transfer_id`: Unique identifier for the transfer

### on_done Event Parameters

- `transfer_id`: Unique identifier for the transfer
- `exception`: Exception object if the transfer failed (None if successful)
- `result`: Transfer result object
- `call_args`: Original method call arguments
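The parameter lists above can be verified empirically with a small debugging subscriber that records whatever keyword arguments each callback actually receives. A minimal sketch (the simulated calls below use the documented parameter names; the `ImportError` fallback exists only so the snippet runs without s3transfer installed):

```python
try:
    from s3transfer.subscribers import BaseSubscriber
except ImportError:
    # Stand-in with the same callback names, so the sketch runs standalone
    class BaseSubscriber:
        def on_queued(self, **kwargs): pass
        def on_progress(self, bytes_transferred, **kwargs): pass
        def on_done(self, **kwargs): pass


class EventInspector(BaseSubscriber):
    """Records the kwargs delivered to each lifecycle callback."""

    def __init__(self):
        self.events = []

    def on_queued(self, **kwargs):
        self.events.append(('queued', sorted(kwargs)))

    def on_progress(self, bytes_transferred, **kwargs):
        self.events.append(('progress', sorted(kwargs)))

    def on_done(self, **kwargs):
        self.events.append(('done', sorted(kwargs)))


# Simulated events using the documented parameter names
inspector = EventInspector()
inspector.on_queued(transfer_id='t-1', call_args=None, user_context={})
inspector.on_progress(1024, transfer_id='t-1', total_bytes_transferred=1024)
inspector.on_done(transfer_id='t-1', exception=None, result=None)
```

Attaching an `EventInspector` to a real transfer shows exactly which of the optional parameters your library version actually supplies.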
## Best Practices

### Subscriber Implementation

1. **Handle exceptions**: Wrap subscriber code in try/except blocks
2. **Avoid blocking operations**: Keep callback methods fast and non-blocking
3. **Use threading safely**: Protect shared data with locks in multi-threaded environments
4. **Clean up resources**: Remove references to completed transfers to prevent memory leaks
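These guidelines combine into one small pattern: shared state behind a lock, and per-transfer state dropped on completion. A sketch, assuming the documented `transfer_id` and `exception` kwargs (the `ImportError` fallback is only so the snippet runs standalone):

```python
import threading

try:
    from s3transfer.subscribers import BaseSubscriber
except ImportError:
    # Stand-in with the same callback names, so the sketch runs standalone
    class BaseSubscriber:
        def on_queued(self, **kwargs): pass
        def on_progress(self, bytes_transferred, **kwargs): pass
        def on_done(self, **kwargs): pass


class SafeCountingSubscriber(BaseSubscriber):
    """Thread-safe byte counter that cleans up after each transfer."""

    def __init__(self):
        self._lock = threading.Lock()
        self._active = {}  # transfer_id -> bytes seen so far
        self.completed_bytes = 0

    def on_queued(self, **kwargs):
        with self._lock:
            self._active[kwargs.get('transfer_id')] = 0

    def on_progress(self, bytes_transferred, **kwargs):
        with self._lock:
            tid = kwargs.get('transfer_id')
            if tid in self._active:
                self._active[tid] += bytes_transferred

    def on_done(self, **kwargs):
        with self._lock:
            # Remove per-transfer state so memory does not grow unbounded
            total = self._active.pop(kwargs.get('transfer_id'), 0)
            if kwargs.get('exception') is None:
                self.completed_bytes += total


# Simulated lifecycle for one transfer
sub = SafeCountingSubscriber()
sub.on_queued(transfer_id='t-1')
sub.on_progress(512, transfer_id='t-1')
sub.on_progress(512, transfer_id='t-1')
sub.on_done(transfer_id='t-1', exception=None)
```

Because callbacks may fire from the transfer manager's worker threads, every read and write of the shared dictionaries goes through the lock.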
### Performance Considerations

1. **Limit logging frequency**: Avoid logging every progress event for large transfers
2. **Use efficient data structures**: Choose appropriate containers for metrics storage
3. **Batch operations**: Group multiple events for processing efficiency
4. **Monitor memory usage**: Clean up old data periodically
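A common way to limit per-event overhead is sampling: let every progress event update a cheap counter, but run the expensive path (logging, metric export) only on every Nth event. A sketch (the `ImportError` fallback is only so the snippet runs standalone):

```python
try:
    from s3transfer.subscribers import BaseSubscriber
except ImportError:
    # Stand-in with the same callback names, so the sketch runs standalone
    class BaseSubscriber:
        def on_queued(self, **kwargs): pass
        def on_progress(self, bytes_transferred, **kwargs): pass
        def on_done(self, **kwargs): pass


class SampledProgressSubscriber(BaseSubscriber):
    """Runs the slow path on only every Nth progress event."""

    def __init__(self, sample_every=100):
        self.sample_every = sample_every
        self._seen = 0
        self.samples = []

    def on_progress(self, bytes_transferred, **kwargs):
        # Cheap counter update on every event
        self._seen += 1
        if self._seen % self.sample_every == 0:
            # Only one event in every `sample_every` reaches this slow path
            self.samples.append((self._seen, kwargs.get('total_bytes_transferred')))


# Simulate 250 progress events of 10 bytes each
sampled = SampledProgressSubscriber(sample_every=100)
for i in range(250):
    sampled.on_progress(10, transfer_id='t-1',
                        total_bytes_transferred=(i + 1) * 10)
```

For a multi-gigabyte multipart upload this reduces the number of log records from thousands to a handful without losing the overall progress picture.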
### Error Handling

1. **Graceful degradation**: Subscriber failures shouldn't affect transfers
2. **Log subscriber errors**: Report subscriber exceptions for debugging
3. **Validate event data**: Check for required fields before processing
4. **Implement fallbacks**: Provide default behavior when subscribers fail
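One way to get graceful degradation is a fail-safe wrapper: it delegates every callback to an inner subscriber, and logs (rather than propagates) anything the inner subscriber raises. A sketch, with a hypothetical `Flaky` subscriber standing in for unreliable user code (the `ImportError` fallback is only so the snippet runs standalone):

```python
import logging

try:
    from s3transfer.subscribers import BaseSubscriber
except ImportError:
    # Stand-in with the same callback names, so the sketch runs standalone
    class BaseSubscriber:
        def on_queued(self, **kwargs): pass
        def on_progress(self, bytes_transferred, **kwargs): pass
        def on_done(self, **kwargs): pass


class FailSafeSubscriber(BaseSubscriber):
    """Delegates to an inner subscriber, containing and logging its errors."""

    def __init__(self, inner, logger=None):
        self.inner = inner
        self.logger = logger or logging.getLogger(__name__)
        self.error_count = 0

    def _call(self, name, *args, **kwargs):
        try:
            getattr(self.inner, name)(*args, **kwargs)
        except Exception:
            # Record and log the failure; never let it reach the transfer machinery
            self.error_count += 1
            self.logger.exception("Subscriber %r failed in %s", self.inner, name)

    def on_queued(self, **kwargs):
        self._call('on_queued', **kwargs)

    def on_progress(self, bytes_transferred, **kwargs):
        self._call('on_progress', bytes_transferred, **kwargs)

    def on_done(self, **kwargs):
        self._call('on_done', **kwargs)


class Flaky(BaseSubscriber):
    """Hypothetical unreliable subscriber used to demonstrate containment."""

    def on_progress(self, bytes_transferred, **kwargs):
        raise RuntimeError("boom")


# Keep the demo quiet: swallow log output instead of printing tracebacks
demo_logger = logging.getLogger("failsafe-demo")
demo_logger.addHandler(logging.NullHandler())
demo_logger.propagate = False

safe = FailSafeSubscriber(Flaky(), logger=demo_logger)
safe.on_progress(100)  # the RuntimeError is contained, not raised
safe.on_done(transfer_id='t-1', exception=None)
```

Wrapping third-party or experimental subscribers this way keeps a bug in observability code from failing an otherwise healthy transfer.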