CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dask

Parallel PyData with task scheduling for distributed analytics and computing.

Pending
Overview
Eval results
Files

docs/diagnostics.md

Diagnostics

Tools for monitoring performance, resource usage, and debugging distributed computations. Dask's diagnostic tools help profile execution, track resource consumption, and identify bottlenecks in parallel workflows.

Capabilities

Progress Tracking

Monitor computation progress with visual progress bars and reporting.

class ProgressBar:
    """Context manager that renders a visual progress bar during compute().

    Reports task completion status while a computation runs and works in
    Jupyter notebooks, terminals, and web interfaces.
    """

    def __init__(self, minimum=1.0, dt=0.1):
        """Set up the progress bar.

        Parameters:
        - minimum: seconds a computation must run before the bar is shown
        - dt: refresh interval in seconds
        """

    def __enter__(self):
        """Begin tracking progress for the enclosed computation."""
        return self

    def __exit__(self, *args):
        """Stop tracking progress when the context exits."""

    def register(self):
        """Install this bar as the global progress callback."""

    def unregister(self):
        """Remove this bar from the global callbacks."""

Performance Profiling

Profile task execution times and identify performance bottlenecks.

class Profiler:
    """Record per-task execution times and function calls.

    The collected timings show where a computation spends its time,
    pointing at performance bottlenecks and optimization opportunities.
    """

    def __init__(self):
        """Create an empty profiler with no recorded data."""

    def __enter__(self):
        """Begin recording task timings."""
        return self

    def __exit__(self, *args):
        """Stop recording and finalize the collected results."""

    def results(self):
        """Return the recorded profiling data.

        Returns:
        list: profiling records with timing information, one per task
        """

    def visualize(self, filename=None, **kwargs):
        """Render the profiling data.

        Parameters:
        - filename: where to write the rendered output
        - **kwargs: extra rendering options

        Returns:
        A visualization object, or saves to file when *filename* is given
        """

    def clear(self):
        """Discard all recorded profiling data."""

Resource Monitoring

Monitor system resource usage during computations.

class ResourceProfiler:
    """Sample system resource usage (CPU, memory, network, disk) over time.

    The time series of samples helps identify resource bottlenecks and
    tune resource allocation.
    """

    def __init__(self, dt=1.0):
        """Create the resource profiler.

        Parameters:
        - dt: seconds between resource samples
        """

    def __enter__(self):
        """Begin sampling resource usage."""
        return self

    def __exit__(self, *args):
        """Stop sampling."""

    def results(self):
        """Return the collected samples.

        Returns:
        list: resource usage measurements ordered over time
        """

    def visualize(self, filename=None, **kwargs):
        """Plot the collected samples.

        Parameters:
        - filename: output file for the plots
        - **kwargs: plotting options

        Returns:
        Resource usage plots
        """

    def clear(self):
        """Drop every collected sample."""

Cache Profiling

Monitor task graph caching and optimization effectiveness.

class CacheProfiler:
    """Observe task caching and graph-optimization behavior.

    Records cache hit/miss statistics and optimization counters, useful
    for tuning caching strategies and graph optimizations.
    """

    def __init__(self):
        """Create the profiler with empty statistics."""

    def __enter__(self):
        """Begin observing cache activity."""
        return self

    def __exit__(self, *args):
        """Stop observing cache activity."""

    def results(self):
        """Return the gathered cache statistics.

        Returns:
        dict: cache hit/miss ratios and optimization stats
        """

Custom Callbacks

Create custom diagnostic callbacks for specialized monitoring.

class Callback:
    """Base class for user-defined scheduler diagnostics.

    Subclasses override the hook methods below to build custom monitoring
    and diagnostic tools for specific use cases.
    """

    def __init__(self):
        """Create the callback."""

    def start(self, dsk):
        """Hook invoked once when a computation begins.

        Parameters:
        - dsk: the task graph dictionary about to be executed
        """

    def start_task(self, key, task, **kwargs):
        """Hook invoked as each task begins.

        Parameters:
        - key: task key
        - task: task tuple
        - **kwargs: extra per-task information
        """

    def finish_task(self, key, task, **kwargs):
        """Hook invoked as each task completes.

        Parameters:
        - key: task key
        - task: task tuple
        - **kwargs: task results and timing
        """

    def finish(self, dsk, state, errored):
        """Hook invoked once when the computation ends.

        Parameters:
        - dsk: the task graph dictionary
        - state: final computation state
        - errored: whether the computation had errors
        """

Visualization Tools

Visualize computation graphs, profiling results, and resource usage.

def visualize(*args, filename=None, format=None, optimize_graph=False,
             color='order', **kwargs):
    """Render task graphs and diagnostic information.

    Parameters:
    - *args: collections or profiling results to render
    - filename: path of the output file
    - format: output format such as 'png', 'svg', or 'pdf'
    - optimize_graph: optimize the graph before rendering when True
    - color: node coloring scheme
    - **kwargs: extra graphviz options

    Returns:
    A graphviz object, or None when *filename* is specified
    """

Memory Diagnostics

Monitor memory usage and identify memory leaks.

def memory_usage(func, *args, **kwargs):
    """Run *func* and report its peak memory consumption.

    Parameters:
    - func: callable to execute and monitor
    - *args: positional arguments forwarded to *func*
    - **kwargs: keyword arguments forwarded to *func*

    Returns:
    tuple: (result, peak_memory_mb)
    """

def sizeof(obj):
    """Estimate how many bytes *obj* occupies in memory.

    Parameters:
    - obj: object to measure

    Returns:
    int: estimated size in bytes
    """

Usage Examples

Basic Progress Monitoring

import dask.array as da
from dask.diagnostics import ProgressBar

# Create computation
# NOTE: dask graphs are lazy — nothing executes until .compute() is called.
x = da.random.random((10000, 10000), chunks=(1000, 1000))
computation = (x + x.T).sum()

# Monitor progress
# Scoped usage: the bar is active only inside the with-block.
with ProgressBar():
    result = computation.compute()

# Or register globally
progress = ProgressBar()
progress.register()

# All computations now show progress
result1 = x.mean().compute()
result2 = x.std().compute()

# Remove the global callback so later computations run silently again.
progress.unregister()

Performance Profiling

import dask.array as da
from dask.diagnostics import Profiler
import numpy as np

# Create complex computation
x = da.random.random((5000, 5000), chunks=(500, 500))
y = da.random.random((5000, 5000), chunks=(500, 500))

# Matrix product of a sum and a difference, reduced along axis 0.
computation = ((x + y) @ (x - y)).sum(axis=0)

# Profile execution
with Profiler() as prof:
    result = computation.compute()

# Analyze results
# assumes each record exposes .key, .start_time and .end_time — TODO confirm
profile_data = prof.results()
print(f"Total tasks: {len(profile_data)}")
print(f"Total time: {sum(p.end_time - p.start_time for p in profile_data):.2f}s")

# Visualize profiling results
prof.visualize(filename='profile.html')

# Find slowest tasks
# Sort descending by per-task wall time and keep the top ten.
slowest = sorted(profile_data, 
                key=lambda p: p.end_time - p.start_time, 
                reverse=True)[:10]
for p in slowest:
    print(f"Task {p.key}: {p.end_time - p.start_time:.3f}s")

Resource Monitoring

import dask.array as da
from dask.diagnostics import ResourceProfiler
import time

# Create memory-intensive computation
# Rechunking forces data movement, which makes memory pressure visible.
x = da.random.random((20000, 20000), chunks=(2000, 2000))
computation = x.rechunk((1000, 1000)).sum()

# Monitor resources
# dt=0.5 samples twice per second for finer-grained tracking.
with ResourceProfiler(dt=0.5) as rprof:
    result = computation.compute()

# Analyze resource usage
# assumes each sample exposes .memory (MB) and .cpu (%) — TODO confirm
resources = rprof.results()
print(f"Peak memory: {max(r.memory for r in resources):.1f} MB")
print(f"Peak CPU: {max(r.cpu for r in resources):.1f}%")

# Visualize resource usage over time
rprof.visualize(filename='resources.html')

# Check for resource bottlenecks
high_memory = [r for r in resources if r.memory > 1000]  # MB
if high_memory:
    print(f"High memory usage detected at {len(high_memory)} time points")

Combined Diagnostics

import dask.array as da
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler

# Complex computation pipeline
def create_computation():
    """Build a lazy multi-step dask array pipeline (nothing runs here)."""
    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    y = da.random.random((10000, 10000), chunks=(1000, 1000))
    
    # Multi-step computation
    step1 = (x + y).rechunk((500, 500))
    step2 = step1 @ step1.T
    step3 = step2.sum(axis=0)
    return step3

computation = create_computation()

# Monitor with all diagnostics
# The three callbacks stack: one compute() pass feeds all of them.
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof:
    result = computation.compute()

# Combined analysis
print("=== Performance Analysis ===")
profile_data = prof.results()
print(f"Total tasks executed: {len(profile_data)}")
print(f"Total execution time: {sum(p.end_time - p.start_time for p in profile_data):.2f}s")

print("\n=== Resource Analysis ===")
resources = rprof.results()
print(f"Peak memory usage: {max(r.memory for r in resources):.1f} MB")
print(f"Average CPU usage: {sum(r.cpu for r in resources) / len(resources):.1f}%")

# Save detailed reports
prof.visualize(filename='performance_profile.html')
rprof.visualize(filename='resource_usage.html')

Custom Diagnostic Callback

import dask.array as da
from dask.diagnostics import Callback
import time

class TaskLogger(Callback):
    """Custom callback to log task execution details."""
    
    def __init__(self):
        super().__init__()
        # key -> wall-clock start time, filled by start_task
        self.task_times = {}
        self.start_time = None
    
    def start(self, dsk):
        """Record the overall start time and announce the task count."""
        self.start_time = time.time()
        print(f"Starting computation with {len(dsk)} tasks")
    
    def start_task(self, key, task, **kwargs):
        """Stamp the start time of an individual task."""
        self.task_times[key] = time.time()
        print(f"Starting task: {key}")
    
    def finish_task(self, key, task, **kwargs):
        """Report per-task duration using the stamp from start_task."""
        duration = time.time() - self.task_times[key]
        print(f"Completed task {key} in {duration:.3f}s")
    
    def finish(self, dsk, state, errored):
        """Report total elapsed time and flag failures."""
        total_time = time.time() - self.start_time
        print(f"Computation finished in {total_time:.2f}s")
        if errored:
            print("Computation had errors!")

# Use custom callback
# NOTE(review): per-task printing is verbose — fine for small graphs like this.
x = da.random.random((1000, 1000), chunks=(200, 200))
computation = x.sum()

with TaskLogger():
    result = computation.compute()

Memory Usage Analysis

import dask.array as da
from dask.diagnostics import ResourceProfiler
import psutil
import os

def analyze_memory_usage():
    """Analyze memory usage during computation."""
    
    # Get initial memory
    # rss is resident set size in bytes; divide twice by 1024 for MB.
    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss / 1024 / 1024  # MB
    
    # Create large computation
    x = da.random.random((15000, 15000), chunks=(1500, 1500))
    computation = x.rechunk((3000, 3000)).sum()
    
    # dt=0.2 gives five samples per second for a detailed memory curve.
    with ResourceProfiler(dt=0.2) as rprof:
        result = computation.compute()
    
    # Analyze memory pattern
    # assumes each sample exposes .memory in MB — TODO confirm
    resources = rprof.results()
    memory_usage = [r.memory for r in resources]
    
    peak_memory = max(memory_usage)
    avg_memory = sum(memory_usage) / len(memory_usage)
    final_memory = process.memory_info().rss / 1024 / 1024
    
    print(f"Initial memory: {initial_memory:.1f} MB")
    print(f"Peak memory during computation: {peak_memory:.1f} MB")
    print(f"Average memory during computation: {avg_memory:.1f} MB") 
    print(f"Final memory: {final_memory:.1f} MB")
    print(f"Memory increase: {final_memory - initial_memory:.1f} MB")
    
    # Check for memory leaks
    # Heuristic only: allocator retention can also keep RSS elevated.
    if final_memory > initial_memory + 100:  # 100MB threshold
        print("⚠️  Potential memory leak detected!")
    
    return resources

memory_data = analyze_memory_usage()

Distributed Diagnostics

import dask.array as da
from dask.distributed import Client
from dask.diagnostics import ProgressBar

# Connect to distributed cluster
# NOTE: replace 'scheduler-address:8786' with your scheduler's real address.
client = Client('scheduler-address:8786')

# Monitor distributed computation
x = da.random.random((50000, 50000), chunks=(5000, 5000))
computation = (x + x.T).sum()

# Progress works with distributed scheduler
with ProgressBar():
    result = computation.compute()

# Access cluster diagnostics
print(f"Dashboard: {client.dashboard_link}")
print(f"Scheduler info: {client.scheduler_info()}")

# Worker resource usage
worker_info = client.scheduler_info()['workers']
for worker_addr, info in worker_info.items():
    print(f"Worker {worker_addr}:")
    print(f"  CPU cores: {info.get('ncores', 'unknown')}")
    print(f"  Memory: {info.get('memory_limit', 'unknown')} bytes")
    # Fixed label: 'nthreads' is the worker's thread count, not a task count
    # (the original printed it under a misleading "Tasks:" label).
    print(f"  Threads: {info.get('nthreads', 'unknown')}")

client.close()

Install with Tessl CLI

npx tessl i tessl/pypi-dask

docs

arrays.md

bags.md

configuration.md

core-functions.md

dataframes.md

delayed.md

diagnostics.md

index.md

tile.json