"""Parallel PyData with task scheduling for distributed analytics and computing.

Tools for monitoring performance, resource usage, and debugging distributed
computations. Dask's diagnostic tools help profile execution, track resource
consumption, and identify bottlenecks in parallel workflows.

Monitor computation progress with visual progress bars and reporting.
"""
class ProgressBar:
    """Visual progress bar for local dask computations.

    Shows task-completion status while ``compute()`` runs; works in
    terminals, Jupyter notebooks, and web interfaces.  Stub definition:
    hook methods are placeholders with no implementation.
    """

    def __init__(self, minimum=1.0, dt=0.1):
        """Set up the bar.

        Parameters
        ----------
        minimum : float
            Seconds a computation must run before the bar appears.
        dt : float
            Refresh interval in seconds.
        """

    def __enter__(self):
        """Begin tracking progress; returns self for ``with`` usage."""
        return self

    def __exit__(self, *args):
        """Stop tracking progress when the context block exits."""

    def register(self):
        """Install this bar as a global callback for all computations."""

    def unregister(self):
        """Remove this bar from the global callback registry."""


# Profile task execution times and identify performance bottlenecks.
class Profiler:
    """Record per-task execution timings during a computation.

    Collects the time spent in each task and function call so that
    performance bottlenecks and optimization opportunities can be
    identified.  Stub definition: methods are placeholders.
    """

    def __init__(self):
        """Create an empty profiler."""

    def __enter__(self):
        """Begin collecting timings; returns self for ``with`` usage."""
        return self

    def __exit__(self, *args):
        """Stop collecting and finalize the recorded results."""

    def results(self):
        """Return the profiling records.

        Returns
        -------
        list
            Profiling records with timing information.
        """

    def visualize(self, filename=None, **kwargs):
        """Render the collected timings.

        Parameters
        ----------
        filename : str, optional
            Output file for the visualization; rendered inline if omitted.
        **kwargs
            Additional visualization options.

        Returns
        -------
        Visualization object, or None when saved to *filename*.
        """

    def clear(self):
        """Discard all profiling data collected so far."""


# Monitor system resource usage during computations.
class ResourceProfiler:
    """Sample system resource usage (CPU, memory, network, disk) over time.

    Tracks resource consumption during a computation so that resource
    bottlenecks can be found and allocation tuned.  Stub definition:
    methods are placeholders.
    """

    def __init__(self, dt=1.0):
        """Set up the resource profiler.

        Parameters
        ----------
        dt : float
            Sampling interval in seconds.
        """

    def __enter__(self):
        """Begin sampling resources; returns self for ``with`` usage."""
        return self

    def __exit__(self, *args):
        """Stop sampling when the context block exits."""

    def results(self):
        """Return the sampled resource data.

        Returns
        -------
        list
            Resource usage samples over time.
        """

    def visualize(self, filename=None, **kwargs):
        """Plot resource usage over time.

        Parameters
        ----------
        filename : str, optional
            Output file for the plots.
        **kwargs
            Plotting options.

        Returns
        -------
        Resource usage plots.
        """

    def clear(self):
        """Discard all sampled resource data."""


# Monitor task graph caching and optimization effectiveness.
class CacheProfiler:
    """Track task caching and graph-optimization statistics.

    Records cache hits/misses and optimization counters so caching
    strategies and graph optimizations can be tuned.  Stub definition:
    methods are placeholders.
    """

    def __init__(self):
        """Create an empty cache profiler."""

    def __enter__(self):
        """Begin monitoring the cache; returns self for ``with`` usage."""
        return self

    def __exit__(self, *args):
        """Stop monitoring when the context block exits."""

    def results(self):
        """Return the cache statistics.

        Returns
        -------
        dict
            Cache hit/miss ratios and optimization stats.
        """


# Create custom diagnostic callbacks for specialized monitoring.
class Callback:
    """Base class for custom diagnostic callbacks.

    Subclass and override the hook methods below to build specialized
    monitoring tools.  Each hook is a no-op placeholder here.
    """

    def __init__(self):
        """Create the callback."""

    def start(self, dsk):
        """Hook invoked when the computation starts.

        Parameters
        ----------
        dsk : dict
            Task graph dictionary.
        """

    def start_task(self, key, task, **kwargs):
        """Hook invoked when an individual task starts.

        Parameters
        ----------
        key : Task key.
        task : Task tuple.
        **kwargs : Additional task information.
        """

    def finish_task(self, key, task, **kwargs):
        """Hook invoked when an individual task finishes.

        Parameters
        ----------
        key : Task key.
        task : Task tuple.
        **kwargs : Task results and timing.
        """

    def finish(self, dsk, state, errored):
        """Hook invoked when the computation finishes.

        Parameters
        ----------
        dsk : Task graph dictionary.
        state : Final computation state.
        errored : Whether the computation had errors.
        """


# Visualize computation graphs, profiling results, and resource usage.
def visualize(*args, filename=None, format=None, optimize_graph=False,
              color='order', **kwargs):
    """Render task graphs and diagnostic information with graphviz.

    Parameters
    ----------
    *args
        Collections or profiling results to visualize.
    filename : str, optional
        Output file path.
    format : str, optional
        Output format ('png', 'svg', 'pdf', etc.).  NOTE: shadows the
        builtin ``format`` — kept for API compatibility.
    optimize_graph : bool
        Optimize the graph before rendering.
    color : str
        Node coloring scheme.
    **kwargs
        Additional graphviz options.

    Returns
    -------
    Graphviz object, or None when *filename* is given.  Stub: no
    implementation.
    """


# Monitor memory usage and identify memory leaks.
def memory_usage(func, *args, **kwargs):
    """Execute *func* and measure its peak memory consumption.

    Parameters
    ----------
    func : callable
        Function to monitor.
    *args, **kwargs
        Arguments forwarded to *func*.

    Returns
    -------
    tuple
        ``(result, peak_memory_mb)``.  Stub: no implementation.
    """
def sizeof(obj):
    """Estimate the in-memory size of *obj*.

    Parameters
    ----------
    obj
        Object to measure.

    Returns
    -------
    int
        Size in bytes.  Stub: no implementation.
    """


import dask.array as da
from dask.diagnostics import ProgressBar

# Build a lazy computation over a large random array.
x = da.random.random((10000, 10000), chunks=(1000, 1000))
computation = (x + x.T).sum()

# Show a progress bar for a single compute() call.
with ProgressBar():
    result = computation.compute()

# Alternatively, register the bar globally so every compute() reports progress.
progress = ProgressBar()
progress.register()
result1 = x.mean().compute()
result2 = x.std().compute()
progress.unregister()

import dask.array as da
from dask.diagnostics import Profiler
import numpy as np

# Build a multi-step computation that yields many tasks.
x = da.random.random((5000, 5000), chunks=(500, 500))
y = da.random.random((5000, 5000), chunks=(500, 500))
computation = ((x + y) @ (x - y)).sum(axis=0)

# Record per-task timings while the graph executes.
with Profiler() as prof:
    result = computation.compute()

# Summarize the collected records.
profile_data = prof.results()
print(f"Total tasks: {len(profile_data)}")
print(f"Total time: {sum(p.end_time - p.start_time for p in profile_data):.2f}s")

# Render an interactive report of the timings.
prof.visualize(filename='profile.html')

# Rank tasks by duration to spot the worst offenders.
slowest = sorted(profile_data,
                 key=lambda p: p.end_time - p.start_time,
                 reverse=True)[:10]
for p in slowest:
    print(f"Task {p.key}: {p.end_time - p.start_time:.3f}s")

import dask.array as da
from dask.diagnostics import ResourceProfiler
import time

# Rechunking a large array is deliberately memory-intensive.
x = da.random.random((20000, 20000), chunks=(2000, 2000))
computation = x.rechunk((1000, 1000)).sum()

# Sample CPU/memory every half second while computing.
with ResourceProfiler(dt=0.5) as rprof:
    result = computation.compute()

# Report the peaks from the sampled data.
resources = rprof.results()
print(f"Peak memory: {max(r.memory for r in resources):.1f} MB")
print(f"Peak CPU: {max(r.cpu for r in resources):.1f}%")

# Plot usage over time.
rprof.visualize(filename='resources.html')

# Flag samples above a 1000 MB threshold.
high_memory = [r for r in resources if r.memory > 1000]  # MB
if high_memory:
    print(f"High memory usage detected at {len(high_memory)} time points")

import dask.array as da
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler

def create_computation():
    # Multi-step pipeline: elementwise add, rechunk, matmul, reduction.
    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    y = da.random.random((10000, 10000), chunks=(1000, 1000))
    step1 = (x + y).rechunk((500, 500))
    step2 = step1 @ step1.T
    step3 = step2.sum(axis=0)
    return step3

computation = create_computation()

# Run every diagnostic at once: progress bar, task profiler, resource profiler.
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof:
    result = computation.compute()

# Task-timing summary.
print("=== Performance Analysis ===")
profile_data = prof.results()
print(f"Total tasks executed: {len(profile_data)}")
print(f"Total execution time: {sum(p.end_time - p.start_time for p in profile_data):.2f}s")

# Resource-usage summary.
print("\n=== Resource Analysis ===")
resources = rprof.results()
print(f"Peak memory usage: {max(r.memory for r in resources):.1f} MB")
print(f"Average CPU usage: {sum(r.cpu for r in resources) / len(resources):.1f}%")

# Persist detailed interactive reports.
prof.visualize(filename='performance_profile.html')
rprof.visualize(filename='resource_usage.html')

import dask.array as da
from dask.diagnostics import Callback
import time

class TaskLogger(Callback):
    """Custom callback to log task execution details."""

    def __init__(self):
        super().__init__()
        self.task_times = {}
        self.start_time = None

    def start(self, dsk):
        # Runs once, before any task executes.
        self.start_time = time.time()
        print(f"Starting computation with {len(dsk)} tasks")

    def start_task(self, key, task, **kwargs):
        # Remember when each task began so finish_task can report a duration.
        self.task_times[key] = time.time()
        print(f"Starting task: {key}")

    def finish_task(self, key, task, **kwargs):
        duration = time.time() - self.task_times[key]
        print(f"Completed task {key} in {duration:.3f}s")

    def finish(self, dsk, state, errored):
        total_time = time.time() - self.start_time
        print(f"Computation finished in {total_time:.2f}s")
        if errored:
            print("Computation had errors!")

# Drive a small computation through the custom callback.
x = da.random.random((1000, 1000), chunks=(200, 200))
computation = x.sum()
with TaskLogger():
    result = computation.compute()

import dask.array as da
from dask.diagnostics import ResourceProfiler
import psutil
import os

def analyze_memory_usage():
    """Analyze memory usage during computation."""
    # Baseline RSS of the current process before the computation runs.
    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss / 1024 / 1024  # MB

    # A rechunk forces substantial intermediate memory traffic.
    x = da.random.random((15000, 15000), chunks=(1500, 1500))
    computation = x.rechunk((3000, 3000)).sum()

    # Sample resources frequently (every 0.2 s) while computing.
    with ResourceProfiler(dt=0.2) as rprof:
        result = computation.compute()

    # Summarize the sampled memory profile.
    resources = rprof.results()
    memory_usage = [r.memory for r in resources]
    peak_memory = max(memory_usage)
    avg_memory = sum(memory_usage) / len(memory_usage)
    final_memory = process.memory_info().rss / 1024 / 1024

    print(f"Initial memory: {initial_memory:.1f} MB")
    print(f"Peak memory during computation: {peak_memory:.1f} MB")
    print(f"Average memory during computation: {avg_memory:.1f} MB")
    print(f"Final memory: {final_memory:.1f} MB")
    print(f"Memory increase: {final_memory - initial_memory:.1f} MB")

    # Flag a possible leak if RSS grew by more than the threshold overall.
    if final_memory > initial_memory + 100:  # 100MB threshold
        print("⚠️ Potential memory leak detected!")
    return resources

memory_data = analyze_memory_usage()

import dask.array as da
from dask.distributed import Client
from dask.diagnostics import ProgressBar

# Attach to an already-running distributed scheduler.
client = Client('scheduler-address:8786')

# The progress bar also works when computations run on the cluster.
x = da.random.random((50000, 50000), chunks=(5000, 5000))
computation = (x + x.T).sum()
with ProgressBar():
    result = computation.compute()

# Cluster-level diagnostics come from the client itself.
print(f"Dashboard: {client.dashboard_link}")
print(f"Scheduler info: {client.scheduler_info()}")

# Per-worker resources as reported by the scheduler.
worker_info = client.scheduler_info()['workers']
for worker_addr, info in worker_info.items():
    print(f"Worker {worker_addr}:")
    print(f" CPU cores: {info.get('ncores', 'unknown')}")
    print(f" Memory: {info.get('memory_limit', 'unknown')} bytes")
    print(f" Tasks: {info.get('nthreads', 'unknown')} threads")

client.close()

# Install with Tessl CLI
npx tessl i tessl/pypi-dask