or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

caching.md callbacks.md compression.md core-operations.md filesystem-interface.md index.md mapping.md registry.md utilities.md

docs/callbacks.md

0

# Progress Callbacks

1

2

Extensible callback system for monitoring file transfer progress, supporting both built-in progress indicators and custom callback implementations. Callbacks provide real-time feedback during long-running filesystem operations like file uploads, downloads, and bulk transfers.

3

4

## Capabilities

5

6

### Base Callback Class

7

8

Foundation class for all progress callback implementations, providing a standard progress-tracking interface.

9

10

```python { .api }

11

class Callback:

12

"""Base class for progress callbacks."""

13

14

def __init__(self, size=None, value=0, hooks=None, **kwargs):

15

"""

16

Initialize callback.

17

18

Parameters:

19

- size: int, total size/count for progress tracking

20

- value: int, initial progress value

21

- hooks: dict, event hooks for specific progress events

22

- **kwargs: additional callback-specific options

23

"""

24

25

def __call__(self, size_or_none=None, value_or_none=None):

26

"""

27

Update progress.

28

29

Parameters:

30

- size_or_none: int or None, set total size if provided

31

- value_or_none: int or None, set current progress if provided

32

33

Returns:

34

bool, True to continue operation, False to abort

35

"""

36

37

def __enter__(self):

38

"""

39

Context manager entry.

40

41

Returns:

42

Callback, self

43

"""

44

45

def __exit__(self, *args):

46

"""Context manager exit."""

47

48

def set_size(self, size):

49

"""

50

Set total size for progress tracking.

51

52

Parameters:

53

- size: int, total size/count

54

"""

55

56

def absolute_update(self, value):

57

"""

58

Set absolute progress value.

59

60

Parameters:

61

- value: int, current progress value

62

"""

63

64

def relative_update(self, inc=1):

65

"""

66

Update progress incrementally.

67

68

Parameters:

69

- inc: int, increment amount

70

"""

71

72

def branched(self, path_1, path_2, **kwargs):

73

"""

74

Create branched callback for parallel operations.

75

76

Parameters:

77

- path_1: str, first operation path

78

- path_2: str, second operation path

79

- **kwargs: additional options

80

81

Returns:

82

Callback, new callback instance for branched operation

83

"""

84

85

def close(self):

86

"""Close callback and clean up resources."""

87

```

88

89

### Built-in Callback Implementations

90

91

Ready-to-use callback implementations for common progress display needs.

92

93

```python { .api }

94

class TqdmCallback(Callback):

95

"""Progress bar callback using tqdm library."""

96

97

def __init__(self, tqdm_kwargs=None, **kwargs):

98

"""

99

Initialize tqdm progress bar callback.

100

101

Parameters:

102

- tqdm_kwargs: dict, options passed to tqdm constructor

103

- **kwargs: additional callback options

104

"""

105

106

class DotPrinterCallback(Callback):

107

"""Simple dot-printing progress callback."""

108

109

def __init__(self, **kwargs):

110

"""Initialize dot printer callback."""

111

112

class NoOpCallback(Callback):

113

"""No-operation callback that does nothing."""

114

115

def __init__(self, **kwargs):

116

"""Initialize no-op callback."""

117

```

118

119

### Default Callback Instance

120

121

Pre-configured default callback instance for immediate use.

122

123

```python { .api }

124

DEFAULT_CALLBACK: Callback

125

"""Default callback instance (NoOpCallback)"""

126

```

127

128

## Usage Patterns

129

130

### Basic Progress Monitoring

131

132

```python

133

# Using default tqdm progress bar

134

callback = fsspec.callbacks.TqdmCallback()

135

136

# Download with progress bar

137

fs = fsspec.filesystem('s3')

138

fs.get('bucket/large-file.dat', 'local-file.dat', callback=callback)

139

140

# Upload with progress bar

141

fs.put('large-local-file.dat', 'bucket/uploaded-file.dat', callback=callback)

142

```

143

144

### Context Manager Usage

145

146

```python

147

# Progress bar automatically closes when done

148

with fsspec.callbacks.TqdmCallback() as callback:

149

fs.get('bucket/large-file.dat', 'local-file.dat', callback=callback)

150

# Progress bar disappears after completion

151

```

152

153

### Custom Progress Display

154

155

```python

156

class CustomCallback(fsspec.Callback):

157

def __init__(self):

158

super().__init__()

159

self.start_time = time.time()

160

161

def __call__(self, size_or_none=None, value_or_none=None):

162

if size_or_none is not None:

163

self.size = size_or_none

164

if value_or_none is not None:

165

self.value = value_or_none

166

167

if hasattr(self, 'size') and hasattr(self, 'value'):

168

percent = (self.value / self.size) * 100

169

elapsed = time.time() - self.start_time

170

rate = self.value / elapsed if elapsed > 0 else 0

171

172

print(f"\rProgress: {percent:.1f}% ({self.value}/{self.size}) "

173

f"Rate: {rate:.1f} bytes/sec", end='')

174

175

return True # Continue operation

176

177

# Use custom callback

178

callback = CustomCallback()

179

fs.get('bucket/file.dat', 'local.dat', callback=callback)

180

```

181

182

### Multi-Operation Callbacks

183

184

```python

185

import fsspec

186

187

# Callback for multiple file operations

188

callback = fsspec.callbacks.TqdmCallback()

189

190

files_to_download = [

191

('bucket/file1.dat', 'local1.dat'),

192

('bucket/file2.dat', 'local2.dat'),

193

('bucket/file3.dat', 'local3.dat'),

194

]

195

196

# Set total size for all files

197

total_size = sum(fs.size(remote) for remote, local in files_to_download)

198

callback.set_size(total_size)

199

200

# Download files with cumulative progress

201

for remote, local in files_to_download:

202

fs.get(remote, local, callback=callback)

203

```

204

205

### Branched Callbacks for Parallel Operations

206

207

```python

208

# Main callback for overall progress

209

main_callback = fsspec.callbacks.TqdmCallback()

210

211

# Create branched callbacks for parallel operations

212

branch1 = main_callback.branched('operation1', 'operation2')

213

branch2 = main_callback.branched('operation1', 'operation2')

214

215

# Use in parallel operations (conceptual - actual parallelism depends on implementation)

216

import threading

217

218

def download_file(remote, local, callback):

219

fs.get(remote, local, callback=callback)

220

221

thread1 = threading.Thread(target=download_file,

222

args=('bucket/file1.dat', 'local1.dat', branch1))

223

thread2 = threading.Thread(target=download_file,

224

args=('bucket/file2.dat', 'local2.dat', branch2))

225

226

thread1.start()

227

thread2.start()

228

thread1.join()

229

thread2.join()

230

```

231

232

### Cancellation Support

233

234

```python

235

class CancellableCallback(fsspec.Callback):

236

def __init__(self):

237

super().__init__()

238

self.cancelled = False

239

240

def cancel(self):

241

self.cancelled = True

242

243

def __call__(self, size_or_none=None, value_or_none=None):

244

# Update progress

245

if size_or_none is not None:

246

self.size = size_or_none

247

if value_or_none is not None:

248

self.value = value_or_none

249

250

# Check for cancellation

251

if self.cancelled:

252

print("Operation cancelled by user")

253

return False # Abort operation

254

255

# Display progress

256

if hasattr(self, 'size') and hasattr(self, 'value'):

257

percent = (self.value / self.size) * 100

258

print(f"\rProgress: {percent:.1f}%", end='')

259

260

return True # Continue operation

261

262

# Usage with cancellation

263

callback = CancellableCallback()

264

265

# Start operation in background

266

import threading

267

def download_with_callback():

268

try:

269

fs.get('bucket/very-large-file.dat', 'local.dat', callback=callback)

270

print("\nDownload completed")

271

except Exception as e:

272

print(f"\nDownload failed: {e}")

273

274

download_thread = threading.Thread(target=download_with_callback)

275

download_thread.start()

276

277

# Cancel after 30 seconds

278

time.sleep(30)

279

callback.cancel()

280

download_thread.join()

281

```

282

283

### Integration with Different Operations

284

285

```python

286

callback = fsspec.callbacks.TqdmCallback()

287

288

# File copy with progress

289

fs.copy('bucket/source.dat', 'bucket/backup.dat', callback=callback)

290

291

# Recursive directory download with progress

292

fs.get('bucket/data/', 'local-data/', recursive=True, callback=callback)

293

294

# Bulk upload with progress

295

fs.put('local-data/', 'bucket/uploaded-data/', recursive=True, callback=callback)

296

297

# Multiple file operations

298

files = fs.glob('bucket/data/*.csv')

299

callback.set_size(len(files))

300

301

for i, file_path in enumerate(files):

302

content = fs.cat_file(file_path)

303

process_data(content)

304

callback.absolute_update(i + 1)

305

```

306

307

### Configuring Tqdm Options

308

309

```python

310

# Customize tqdm appearance and behavior

311

tqdm_options = {

312

'unit': 'B',

313

'unit_scale': True,

314

'unit_divisor': 1024,

315

'desc': 'Downloading',

316

'ncols': 80,

317

'ascii': True

318

}

319

320

callback = fsspec.callbacks.TqdmCallback(tqdm_kwargs=tqdm_options)

321

322

# Results in progress bar like:

323

# Downloading: 45%|████▌ | 450MB/1.00GB [00:30<00:25, 15.0MB/s]

324

fs.get('bucket/large-file.dat', 'local.dat', callback=callback)

325

```

326

327

### Logging Progress Information

328

329

```python

330

import logging

331

332

class LoggingCallback(fsspec.Callback):

333

def __init__(self, log_interval=1024*1024): # Log every MB

334

super().__init__()

335

self.log_interval = log_interval

336

self.last_logged = 0

337

self.logger = logging.getLogger(__name__)

338

339

def __call__(self, size_or_none=None, value_or_none=None):

340

if size_or_none is not None:

341

self.size = size_or_none

342

self.logger.info(f"Starting operation, total size: {self.size} bytes")

343

344

if value_or_none is not None:

345

self.value = value_or_none

346

347

# Log at intervals

348

if self.value - self.last_logged >= self.log_interval:

349

if hasattr(self, 'size'):

350

percent = (self.value / self.size) * 100

351

self.logger.info(f"Progress: {percent:.1f}% ({self.value}/{self.size} bytes)")

352

self.last_logged = self.value

353

354

return True

355

356

# Use logging callback

357

logging.basicConfig(level=logging.INFO)

358

callback = LoggingCallback()

359

fs.get('bucket/file.dat', 'local.dat', callback=callback)

360

```

361

362

### Callback Hooks for Events

363

364

```python

365

def on_start(callback):

366

print("Transfer started")

367

368

def on_complete(callback):

369

print("Transfer completed successfully")

370

371

def on_error(callback, error):

372

print(f"Transfer failed: {error}")

373

374

# Callbacks can use hooks for event handling

375

hooks = {

376

'start': on_start,

377

'complete': on_complete,

378

'error': on_error

379

}

380

381

callback = fsspec.Callback(hooks=hooks)

382

# Implementation would call hooks at appropriate times

383

```

384

385

## Callback Guidelines

386

387

### When to Use Callbacks

388

389

- **Long-running operations**: File transfers, directory syncing

390

- **Large files**: Multi-gigabyte uploads/downloads

391

- **Batch operations**: Processing many files

392

- **User interfaces**: Providing feedback in GUI applications

393

- **Monitoring**: Logging progress for automated systems

394

395

### Callback Performance Considerations

396

397

- **Update frequency**: Avoid updating too often; report progress at most once every few KB transferred

398

- **UI responsiveness**: Keep callback processing lightweight

399

- **Thread safety**: Ensure callbacks are thread-safe for parallel operations

400

- **Resource cleanup**: Always close and clean up callbacks when done

401

402

### Error Handling in Callbacks

403

404

```python

405

class RobustCallback(fsspec.Callback):

406

def __call__(self, size_or_none=None, value_or_none=None):

407

try:

408

# Update progress display

409

self.update_display(size_or_none, value_or_none)

410

return True

411

except Exception as e:

412

# Log error but don't fail the transfer

413

logging.error(f"Callback error: {e}")

414

return True # Continue operation despite callback error

415

```