# Diagnostics

Tools for monitoring performance, resource usage, and debugging distributed computations. Dask's diagnostic tools help profile execution, track resource consumption, and identify bottlenecks in parallel workflows.

## Capabilities

### Progress Tracking

Monitor computation progress with visual progress bars and reporting.

```python { .api }
class ProgressBar:
    """
    Display computation progress with a visual progress bar.

    Shows task completion status during compute() operations.
    Works in Jupyter notebooks, terminals, and web interfaces.
    """

    def __init__(self, minimum=1.0, dt=0.1):
        """
        Initialize progress bar.

        Parameters:
        - minimum: Minimum time (seconds) before showing progress
        - dt: Update interval (seconds)
        """

    def __enter__(self):
        """Start progress tracking."""
        return self

    def __exit__(self, *args):
        """Stop progress tracking."""
        pass

    def register(self):
        """Register as a global progress callback."""
        pass

    def unregister(self):
        """Unregister the progress callback."""
        pass
```

### Performance Profiling

Profile task execution times and identify performance bottlenecks.

```python { .api }
class Profiler:
    """
    Profile task execution times and function calls.

    Tracks time spent in each task and function to identify
    performance bottlenecks and optimization opportunities.
    """

    def __init__(self):
        """Initialize profiler."""
        pass

    def __enter__(self):
        """Start profiling."""
        return self

    def __exit__(self, *args):
        """Stop profiling and collect results."""
        pass

    def results(self):
        """
        Get profiling results.

        Returns:
        list: List of profiling records with timing information
        """
        pass

    def visualize(self, filename=None, **kwargs):
        """
        Visualize profiling results.

        Parameters:
        - filename: Output file for visualization
        - **kwargs: Additional visualization options

        Returns:
        Visualization object or saves to file
        """
        pass

    def clear(self):
        """Clear collected profiling data."""
        pass
```

### Resource Monitoring

Monitor system resource usage during computations.

```python { .api }
class ResourceProfiler:
    """
    Monitor system resource usage (CPU, memory, network, disk).

    Tracks resource consumption over time to identify
    resource bottlenecks and optimize resource allocation.
    """

    def __init__(self, dt=1.0):
        """
        Initialize resource profiler.

        Parameters:
        - dt: Sampling interval (seconds)
        """
        pass

    def __enter__(self):
        """Start resource monitoring."""
        return self

    def __exit__(self, *args):
        """Stop resource monitoring."""
        pass

    def results(self):
        """
        Get resource usage results.

        Returns:
        list: Resource usage data over time
        """
        pass

    def visualize(self, filename=None, **kwargs):
        """
        Visualize resource usage.

        Parameters:
        - filename: Output file for plots
        - **kwargs: Plotting options

        Returns:
        Resource usage plots
        """
        pass

    def clear(self):
        """Clear collected resource data."""
        pass
```

### Cache Profiling

Monitor task graph caching and optimization effectiveness.

```python { .api }
class CacheProfiler:
    """
    Monitor task caching and graph optimization.

    Tracks cache hits/misses and optimization statistics
    to tune caching strategies and graph optimizations.
    """

    def __init__(self):
        """Initialize cache profiler."""
        pass

    def __enter__(self):
        """Start cache monitoring."""
        return self

    def __exit__(self, *args):
        """Stop cache monitoring."""
        pass

    def results(self):
        """
        Get cache statistics.

        Returns:
        dict: Cache hit/miss ratios and optimization stats
        """
        pass
```
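
The cache profiler follows the same context-manager pattern as the other profilers. Below is a minimal usage sketch, assuming `CacheProfiler` is importable from `dask.diagnostics` as described above; since the exact shape of the data returned by `results()` can vary between versions, the example only reports how many records were collected.

```python
import dask.array as da
from dask.diagnostics import CacheProfiler

# Small computation with a shared intermediate, so caching has something to do
x = da.random.random((2000, 2000), chunks=(500, 500))
y = x + 1                      # shared intermediate
computation = (y * y).sum()

# Collect cache statistics while the graph executes
with CacheProfiler() as cprof:
    result = computation.compute()

# Inspect whatever cache records were gathered
stats = cprof.results()
print(f"Collected {len(stats)} cache records")
```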

### Custom Callbacks

Create custom diagnostic callbacks for specialized monitoring.

```python { .api }
class Callback:
    """
    Base class for custom diagnostic callbacks.

    Subclass this to create custom monitoring and
    diagnostic tools for specific use cases.
    """

    def __init__(self):
        """Initialize callback."""
        pass

    def start(self, dsk):
        """
        Called when computation starts.

        Parameters:
        - dsk: Task graph dictionary
        """
        pass

    def start_task(self, key, task, **kwargs):
        """
        Called when task starts.

        Parameters:
        - key: Task key
        - task: Task tuple
        - **kwargs: Additional task information
        """
        pass

    def finish_task(self, key, task, **kwargs):
        """
        Called when task finishes.

        Parameters:
        - key: Task key
        - task: Task tuple
        - **kwargs: Task results and timing
        """
        pass

    def finish(self, dsk, state, errored):
        """
        Called when computation finishes.

        Parameters:
        - dsk: Task graph dictionary
        - state: Final computation state
        - errored: Whether computation had errors
        """
        pass
```

### Visualization Tools

Visualize computation graphs, profiling results, and resource usage.

```python { .api }
def visualize(*args, filename=None, format=None, optimize_graph=False,
              color='order', **kwargs):
    """
    Visualize task graphs and diagnostic information.

    Parameters:
    - *args: Collections or profiling results to visualize
    - filename: Output file path
    - format: Output format ('png', 'svg', 'pdf', etc.)
    - optimize_graph: Whether to optimize graph before visualization
    - color: Node coloring scheme
    - **kwargs: Additional graphviz options

    Returns:
    Graphviz object or None if filename specified
    """
    pass
```
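
As a sketch of graph visualization (not tied to the stub above), the example below uses the top-level `dask.visualize` helper and the equivalent `.visualize()` collection method, which accept `filename`, `optimize_graph`, and `color` options; rendering requires Graphviz to be installed. Profiling results can likewise be plotted together on a shared timeline with the `visualize` helper exposed by `dask.diagnostics`.

```python
import dask
import dask.array as da

# Keep the graph small so the rendered image stays readable
x = da.random.random((1000, 1000), chunks=(500, 500))
computation = (x + x.T).mean(axis=0)

# Render the task graph to a file; the format is inferred from the extension
dask.visualize(computation, filename='mean_graph.svg', optimize_graph=True)

# Collections expose the same functionality as a method
computation.visualize(filename='mean_graph.png', color='order')
```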

### Memory Diagnostics

Monitor memory usage and identify memory leaks.

```python { .api }
def memory_usage(func, *args, **kwargs):
    """
    Measure memory usage of function execution.

    Parameters:
    - func: Function to monitor
    - *args: Function arguments
    - **kwargs: Function keyword arguments

    Returns:
    tuple: (result, peak_memory_mb)
    """
    pass

def sizeof(obj):
    """
    Estimate memory size of object.

    Parameters:
    - obj: Object to measure

    Returns:
    int: Size in bytes
    """
    pass
```
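
A quick sketch of the size-estimation side, assuming the `sizeof` helper described above; in upstream Dask the same function is importable from `dask.sizeof`. The `memory_usage` helper is not exercised here, since process-level memory tracking is covered by `ResourceProfiler` in the examples below.

```python
import numpy as np
from dask.sizeof import sizeof  # location of the sizeof helper in upstream Dask

# Estimate the in-memory footprint of chunk-sized objects
chunk = np.zeros((1000, 1000), dtype='float64')
records = [{'id': i, 'value': float(i)} for i in range(10_000)]

print(f"NumPy chunk: {sizeof(chunk) / 1e6:.1f} MB")    # 1000 * 1000 * 8 bytes ≈ 8 MB
print(f"List of dicts: {sizeof(records) / 1e6:.2f} MB")

# Rule of thumb: keep per-chunk sizes comfortably below the memory available per worker thread
```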

## Usage Examples

### Basic Progress Monitoring

```python
import dask.array as da
from dask.diagnostics import ProgressBar

# Create computation
x = da.random.random((10000, 10000), chunks=(1000, 1000))
computation = (x + x.T).sum()

# Monitor progress
with ProgressBar():
    result = computation.compute()

# Or register globally
progress = ProgressBar()
progress.register()

# All computations now show progress
result1 = x.mean().compute()
result2 = x.std().compute()

progress.unregister()
```

### Performance Profiling

```python
import dask.array as da
from dask.diagnostics import Profiler

# Create complex computation
x = da.random.random((5000, 5000), chunks=(500, 500))
y = da.random.random((5000, 5000), chunks=(500, 500))

computation = ((x + y) @ (x - y)).sum(axis=0)

# Profile execution
with Profiler() as prof:
    result = computation.compute()

# Analyze results
profile_data = prof.results()
print(f"Total tasks: {len(profile_data)}")
print(f"Total time: {sum(p.end_time - p.start_time for p in profile_data):.2f}s")

# Visualize profiling results
prof.visualize(filename='profile.html')

# Find slowest tasks
slowest = sorted(profile_data,
                 key=lambda p: p.end_time - p.start_time,
                 reverse=True)[:10]
for p in slowest:
    print(f"Task {p.key}: {p.end_time - p.start_time:.3f}s")
```

### Resource Monitoring

```python
import dask.array as da
from dask.diagnostics import ResourceProfiler

# Create memory-intensive computation
x = da.random.random((20000, 20000), chunks=(2000, 2000))
computation = x.rechunk((1000, 1000)).sum()

# Monitor resources
with ResourceProfiler(dt=0.5) as rprof:
    result = computation.compute()

# Analyze resource usage; each record has time, mem (MB), and cpu (%) fields
resources = rprof.results()
print(f"Peak memory: {max(r.mem for r in resources):.1f} MB")
print(f"Peak CPU: {max(r.cpu for r in resources):.1f}%")

# Visualize resource usage over time
rprof.visualize(filename='resources.html')

# Check for resource bottlenecks
high_memory = [r for r in resources if r.mem > 1000]  # MB
if high_memory:
    print(f"High memory usage detected at {len(high_memory)} time points")
```

### Combined Diagnostics

```python
import dask.array as da
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler

# Complex computation pipeline
def create_computation():
    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    y = da.random.random((10000, 10000), chunks=(1000, 1000))

    # Multi-step computation
    step1 = (x + y).rechunk((500, 500))
    step2 = step1 @ step1.T
    step3 = step2.sum(axis=0)
    return step3

computation = create_computation()

# Monitor with all diagnostics
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof:
    result = computation.compute()

# Combined analysis
print("=== Performance Analysis ===")
profile_data = prof.results()
print(f"Total tasks executed: {len(profile_data)}")
print(f"Total execution time: {sum(p.end_time - p.start_time for p in profile_data):.2f}s")

print("\n=== Resource Analysis ===")
resources = rprof.results()
print(f"Peak memory usage: {max(r.mem for r in resources):.1f} MB")
print(f"Average CPU usage: {sum(r.cpu for r in resources) / len(resources):.1f}%")

# Save detailed reports
prof.visualize(filename='performance_profile.html')
rprof.visualize(filename='resource_usage.html')
```

### Custom Diagnostic Callback

```python
import dask.array as da
from dask.diagnostics import Callback
import time

class TaskLogger(Callback):
    """Custom callback to log task execution details."""

    def __init__(self):
        super().__init__()
        self.task_times = {}
        self.start_time = None

    def start(self, dsk):
        self.start_time = time.time()
        print(f"Starting computation with {len(dsk)} tasks")

    def start_task(self, key, task, **kwargs):
        self.task_times[key] = time.time()
        print(f"Starting task: {key}")

    def finish_task(self, key, task, **kwargs):
        duration = time.time() - self.task_times[key]
        print(f"Completed task {key} in {duration:.3f}s")

    def finish(self, dsk, state, errored):
        total_time = time.time() - self.start_time
        print(f"Computation finished in {total_time:.2f}s")
        if errored:
            print("Computation had errors!")

# Use custom callback
x = da.random.random((1000, 1000), chunks=(200, 200))
computation = x.sum()

with TaskLogger():
    result = computation.compute()
```

### Memory Usage Analysis

```python
import dask.array as da
from dask.diagnostics import ResourceProfiler
import psutil
import os

def analyze_memory_usage():
    """Analyze memory usage during computation."""

    # Get initial memory
    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss / 1024 / 1024  # MB

    # Create large computation
    x = da.random.random((15000, 15000), chunks=(1500, 1500))
    computation = x.rechunk((3000, 3000)).sum()

    with ResourceProfiler(dt=0.2) as rprof:
        result = computation.compute()

    # Analyze memory pattern; each record's mem field is in MB
    resources = rprof.results()
    memory_samples = [r.mem for r in resources]

    peak_memory = max(memory_samples)
    avg_memory = sum(memory_samples) / len(memory_samples)
    final_memory = process.memory_info().rss / 1024 / 1024

    print(f"Initial memory: {initial_memory:.1f} MB")
    print(f"Peak memory during computation: {peak_memory:.1f} MB")
    print(f"Average memory during computation: {avg_memory:.1f} MB")
    print(f"Final memory: {final_memory:.1f} MB")
    print(f"Memory increase: {final_memory - initial_memory:.1f} MB")

    # Check for memory leaks
    if final_memory > initial_memory + 100:  # 100 MB threshold
        print("⚠️ Potential memory leak detected!")

    return resources

memory_data = analyze_memory_usage()
```

### Distributed Diagnostics

```python
import dask.array as da
from dask.distributed import Client, progress

# Connect to distributed cluster
client = Client('scheduler-address:8786')

# Monitor distributed computation
x = da.random.random((50000, 50000), chunks=(5000, 5000))
computation = (x + x.T).sum()

# The local ProgressBar only tracks the local schedulers; with the
# distributed scheduler, use dask.distributed.progress or the dashboard
future = client.compute(computation)
progress(future)
result = future.result()

# Access cluster diagnostics
print(f"Dashboard: {client.dashboard_link}")
print(f"Scheduler info: {client.scheduler_info()}")

# Worker resource usage
worker_info = client.scheduler_info()['workers']
for worker_addr, info in worker_info.items():
    print(f"Worker {worker_addr}:")
    print(f"  Threads: {info.get('nthreads', 'unknown')}")
    print(f"  Memory limit: {info.get('memory_limit', 'unknown')} bytes")

client.close()
```