# Progress Callbacks

Extensible callback system for monitoring file transfer progress, supporting both built-in progress indicators and custom callback implementations. Callbacks provide real-time feedback during long-running filesystem operations such as file uploads, downloads, and bulk transfers. During a transfer, the filesystem reports the expected total through `set_size()` and the progress made through `relative_update()`.

## Capabilities

### Base Callback Class

Foundation class for all progress callback implementations, providing the standard progress-tracking interface. Subclasses customize behavior by overriding `call()`, which runs after every size or value update.

```python { .api }
class Callback:
    """Base class for progress callbacks."""

    def __init__(self, size=None, value=0, hooks=None, **kwargs):
        """
        Initialize callback.

        Parameters:
        - size: int or None, total size/count for progress tracking
        - value: int, initial progress value
        - hooks: dict or None, named functions run on every update, each
          called as f(size, value, **kwargs)
        - **kwargs: extra options stored on the callback and passed to hooks
        """

    def call(self, hook_name=None, **kwargs):
        """
        Run hook(s) with the current size and value.

        Triggered by set_size(), absolute_update() and relative_update();
        override this method in subclasses to display progress. Return
        values are ignored; to abort a transfer, raise an exception.

        Parameters:
        - hook_name: str or None, run only this hook if given, else all hooks
        - **kwargs: passed on to the hook(s)
        """

    def __enter__(self):
        """
        Context manager entry.

        Returns:
            Callback, self
        """

    def __exit__(self, *args):
        """Context manager exit; calls close()."""

    def set_size(self, size):
        """
        Set total size for progress tracking. Triggers call().

        Parameters:
        - size: int, total size/count
        """

    def absolute_update(self, value):
        """
        Set absolute progress value. Triggers call().

        Parameters:
        - value: int, current progress value
        """

    def relative_update(self, inc=1):
        """
        Update progress incrementally. Triggers call().

        Parameters:
        - inc: int, increment amount
        """

    def branched(self, path_1, path_2, **kwargs):
        """
        Create a child callback for one sub-transfer of a multi-file
        operation (used internally by get() and put()).

        Parameters:
        - path_1: str, source path of the child transfer
        - path_2: str, destination path of the child transfer
        - **kwargs: additional options

        Returns:
            Callback, instance to pass to the child transfer; the base
            implementation returns a no-op callback
        """

    def close(self):
        """Close callback and clean up resources."""
```
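
`set_size()`, `absolute_update()` and `relative_update()` each store the new state and then trigger `call()`. A subclass therefore usually only needs to override `call()` and read `self.size` and `self.value`. The minimal sketch below records the state every time `call()` fires. It assumes fsspec is installed, and `RecordingCallback` is a hypothetical name chosen for illustration.

```python
from fsspec.callbacks import Callback


class RecordingCallback(Callback):
    """Hypothetical subclass that logs (size, value) on every update."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.events = []

    def call(self, *args, **kwargs):
        # The base class has already updated self.size / self.value
        self.events.append((self.size, self.value))


cb = RecordingCallback()
cb.set_size(10)          # what a transfer typically does first
cb.relative_update(4)    # then one increment per block
cb.relative_update(4)
cb.absolute_update(10)   # or jump straight to a value
print(cb.events)  # [(10, 0), (10, 4), (10, 8), (10, 10)]
```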

### Built-in Callback Implementations

Ready-to-use callback implementations for common progress display needs.

```python { .api }
class TqdmCallback(Callback):
    """Progress bar callback using the tqdm library (tqdm must be installed)."""

    def __init__(self, tqdm_kwargs=None, **kwargs):
        """
        Initialize tqdm progress bar callback. The bar is created on the
        first update and closed by close().

        Parameters:
        - tqdm_kwargs: dict, options passed to tqdm constructor
        - **kwargs: additional callback options
        """

class DotPrinterCallback(Callback):
    """Simple example callback that prints one character per update."""

    def __init__(self, chr_to_print="#", **kwargs):
        """
        Initialize dot printer callback.

        Parameters:
        - chr_to_print: str, character printed on each update; child
          callbacks created through branched() print "." instead
        """

class NoOpCallback(Callback):
    """No-operation callback that does nothing."""

    def __init__(self, **kwargs):
        """Initialize no-op callback."""
```
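
`DotPrinterCallback` is mostly a teaching example. It shows how a parent callback and the children returned by `branched()` can render differently. The small sketch below captures its output. It assumes a recent fsspec in which `DotPrinterCallback` prints `#` per update and its children print `.`. The paths handed to `branched()` are placeholders, and no files are transferred.

```python
import contextlib
import io

from fsspec.callbacks import DotPrinterCallback, NoOpCallback

buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    outer = DotPrinterCallback()
    outer.set_size(2)                       # prints "#"
    for src, dst in [("remote/a", "local/a"), ("remote/b", "local/b")]:
        child = outer.branched(src, dst)    # a DotPrinterCallback(".")
        child.relative_update()             # prints "."
        outer.relative_update()             # prints "#"
    NoOpCallback().relative_update()        # prints nothing

print(buf.getvalue())  # #.#.#
```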

### Default Callback Instance

Pre-configured default callback instance, used by filesystem methods when no callback is passed.

```python { .api }
DEFAULT_CALLBACK: Callback
"""Default callback instance (a shared NoOpCallback)"""
```
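
Filesystem methods declare `callback=DEFAULT_CALLBACK` as their default, so code that reports progress never has to test for `None`. The sketch below applies the same pattern to `concat_chunks`, a hypothetical helper. It observes the helper with a plain `Callback()`, which stores `size` and `value` but displays nothing.

```python
from fsspec.callbacks import DEFAULT_CALLBACK, Callback, NoOpCallback


def concat_chunks(chunks, callback=DEFAULT_CALLBACK):
    """Hypothetical helper: joins byte chunks, reporting bytes processed."""
    callback.set_size(sum(len(c) for c in chunks))
    out = bytearray()
    for chunk in chunks:
        out += chunk
        callback.relative_update(len(chunk))
    return bytes(out)


# Without a callback the shared no-op instance absorbs the updates
result = concat_chunks([b"ab", b"cde"])

# With a plain Callback the state is tracked but nothing is displayed
tracked = Callback()
concat_chunks([b"ab", b"cde"], callback=tracked)
print(result, tracked.size, tracked.value)  # b'abcde' 5 5
```

`DEFAULT_CALLBACK` is a single shared instance, so its counters change with every caller and are not meaningful to read.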
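
## Usage Patterns

### Basic Progress Monitoring

```python
import fsspec
from fsspec.callbacks import TqdmCallback

fs = fsspec.filesystem('s3')

# Download with a byte-level progress bar: get_file() reports bytes
download_cb = TqdmCallback(tqdm_kwargs={'unit': 'B', 'unit_scale': True})
fs.get_file('bucket/large-file.dat', 'local-file.dat', callback=download_cb)
download_cb.close()

# Upload with a separate callback: each transfer calls set_size() again
# and keeps adding to value, so one callback should not be reused
upload_cb = TqdmCallback(tqdm_kwargs={'unit': 'B', 'unit_scale': True})
fs.put_file('large-local-file.dat', 'bucket/uploaded-file.dat', callback=upload_cb)
upload_cb.close()

# fs.get()/fs.put() also accept callback=, but advance once per file
```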
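
Whether a bar counts bytes or files depends on the method, and you can check this locally without cloud credentials. The sketch assumes fsspec's built-in in-memory filesystem (`"memory"`) and a temporary directory. `RecordingCallback` is a hypothetical helper. The expected values follow from `get()` advancing once per file while `get_file()` advances once per block read.

```python
import os
import tempfile

import fsspec
from fsspec.callbacks import Callback


class RecordingCallback(Callback):
    """Hypothetical helper that logs (size, value) on every update."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.events = []

    def call(self, *args, **kwargs):
        self.events.append((self.size, self.value))


fs = fsspec.filesystem("memory")
payload = b"x" * 1000
fs.pipe("/demo/data.bin", payload)

with tempfile.TemporaryDirectory() as tmp:
    # get_file(): value grows by the number of bytes written per block
    byte_cb = RecordingCallback()
    fs.get_file("/demo/data.bin", os.path.join(tmp, "a.bin"), callback=byte_cb)

    # get(): size is the number of files, value grows once per file
    file_cb = RecordingCallback()
    fs.get("/demo/data.bin", os.path.join(tmp, "b.bin"), callback=file_cb)

print(byte_cb.value, file_cb.size, file_cb.value)  # 1000 1 1
```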
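
### Context Manager Usage

```python
import fsspec
from fsspec.callbacks import TqdmCallback

fs = fsspec.filesystem('s3')

# close() runs automatically on exit, even if the transfer raises
with TqdmCallback(tqdm_kwargs={'unit': 'B', 'unit_scale': True}) as callback:
    fs.get_file('bucket/large-file.dat', 'local-file.dat', callback=callback)
# The bar is finalized here; pass tqdm_kwargs={'leave': False} to clear it instead
```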
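
In a custom callback, `close()` is the place to release whatever the callback owns, such as a log file or a GUI widget. `__exit__` calls it even when the transfer raises. The minimal sketch below uses a hypothetical `ResourceCallback` that only flags that cleanup ran.

```python
from fsspec.callbacks import Callback


class ResourceCallback(Callback):
    """Hypothetical callback owning a resource that close() releases."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.closed = False

    def close(self):
        # Release files, widgets, etc. here; __exit__ calls this method
        self.closed = True


try:
    with ResourceCallback() as cb:
        cb.set_size(10)
        cb.relative_update(4)
        raise RuntimeError("simulated transfer failure")
except RuntimeError:
    pass

print(cb.closed, cb.value)  # True 4
```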

### Custom Progress Display

```python
import time

import fsspec

fs = fsspec.filesystem('s3')


class CustomCallback(fsspec.Callback):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.start_time = time.monotonic()

    def call(self, *args, **kwargs):
        # fsspec has already stored the new state in self.size / self.value
        if not self.size:  # total unknown (None) or zero
            return
        percent = (self.value / self.size) * 100
        elapsed = time.monotonic() - self.start_time
        rate = self.value / elapsed if elapsed > 0 else 0

        print(f"\rProgress: {percent:.1f}% ({self.value}/{self.size}) "
              f"Rate: {rate:.1f} bytes/sec", end='')


# Use custom callback; get_file() reports progress in bytes
callback = CustomCallback()
fs.get_file('bucket/file.dat', 'local.dat', callback=callback)
```

### Multi-Operation Callbacks

```python
import fsspec
from fsspec.callbacks import TqdmCallback

fs = fsspec.filesystem('s3')

files_to_download = [
    ('bucket/file1.dat', 'local1.dat'),
    ('bucket/file2.dat', 'local2.dat'),
    ('bucket/file3.dat', 'local3.dat'),
]
remote_paths = [remote for remote, _ in files_to_download]
local_paths = [local for _, local in files_to_download]

# Passing lists to get() transfers every pair in one call: get() sets the
# callback size to the number of files and advances it once per file.
# Calling get() in a loop with one callback would reset the size each time;
# cumulative byte totals need a subclass that overrides branched().
with TqdmCallback(tqdm_kwargs={'unit': 'file'}) as callback:
    fs.get(remote_paths, local_paths, callback=callback)
```

### Branched Callbacks for Parallel Operations

`get()` and `put()` call `branched(source, destination)` once per file and pass the returned child callback to the single-file transfer. The base implementation returns a no-op child, so per-file progress is reported only by subclasses that override `branched()`.

```python
import threading

import fsspec
from fsspec.callbacks import TqdmCallback

fs = fsspec.filesystem('s3')

transfers = [
    ('bucket/file1.dat', 'local1.dat'),
    ('bucket/file2.dat', 'local2.dat'),
]

# Main callback for overall progress: one step per finished file
main_callback = TqdmCallback(tqdm_kwargs={'unit': 'file'})
main_callback.set_size(len(transfers))

def download_file(remote, local):
    # branched() takes the child transfer's source and destination paths
    with main_callback.branched(remote, local) as child:
        fs.get_file(remote, local, callback=child)

# Async backends such as s3fs already run fs.get(list, list) concurrently;
# explicit threads are shown here only to illustrate branched()
threads = [threading.Thread(target=download_file, args=pair) for pair in transfers]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
    main_callback.relative_update()  # advance from the main thread only

main_callback.close()
```
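
Cumulative byte progress across several files needs a subclass that overrides `branched()`. The sketch below shows one way to do that. It assumes a recent fsspec whose `get()` requests a child callback from `branched()` for every file, and it uses the in-memory filesystem for a local demo. `AggregateBytesCallback` and `_ChildBytes` are hypothetical names. The parent counts files as usual, while each child forwards its byte increments to the parent under a lock.

```python
import os
import tempfile
import threading

import fsspec
from fsspec.callbacks import Callback


class _ChildBytes(Callback):
    """Hypothetical per-file child that forwards byte increments upward."""

    def __init__(self, parent, **kwargs):
        super().__init__(**kwargs)
        self._parent = parent

    def relative_update(self, inc=1):
        super().relative_update(inc)
        self._parent.add_bytes(inc)


class AggregateBytesCallback(Callback):
    """Hypothetical parent: value counts files, bytes_done counts bytes."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.bytes_done = 0
        self._lock = threading.Lock()

    def add_bytes(self, n):
        with self._lock:  # children may report from several threads
            self.bytes_done += n

    def branched(self, path_1, path_2, **kwargs):
        # Called by get()/put() once per (source, destination) pair
        return _ChildBytes(self)


fs = fsspec.filesystem("memory")
fs.pipe("/batch/a.bin", b"a" * 300)
fs.pipe("/batch/b.bin", b"b" * 700)

cb = AggregateBytesCallback()
with tempfile.TemporaryDirectory() as tmp:
    fs.get(
        ["/batch/a.bin", "/batch/b.bin"],
        [os.path.join(tmp, "a.bin"), os.path.join(tmp, "b.bin")],
        callback=cb,
    )

print(cb.size, cb.value, cb.bytes_done)  # 2 2 1000
```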

### Cancellation Support

fsspec ignores the return values of callback methods, so returning `False` does not stop a transfer. Instead, raise an exception from `call()`. It propagates out of the transfer loop and aborts the operation.

```python
import threading
import time

import fsspec

fs = fsspec.filesystem('s3')


class TransferCancelled(Exception):
    """Raised from the callback to abort a transfer."""


class CancellableCallback(fsspec.Callback):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._cancelled = threading.Event()  # safe to set from another thread

    def cancel(self):
        self._cancelled.set()

    def call(self, *args, **kwargs):
        # Check for cancellation on every progress update
        if self._cancelled.is_set():
            raise TransferCancelled("Operation cancelled by user")

        # Display progress
        if self.size:
            percent = (self.value / self.size) * 100
            print(f"\rProgress: {percent:.1f}%", end='')


# Usage with cancellation
callback = CancellableCallback()

def download_with_callback():
    try:
        fs.get_file('bucket/very-large-file.dat', 'local.dat', callback=callback)
        print("\nDownload completed")
    except TransferCancelled as e:
        # The local file may be left partially written
        print(f"\nDownload cancelled: {e}")
    except Exception as e:
        print(f"\nDownload failed: {e}")

# Start operation in background
download_thread = threading.Thread(target=download_with_callback)
download_thread.start()

# Cancel after 30 seconds
time.sleep(30)
callback.cancel()
download_thread.join()
```
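
Raising from a callback stops the transfer but can leave a truncated local file behind. The sketch below pairs a `threading.Event`-based cancel flag with `get_file_or_cleanup`, a hypothetical wrapper that deletes the partial file. `TransferCancelled` and `CancelOnEvent` are likewise illustrative names. The in-memory filesystem stands in for a remote one.

```python
import os
import tempfile
import threading

import fsspec
from fsspec.callbacks import Callback


class TransferCancelled(Exception):
    """Hypothetical exception used to abort a transfer from a callback."""


class CancelOnEvent(Callback):
    """Hypothetical callback that aborts once `event` is set."""

    def __init__(self, event, **kwargs):
        super().__init__(**kwargs)
        self.event = event

    def call(self, *args, **kwargs):
        if self.event.is_set():
            raise TransferCancelled(f"cancelled after {self.value} bytes")


def get_file_or_cleanup(fs, rpath, lpath, callback):
    """Hypothetical helper: delete a partially written local file on failure."""
    try:
        fs.get_file(rpath, lpath, callback=callback)
    except BaseException:
        if os.path.exists(lpath):
            os.remove(lpath)
        raise


fs = fsspec.filesystem("memory")
fs.pipe("/cancel/big.bin", b"z" * 4096)

stop = threading.Event()
stop.set()  # simulate a cancel request that arrives before the first update

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "big.bin")
    try:
        get_file_or_cleanup(fs, "/cancel/big.bin", target, CancelOnEvent(stop))
        cancelled = False
    except TransferCancelled:
        cancelled = True
    leftover = os.path.exists(target)

print(cancelled, leftover)  # True False
```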
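
### Integration with Different Operations

```python
import fsspec
from fsspec.callbacks import TqdmCallback

fs = fsspec.filesystem('s3')

# Use a fresh callback per operation: each transfer resets the size.
# Directory download: progress counts files
with TqdmCallback(tqdm_kwargs={'unit': 'file'}) as callback:
    fs.get('bucket/data/', 'local-data/', recursive=True, callback=callback)

# Bulk upload: progress counts files
with TqdmCallback(tqdm_kwargs={'unit': 'file'}) as callback:
    fs.put('local-data/', 'bucket/uploaded-data/', recursive=True, callback=callback)

# copy() has no callback parameter in the base class; extra keyword
# arguments go to the backend's cp_file(), so progress reporting for
# server-side copies depends on the filesystem implementation
fs.copy('bucket/source.dat', 'bucket/backup.dat')

# Manual progress for custom per-file work
files = fs.glob('bucket/data/*.csv')
with TqdmCallback(tqdm_kwargs={'unit': 'file'}) as callback:
    callback.set_size(len(files))
    for i, file_path in enumerate(files):
        content = fs.cat_file(file_path)
        process_data(content)  # hypothetical application function
        callback.absolute_update(i + 1)
```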
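
For per-item work, `Callback.wrap()` (provided by the base class) can replace the manual `absolute_update(i + 1)` bookkeeping, because it calls `relative_update()` as each item is handed out. The short sketch below uses a hypothetical `RecordingCallback`, with upper-casing the names standing in for real processing.

```python
from fsspec.callbacks import Callback


class RecordingCallback(Callback):
    """Hypothetical helper that logs (size, value) on every update."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.events = []

    def call(self, *args, **kwargs):
        self.events.append((self.size, self.value))


names = ["a.csv", "b.csv", "c.csv"]
cb = RecordingCallback()
cb.set_size(len(names))
processed = [name.upper() for name in cb.wrap(names)]  # stand-in for real work
print(processed, cb.events)
# ['A.CSV', 'B.CSV', 'C.CSV'] [(3, 0), (3, 1), (3, 2), (3, 3)]
```

The increment happens before the loop body runs, so the count reflects items started rather than items finished.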

### Configuring Tqdm Options

```python
import fsspec
from fsspec.callbacks import TqdmCallback

fs = fsspec.filesystem('s3')

# Customize tqdm appearance and behavior
tqdm_options = {
    'unit': 'B',
    'unit_scale': True,
    'unit_divisor': 1024,
    'desc': 'Downloading',
    'ncols': 80,
    'ascii': True,  # draw the bar with ASCII characters instead of Unicode blocks
}

callback = TqdmCallback(tqdm_kwargs=tqdm_options)

# Renders one 80-column bar labelled "Downloading", with byte counts and
# transfer rate scaled by powers of 1024
fs.get_file('bucket/large-file.dat', 'local.dat', callback=callback)
callback.close()
```

### Logging Progress Information

```python
import logging

import fsspec

fs = fsspec.filesystem('s3')


class LoggingCallback(fsspec.Callback):
    def __init__(self, log_interval=1024 * 1024, **kwargs):  # log every MiB
        super().__init__(**kwargs)
        self.log_interval = log_interval
        self.last_logged = 0
        self.logger = logging.getLogger(__name__)

    def set_size(self, size):
        self.logger.info("Starting operation, total size: %s bytes", size)
        super().set_size(size)  # stores the size and triggers call()

    def call(self, *args, **kwargs):
        # Log at intervals
        if self.value - self.last_logged >= self.log_interval:
            if self.size:
                percent = (self.value / self.size) * 100
                self.logger.info("Progress: %.1f%% (%d/%d bytes)",
                                 percent, self.value, self.size)
            else:
                self.logger.info("Progress: %d bytes", self.value)
            self.last_logged = self.value


# Use logging callback
logging.basicConfig(level=logging.INFO)
callback = LoggingCallback()
fs.get_file('bucket/file.dat', 'local.dat', callback=callback)
```
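
### Callback Hooks

Hooks are named functions that `call()` runs after every update, and each receives the current size and value. They are not tied to events such as start, completion, or failure, which are better handled around the transfer call itself.

```python
import fsspec

fs = fsspec.filesystem('s3')


def print_progress(size, value, **kwargs):
    # Runs after every set_size(), absolute_update() and relative_update()
    if size:
        print(f"\rTransferred {value}/{size} bytes", end='')


# Hook names are arbitrary; all hooks run on each update
hooks = {'progress': print_progress}
callback = fsspec.Callback(hooks=hooks)

print("Transfer started")
try:
    fs.get_file('bucket/file.dat', 'local.dat', callback=callback)
except Exception as error:
    # Failures surface as exceptions from the transfer, not as hook calls
    print(f"\nTransfer failed: {error}")
else:
    print("\nTransfer completed successfully")
```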
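
Extra keyword arguments given to the `Callback` constructor are forwarded to every hook, and `call(hook_name)` runs a single named hook on demand. The sketch below uses a hypothetical `record` hook and a hypothetical `label` keyword.

```python
from fsspec.callbacks import Callback

seen = []


def record(size, value, **kwargs):
    """Hypothetical hook: receives size, value and the stored kwargs."""
    seen.append((size, value, kwargs.get("label")))


cb = Callback(hooks={"record": record}, label="report.csv")
cb.set_size(100)                   # runs all hooks
cb.relative_update(60)             # runs all hooks again
cb.call("record", label="manual")  # one named hook, overriding a kwarg
print(seen)
# [(100, 0, 'report.csv'), (100, 60, 'report.csv'), (100, 60, 'manual')]
```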

## Callback Guidelines

### When to Use Callbacks

- **Long-running operations**: File transfers, directory syncing
- **Large files**: Multi-gigabyte uploads/downloads
- **Batch operations**: Processing many files
- **User interfaces**: Providing feedback in GUI applications
- **Monitoring**: Logging progress for automated systems

### Callback Performance Considerations

- **Update frequency**: Transfers call `relative_update()` once per block or chunk, so throttle expensive rendering or logging inside `call()` instead of redrawing on every update
- **UI responsiveness**: Keep callback processing lightweight, since it runs inline in the transfer loop
- **Thread safety**: Ensure callbacks are thread-safe for parallel operations
- **Resource cleanup**: Always close callbacks when done, either with `close()` or a `with` block
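
The update-frequency and thread-safety points above can be combined in one subclass. The sketch below uses `ThrottledCallback`, a hypothetical class that redraws at most once per `interval` seconds, always drawing the first and final states, and serializes `relative_update()` with a lock. The injectable `clock` exists only so the demo is deterministic.

```python
import threading
import time

from fsspec.callbacks import Callback


class ThrottledCallback(Callback):
    """Hypothetical callback that redraws at most once per `interval` seconds."""

    def __init__(self, interval=0.5, clock=time.monotonic, **kwargs):
        super().__init__(**kwargs)
        self.interval = interval
        self._clock = clock  # injectable so the throttling can be tested
        self._lock = threading.Lock()
        self._last_draw = None
        self.draws = 0

    def relative_update(self, inc=1):
        # Serialize counter updates so concurrent child transfers don't race
        with self._lock:
            self.value += inc
        self.call()

    def call(self, *args, **kwargs):
        now = self._clock()
        with self._lock:
            finished = self.size is not None and self.value >= self.size
            recent = (self._last_draw is not None
                      and now - self._last_draw < self.interval)
            if recent and not finished:
                return  # drew too recently; skip this update
            self._last_draw = now
            self.draws += 1
        self.draw()

    def draw(self):
        print(f"\r{self.value}/{self.size} bytes", end="", flush=True)


# Deterministic demo with a fake clock (times in seconds)
ticks = iter([0.0, 0.1, 0.2, 0.6, 0.7])
cb = ThrottledCallback(interval=0.5, clock=lambda: next(ticks))
cb.set_size(100)          # t=0.0 first update: drawn
cb.relative_update(10)    # t=0.1 skipped
cb.relative_update(10)    # t=0.2 skipped
cb.relative_update(10)    # t=0.6 drawn (0.6 s since the last draw)
cb.relative_update(70)    # t=0.7 drawn because the transfer finished
print()
print(cb.draws)  # 3
```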

### Error Handling in Callbacks

```python
import logging

import fsspec


class RobustCallback(fsspec.Callback):
    def call(self, *args, **kwargs):
        try:
            # Update progress display
            self.update_display(self.size, self.value)
        except Exception:
            # Log the error but don't let a display bug abort the transfer
            logging.exception("Callback error")

    def update_display(self, size, value):
        # Hypothetical rendering method; replace with real display code
        if size:
            print(f"\r{value / size:.0%}", end='')
```