tessl/pypi-dask

Parallel PyData with task scheduling for distributed analytics and computing.

Core Functions

Essential functions for executing, optimizing, and managing Dask computations across all collection types. These functions provide the fundamental operations needed to work with any Dask collection.

Imports

import dask
from dask import compute, persist, optimize, visualize, delayed
from dask import is_dask_collection, get_annotations, annotate
from dask.base import tokenize, normalize_token
from dask.core import istask
from dask.local import get_sync as get

Capabilities

Computation Execution

Execute Dask collections to get concrete results, with support for different schedulers and optimization strategies.

def compute(*collections, scheduler=None, get=None, optimize_graph=True, 
           pool=None, chunksize=None, **kwargs):
    """
    Compute multiple dask collections, returning concrete results.
    
    Parameters:
    - *collections: Dask collections to compute
    - scheduler: Scheduler to use ('threads', 'processes', 'single-threaded')
    - get: Leave as None; the legacy get= keyword has been removed in favor of scheduler
    - optimize_graph: Whether to optimize task graphs before execution
    - pool: Thread/process pool for computation
    - chunksize: Chunk size for multiprocessing scheduler
    - **kwargs: Additional scheduler arguments
    
    Returns:
    Tuple of computed results in same order as input collections
    """

def persist(*collections, scheduler=None, get=None, optimize_graph=True, 
           pool=None, chunksize=None, **kwargs):
    """
    Persist collections in memory for repeated use.
    
    Parameters:
    - *collections: Dask collections to persist
    - scheduler: Scheduler to use for persistence
    - get: Leave as None; the legacy get= keyword has been removed in favor of scheduler
    - optimize_graph: Whether to optimize before persisting
    - pool: Thread/process pool for computation
    - chunksize: Chunk size for multiprocessing scheduler
    - **kwargs: Additional scheduler arguments
    
    Returns:
    Tuple of persisted collections maintaining lazy interface
    """

Graph Optimization

Optimize task graphs before execution, for example by culling unneeded tasks and fusing linear chains of tasks.

def optimize(*collections, **kwargs):
    """
    Optimize task graphs of collections.
    
    Parameters:
    - *collections: Collections to optimize
    - **kwargs: Optimization parameters
    
    Returns:
    Tuple of optimized collections, each with the same interface as its input
    """

Visualization

Visualize task graphs and their dependencies for debugging and understanding computation structure.

def visualize(*collections, filename='mydask', format=None, optimize_graph=False,
             color=None, **kwargs):
    """
    Visualize task graphs of collections. Requires graphviz by default.
    
    Parameters:
    - *collections: Collections to visualize
    - filename: Output file name; '.png' is used if no extension is given (None writes no file)
    - format: Output format ('png', 'pdf', 'svg', etc.)
    - optimize_graph: Whether to optimize graph before visualization
    - color: Node coloring scheme (None, 'order', 'ages', 'freed', etc.)
    - **kwargs: Additional graphviz parameters
    
    Returns:
    IPython display object (Image or SVG) when available, otherwise None
    """

Collection Utilities

Utility functions for working with Dask collections and determining their properties.

def is_dask_collection(obj):
    """
    Check if object is a Dask collection.
    
    Parameters:
    - obj: Object to check
    
    Returns:
    bool: True if object is a Dask collection
    """

def get_annotations():
    """
    Get current task graph annotations.
    
    Returns:
    dict: Current annotation dictionary
    """

def annotate(**annotations):
    """
    Context manager for adding annotations to task graphs.
    
    Parameters:
    - **annotations: Key-value pairs to add as annotations
    
    Returns:
    Context manager for annotation scope
    """

Delayed Computation

Create delayed objects for building custom task graphs with lazy evaluation.

def delayed(func=None, *, pure=None, nout=None, name=None, traverse=True):
    """
    Decorator to create delayed functions or wrap objects.
    
    Parameters:
    - func: Function to make delayed (None for decorator use)
    - pure: Whether function is pure (no side effects)
    - nout: Number of outputs for functions returning multiple values
    - name: Custom name for task in graph
    - traverse: Whether to traverse and delay nested collections
    
    Returns:
    Delayed function or object
    """

Task Graph Access

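Execute raw task graphs directly and inspect the tasks they contain.

def get_sync(dsk, keys, **kwargs):
    """
    Synchronous scheduler that runs a task graph in the calling thread.
    
    Parameters:
    - dsk: Task graph (dict mapping keys to tasks or values)
    - keys: Key, or list of keys, to compute from the graph
    - **kwargs: Additional scheduler arguments
    
    Returns:
    Computed values for the requested keys using single-threaded execution
    """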

def istask(obj):
    """
    Check if object is a Dask task.
    
    Parameters:
    - obj: Object to check
    
    Returns:
    bool: True if object is a task tuple (a non-empty tuple whose first element is callable)
    """

Tokenization

Generate deterministic tokens for objects to enable caching and task identification.

def tokenize(*args, **kwargs):
    """
    Generate deterministic token for objects.
    
    Parameters:
    - *args: Objects to tokenize
    - **kwargs: Additional objects to include in token
    
    Returns:
    str: Unique token string
    """

def normalize_token(obj):
    """
    Normalize an object to a token representation.
    
    Parameters:
    - obj: Object to normalize
    
    Returns:
    Normalized token representation
    """

Usage Examples

Basic Computation

import dask.array as da
import dask

# Create computation
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = (x + x.T).sum()

# Compute result
result = dask.compute(y)[0]

# Or compute directly on collection
result = y.compute()

Multiple Collections

import dask
import dask.array as da
import dask.dataframe as dd

# Create multiple computations
arr = da.random.random((5000, 5000), chunks=(1000, 1000))
df = dd.read_csv('data.csv')

result_arr = arr.mean()
result_df = df.value.sum()

# Compute both together (more efficient)
arr_mean, df_sum = dask.compute(result_arr, result_df)

Persistence for Reuse

import dask
import dask.array as da

# Create expensive computation
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = x.rechunk((500, 500))  # Expensive rechunking

# Persist for reuse
y_persisted = dask.persist(y)[0]

# Use multiple times without recomputation
result1 = y_persisted.sum().compute()
result2 = y_persisted.mean().compute()

Custom Delayed Functions

from dask.delayed import delayed
import pandas as pd

@delayed
def load_data(filename):
    return pd.read_csv(filename)

@delayed  
def process_data(df):
    return df.groupby('category').value.mean()

@delayed
def combine_results(*results):
    return pd.concat(results)

# Build computation graph
files = ['file1.csv', 'file2.csv', 'file3.csv']
loaded = [load_data(f) for f in files]
processed = [process_data(df) for df in loaded]
final = combine_results(*processed)

# Execute
result = final.compute()

Visualization and Debugging

import dask
import dask.array as da

# Create computation
x = da.random.random((1000, 1000), chunks=(100, 100))
y = (x + x.T).sum(axis=0)

# Visualize task graph (requires graphviz)
dask.visualize(y, filename='computation.png')

# Annotate tasks with scheduler hints; priority must be a number or callable
with dask.annotate(priority=10, resources={'MEMORY': 4e9}):
    z = y * 2

# Visualize the graph containing the annotated tasks
dask.visualize(z, filename='annotated.png')

Install with Tessl CLI

npx tessl i tessl/pypi-dask
