# Diagnostics

Tools for monitoring performance, resource usage, and debugging distributed computations. Dask's diagnostic tools help profile execution, track resource consumption, and identify bottlenecks in parallel workflows.

## Capabilities

### Progress Tracking

Monitor computation progress with visual progress bars and reporting.
```python { .api }
class ProgressBar:
    """
    Display computation progress with a visual progress bar.

    Shows task completion status during compute() operations.
    Renders as text in the terminal and in Jupyter notebook output.
    """

    def __init__(self, minimum=0, width=40, dt=0.1, out=None):
        """
        Initialize progress bar.

        Parameters:
        - minimum: Minimum duration (seconds) before showing progress
        - width: Width of the bar in characters
        - dt: Update interval (seconds)
        - out: Output stream (defaults to sys.stdout)
        """

    def __enter__(self):
        """Start progress tracking."""
        return self

    def __exit__(self, *args):
        """Stop progress tracking."""

    def register(self):
        """Register as a global callback so all computations show progress."""

    def unregister(self):
        """Unregister the global callback."""
```
### Performance Profiling

Profile task execution times and identify performance bottlenecks.

```python { .api }
class Profiler:
    """
    Profile execution at the task level.

    Tracks time spent in each task to identify performance
    bottlenecks and optimization opportunities.
    """

    def __init__(self):
        """Initialize profiler."""

    def __enter__(self):
        """Start profiling."""
        return self

    def __exit__(self, *args):
        """Stop profiling and collect results."""

    def results(self):
        """
        Get profiling results.

        Returns:
        - list: TaskData records with fields
          (key, task, start_time, end_time, worker_id)
        """

    def visualize(self, filename=None, **kwargs):
        """
        Visualize profiling results (requires bokeh).

        Parameters:
        - filename: Output file for the visualization
        - **kwargs: Additional visualization options

        Returns:
        - Bokeh plot object; also saved to filename if given
        """

    def clear(self):
        """Clear collected profiling data."""
```
### Resource Monitoring

Monitor system resource usage during computations.

```python { .api }
class ResourceProfiler:
    """
    Monitor system resource usage (CPU and memory) via psutil.

    Samples resource consumption over time to identify
    resource bottlenecks and tune resource allocation.
    """

    def __init__(self, dt=1.0):
        """
        Initialize resource profiler.

        Parameters:
        - dt: Sampling interval (seconds)
        """

    def __enter__(self):
        """Start resource monitoring."""
        return self

    def __exit__(self, *args):
        """Stop resource monitoring."""

    def results(self):
        """
        Get resource usage results.

        Returns:
        - list: ResourceData records with fields (time, mem, cpu);
          mem is in MB, cpu is a percentage
        """

    def visualize(self, filename=None, **kwargs):
        """
        Visualize resource usage over time (requires bokeh).

        Parameters:
        - filename: Output file for plots
        - **kwargs: Plotting options

        Returns:
        - Bokeh plot of resource usage
        """

    def clear(self):
        """Clear collected resource data."""
```
### Cache Profiling

Monitor how task results move through the scheduler's cache during execution.

```python { .api }
class CacheProfiler:
    """
    Record when each task's result enters and leaves the
    scheduler cache, optionally scored by a metric function
    (e.g. result size), to help tune caching strategies.
    """

    def __init__(self, metric=None, metric_name=None):
        """
        Initialize cache profiler.

        Parameters:
        - metric: Function applied to each task result to score it
          (defaults to counting each task as 1)
        - metric_name: Label for the metric in visualizations
        """

    def __enter__(self):
        """Start cache monitoring."""
        return self

    def __exit__(self, *args):
        """Stop cache monitoring."""

    def results(self):
        """
        Get cache statistics.

        Returns:
        - list: CacheData records with fields
          (key, task, metric, cache_time, free_time)
        """
```
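A minimal sketch of `CacheProfiler` in use: with the default metric each record carries `metric=1`, and `cache_time`/`free_time` mark when the result entered and left the cache.

```python
import dask.array as da
from dask.diagnostics import CacheProfiler

x = da.random.random((5000, 5000), chunks=(500, 500))

with CacheProfiler() as cprof:
    x.sum().compute()

# Each CacheData record shows how long a task's result
# stayed in the scheduler cache
for record in cprof.results()[:5]:
    print(record.key, record.free_time - record.cache_time)
```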
### Custom Callbacks

Create custom diagnostic callbacks for specialized monitoring.

```python { .api }
class Callback:
    """
    Base class for custom diagnostic callbacks (dask.callbacks.Callback).

    Subclass this and override any of the hook methods below,
    or pass hook functions directly to the constructor.
    """

    def __init__(self, start=None, start_state=None, pretask=None,
                 posttask=None, finish=None):
        """Initialize callback, optionally from hook functions."""

    def _start(self, dsk):
        """
        Called when computation starts.

        Parameters:
        - dsk: Task graph dictionary
        """

    def _pretask(self, key, dsk, state):
        """
        Called before each task runs.

        Parameters:
        - key: Task key
        - dsk: Task graph dictionary
        - state: Scheduler state dictionary
        """

    def _posttask(self, key, result, dsk, state, worker_id):
        """
        Called after each task finishes.

        Parameters:
        - key: Task key
        - result: Task result
        - dsk: Task graph dictionary
        - state: Scheduler state dictionary
        - worker_id: ID of the worker that ran the task
        """

    def _finish(self, dsk, state, errored):
        """
        Called when computation finishes.

        Parameters:
        - dsk: Task graph dictionary
        - state: Final scheduler state
        - errored: Whether the computation had errors
        """
```
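Hooks can also be supplied as plain functions instead of subclassing; a minimal sketch:

```python
import dask
from dask.callbacks import Callback

def log_key(key, dsk, state):
    # Runs before each task; matches the _pretask signature
    print(f"running {key}")

# Pass hook functions directly to the constructor
with Callback(pretask=log_key):
    dask.delayed(sum)([1, 2, 3]).compute()
```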
### Visualization Tools

Visualize task graphs; profiler objects expose their own `visualize` methods for profiling results and resource usage (see above).

```python { .api }
def visualize(*args, filename='mydask', format=None, optimize_graph=False,
              color=None, **kwargs):
    """
    Render task graphs with graphviz (dask.visualize).

    Parameters:
    - *args: Dask collections to visualize
    - filename: Output file path
    - format: Output format ('png', 'svg', 'pdf', etc.)
    - optimize_graph: Whether to optimize the graph before rendering
    - color: Node coloring scheme (e.g. 'order')
    - **kwargs: Additional graphviz options

    Returns:
    - Display object for the rendered graph; also written to filename
    """
```
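For example, rendering a small task graph to disk (requires the `graphviz` package); a minimal sketch:

```python
import dask
import dask.array as da

x = da.random.random((100, 100), chunks=(50, 50))
y = (x + x.T).sum()

# Write the task graph to a file; color='order' shades nodes
# by scheduled execution order
dask.visualize(y, filename='graph.svg', color='order')

# Equivalent method form available on any Dask collection
y.visualize(filename='graph.png')
```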
### Memory Diagnostics

Estimate object sizes and measure memory use to identify leaks. `sizeof` ships with Dask (`dask.sizeof`); `memory_usage` below is a sketch of the kind of wrapper available from libraries such as `memory_profiler`.

```python { .api }
def memory_usage(func, *args, **kwargs):
    """
    Measure memory usage of a function call.

    Parameters:
    - func: Function to monitor
    - *args: Function arguments
    - **kwargs: Function keyword arguments

    Returns:
    - tuple: (result, peak_memory_mb)
    """

def sizeof(obj):
    """
    Estimate the in-memory size of an object.

    Parameters:
    - obj: Object to measure

    Returns:
    - int: Size in bytes
    """
```
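A minimal sketch of `sizeof`, which is importable from `dask.sizeof` and dispatches on the object's type:

```python
import numpy as np
from dask.sizeof import sizeof

# Returns an estimated size in bytes
print(sizeof(np.zeros((1000, 1000))))  # ~8e6 bytes of float64 data
print(sizeof([1, 2, 3]))               # includes list overhead
print(sizeof("hello"))
```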
## Usage Examples

### Basic Progress Monitoring

```python
import dask.array as da
from dask.diagnostics import ProgressBar

# Create computation
x = da.random.random((10000, 10000), chunks=(1000, 1000))
computation = (x + x.T).sum()

# Monitor progress
with ProgressBar():
    result = computation.compute()

# Or register globally
progress = ProgressBar()
progress.register()

# All computations now show progress
result1 = x.mean().compute()
result2 = x.std().compute()

progress.unregister()
```
### Performance Profiling

```python
import dask.array as da
from dask.diagnostics import Profiler

# Create complex computation
x = da.random.random((5000, 5000), chunks=(500, 500))
y = da.random.random((5000, 5000), chunks=(500, 500))

computation = ((x + y) @ (x - y)).sum(axis=0)

# Profile execution
with Profiler() as prof:
    result = computation.compute()

# Analyze results
profile_data = prof.results()
print(f"Total tasks: {len(profile_data)}")
print(f"Total time: {sum(p.end_time - p.start_time for p in profile_data):.2f}s")

# Visualize profiling results (requires bokeh)
prof.visualize(filename='profile.html')

# Find slowest tasks
slowest = sorted(profile_data,
                 key=lambda p: p.end_time - p.start_time,
                 reverse=True)[:10]
for p in slowest:
    print(f"Task {p.key}: {p.end_time - p.start_time:.3f}s")
```
### Resource Monitoring

```python
import dask.array as da
from dask.diagnostics import ResourceProfiler

# Create memory-intensive computation
x = da.random.random((20000, 20000), chunks=(2000, 2000))
computation = x.rechunk((1000, 1000)).sum()

# Monitor resources, sampling every 0.5s
with ResourceProfiler(dt=0.5) as rprof:
    result = computation.compute()

# Analyze resource usage (ResourceData fields: time, mem, cpu)
resources = rprof.results()
print(f"Peak memory: {max(r.mem for r in resources):.1f} MB")
print(f"Peak CPU: {max(r.cpu for r in resources):.1f}%")

# Visualize resource usage over time (requires bokeh)
rprof.visualize(filename='resources.html')

# Check for resource bottlenecks
high_memory = [r for r in resources if r.mem > 1000]  # MB
if high_memory:
    print(f"High memory usage detected at {len(high_memory)} time points")
```
### Combined Diagnostics

```python
import dask.array as da
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler

# Complex computation pipeline
def create_computation():
    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    y = da.random.random((10000, 10000), chunks=(1000, 1000))

    # Multi-step computation
    step1 = (x + y).rechunk((500, 500))
    step2 = step1 @ step1.T
    step3 = step2.sum(axis=0)
    return step3

computation = create_computation()

# Monitor with all diagnostics at once
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof:
    result = computation.compute()

# Combined analysis
print("=== Performance Analysis ===")
profile_data = prof.results()
print(f"Total tasks executed: {len(profile_data)}")
print(f"Total execution time: {sum(p.end_time - p.start_time for p in profile_data):.2f}s")

print("\n=== Resource Analysis ===")
resources = rprof.results()
print(f"Peak memory usage: {max(r.mem for r in resources):.1f} MB")
print(f"Average CPU usage: {sum(r.cpu for r in resources) / len(resources):.1f}%")

# Save detailed reports (requires bokeh)
prof.visualize(filename='performance_profile.html')
rprof.visualize(filename='resource_usage.html')
```
### Custom Diagnostic Callback

```python
import dask.array as da
from dask.callbacks import Callback
import time

class TaskLogger(Callback):
    """Custom callback to log task execution details."""

    def __init__(self):
        super().__init__()
        self.task_times = {}
        self.start_time = None

    def _start(self, dsk):
        self.start_time = time.time()
        print(f"Starting computation with {len(dsk)} tasks")

    def _pretask(self, key, dsk, state):
        self.task_times[key] = time.time()
        print(f"Starting task: {key}")

    def _posttask(self, key, result, dsk, state, worker_id):
        duration = time.time() - self.task_times[key]
        print(f"Completed task {key} in {duration:.3f}s")

    def _finish(self, dsk, state, errored):
        total_time = time.time() - self.start_time
        print(f"Computation finished in {total_time:.2f}s")
        if errored:
            print("Computation had errors!")

# Use custom callback
x = da.random.random((1000, 1000), chunks=(200, 200))
computation = x.sum()

with TaskLogger():
    result = computation.compute()
```
### Memory Usage Analysis

```python
import dask.array as da
from dask.diagnostics import ResourceProfiler
import psutil
import os

def analyze_memory_usage():
    """Analyze memory usage during computation."""

    # Get initial memory of this process
    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss / 1024 / 1024  # MB

    # Create large computation
    x = da.random.random((15000, 15000), chunks=(1500, 1500))
    computation = x.rechunk((3000, 3000)).sum()

    with ResourceProfiler(dt=0.2) as rprof:
        result = computation.compute()

    # Analyze memory pattern (ResourceData field is `mem`, in MB)
    resources = rprof.results()
    memory_usage = [r.mem for r in resources]

    peak_memory = max(memory_usage)
    avg_memory = sum(memory_usage) / len(memory_usage)
    final_memory = process.memory_info().rss / 1024 / 1024

    print(f"Initial memory: {initial_memory:.1f} MB")
    print(f"Peak memory during computation: {peak_memory:.1f} MB")
    print(f"Average memory during computation: {avg_memory:.1f} MB")
    print(f"Final memory: {final_memory:.1f} MB")
    print(f"Memory increase: {final_memory - initial_memory:.1f} MB")

    # Check for memory leaks
    if final_memory > initial_memory + 100:  # 100 MB threshold
        print("⚠️ Potential memory leak detected!")

    return resources

memory_data = analyze_memory_usage()
```
### Distributed Diagnostics

The callbacks above only work with the single-machine schedulers; with the distributed scheduler, use `dask.distributed.progress` and the dashboard instead.

```python
import dask.array as da
from dask.distributed import Client, progress

# Connect to distributed cluster (placeholder address)
client = Client('scheduler-address:8786')

# Monitor distributed computation
x = da.random.random((50000, 50000), chunks=(5000, 5000))
computation = (x + x.T).sum()

# Submit and track progress with the distributed progress bar
future = client.compute(computation)
progress(future)
result = future.result()

# Access cluster diagnostics
print(f"Dashboard: {client.dashboard_link}")

# Worker resource information
worker_info = client.scheduler_info()['workers']
for worker_addr, info in worker_info.items():
    print(f"Worker {worker_addr}:")
    print(f"  Threads: {info.get('nthreads', 'unknown')}")
    print(f"  Memory limit: {info.get('memory_limit', 'unknown')} bytes")

client.close()
```