CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dask

Parallel PyData with task scheduling for distributed analytics and computing.

Pending
Overview
Eval results
Files

delayed.mddocs/

Delayed

Build custom task graphs with lazy evaluation for any Python function. Delayed enables flexible parallel workflows by wrapping functions and values to create task graphs that execute only when explicitly computed.

Capabilities

Delayed Function Creation

Create delayed versions of functions and wrap values for lazy evaluation.

def delayed(func=None, *, pure=None, nout=None, name=None, traverse=True):
    """
    Decorator to create delayed functions or wrap objects.

    Works both as a decorator (``@delayed`` / ``@delayed(pure=True)``) and
    as a plain call (``delayed(value)``) to wrap an existing object for
    lazy evaluation.

    Parameters:
    - func: Function to make delayed (None for decorator use)
    - pure: Whether function is pure (no side effects, enables caching)
    - nout: Number of outputs for functions returning multiple values
    - name: Custom name for tasks in graph
    - traverse: Whether to traverse and delay nested collections

    Returns:
    Delayed function or object
    """

Core Delayed Class

The Delayed class represents lazy computations in task graphs.

class Delayed:
    """
    Lazy evaluation wrapper for building task graphs.

    Instances represent a computation that has been described but not yet
    run; call ``compute()`` to execute the underlying task graph.

    Properties:
    - key: str - Unique identifier for this computation
    - dask: dict - Task graph dictionary
    """
    
    def compute(self, scheduler=None, get=None, **kwargs):
        """
        Compute delayed object and return result.
        
        Parameters:
        - scheduler: Scheduler to use ('threads', 'processes', etc.)
        - get: Custom scheduler function
        - **kwargs: Additional scheduler arguments
        
        Returns:
        Computed result of the delayed operation
        """
    
    def persist(self, scheduler=None, get=None, **kwargs):
        """
        Persist delayed object in memory for reuse.
        
        Parameters:
        - scheduler: Scheduler for persistence
        - get: Custom scheduler function
        - **kwargs: Additional scheduler arguments
        
        Returns:
        dask.delayed.Delayed: Persisted delayed object
        """
    
    def visualize(self, filename=None, format=None, optimize_graph=False, 
                 **kwargs):
        """
        Visualize the task graph for this delayed object.
        
        Parameters:
        - filename: Output file path
        - format: Output format ('png', 'svg', 'pdf', etc.)
        - optimize_graph: Whether to optimize before visualization
        - **kwargs: Additional graphviz parameters
        
        Returns:
        Graphviz object or None if filename specified
        """
    
    def __call__(self, *args, **kwargs):
        """Call delayed object as function with arguments (lazily)."""
    
    def __getattr__(self, attr):
        """Access attributes of delayed object (returns a new Delayed)."""
    
    def __getitem__(self, key):
        """Index into delayed object (returns a new Delayed)."""
    
    def __setitem__(self, key, value):
        """Set item in delayed object."""
    
    def __iter__(self):
        """Iterate over delayed object."""
    
    def __len__(self):
        """Get length of delayed object."""
    
    def __bool__(self):
        """Boolean evaluation of delayed object."""

Delayed Operations

Mathematical and logical operations on delayed objects.

# Arithmetic operations create new delayed objects
# delayed_obj + other -> Delayed
# delayed_obj - other -> Delayed  
# delayed_obj * other -> Delayed
# delayed_obj / other -> Delayed
# delayed_obj // other -> Delayed
# delayed_obj % other -> Delayed
# delayed_obj ** other -> Delayed

# Comparison operations
# delayed_obj == other -> Delayed
# delayed_obj != other -> Delayed
# delayed_obj < other -> Delayed
# delayed_obj <= other -> Delayed
# delayed_obj > other -> Delayed
# delayed_obj >= other -> Delayed

# Logical operations
# delayed_obj & other -> Delayed
# delayed_obj | other -> Delayed
# delayed_obj ^ other -> Delayed
# ~delayed_obj -> Delayed

# Unary operations
# -delayed_obj -> Delayed
# +delayed_obj -> Delayed
# abs(delayed_obj) -> Delayed

Task Graph Construction

Build complex task graphs by composing delayed operations.

@delayed
def custom_function(arg1, arg2, **kwargs):
    """
    Custom delayed function example.
    
    Any function can be made delayed using the @delayed decorator.
    The function will not execute until .compute() is called.
    """
    # Function implementation
    # NOTE(review): `result` is a placeholder and is undefined as written;
    # a real implementation must assign it before returning.
    return result

# Delayed wrapper for existing functions (no decorator needed)
delayed_func = delayed(existing_function)

# Delayed values: wraps an existing object as a lazy leaf in the graph
delayed_value = delayed(some_value)

# Function composition creates task graphs
@delayed
def step1(data):
    """First pipeline stage: raw -> processed."""
    return process_data(data)

@delayed  
def step2(processed_data):
    """Second pipeline stage: processed -> analysis."""
    return analyze_data(processed_data)

@delayed
def step3(analysis_result):
    """Final pipeline stage: analysis -> report."""
    return generate_report(analysis_result)

# Build computation pipeline
# NOTE(review): load_data() executes eagerly here and only its *result* is
# wrapped; use delayed(load_data)() to defer the load itself — confirm intent.
data = delayed(load_data())
processed = step1(data)
analyzed = step2(processed)  
report = step3(analyzed)

# Execute entire pipeline (nothing above has run until this point)
final_result = report.compute()

Multiple Return Values

Handle functions that return multiple values.

@delayed(nout=2)
def function_with_multiple_returns(data):
    """
    Function returning multiple values.
    
    The nout parameter specifies number of return values,
    enabling proper unpacking of delayed results.
    """
    result1 = process_part1(data)
    result2 = process_part2(data)
    return result1, result2

# Unpack multiple returns (possible only because nout=2 was declared)
# NOTE(review): load_data() runs eagerly; delayed(load_data)() would defer it.
data = delayed(load_data())
part1, part2 = function_with_multiple_returns(data)

# Use each part independently
analysis1 = delayed(analyze)(part1)
analysis2 = delayed(analyze)(part2)

# Combine results into a single delayed node, then execute
final = delayed(combine)(analysis1, analysis2)
result = final.compute()

Pure Functions and Caching

Enable caching for pure functions without side effects.

@delayed(pure=True)
def expensive_pure_function(data):
    """
    Pure function with no side effects.
    
    Setting pure=True enables caching of results,
    improving performance for repeated calls with same arguments.
    """
    # Expensive computation
    # NOTE(review): `expensive_result` is a placeholder; assign it in a
    # real implementation.
    return expensive_result

# With pure=True, identical calls share one task key, so the computation
# appears once in the graph and runs only once per compute().
data = delayed(load_data())
result1 = expensive_pure_function(data)
result2 = expensive_pure_function(data)  # Uses cached result

combined = delayed(combine)(result1, result2)
final = combined.compute()

Integration with Collections

Convert between delayed objects and other Dask collections.

# Convert collections to delayed
import dask.array as da
import dask.bag as db  # bug fix: db was used below but never imported
import dask.dataframe as dd

# One Delayed per chunk of the array
array = da.random.random((1000, 1000), chunks=(100, 100))
delayed_array = array.to_delayed()

# One Delayed per partition of the dataframe
dataframe = dd.read_csv('data.csv')  
delayed_dataframe = dataframe.to_delayed()

# Convert delayed to collections
delayed_values = [delayed(load_partition)(i) for i in range(10)]
bag = db.from_delayed(delayed_values)

delayed_dfs = [delayed(load_dataframe)(f) for f in files]
combined_df = dd.from_delayed(delayed_dfs)

Error Handling

Handle errors in delayed computations.

@delayed
def risky_function(data):
    """Function that might raise exceptions.

    Raises ValueError for None input; the error surfaces only when the
    graph is executed via compute().
    """
    if data is None:
        raise ValueError("Data cannot be None")
    return process_data(data)

@delayed
def safe_wrapper(data):
    """Wrapper with error handling.

    Bug fix: the original called the *delayed* risky_function inside the
    try block. Calling a delayed function only builds a graph node and
    returns a Delayed object — it never raises — so the except clause was
    unreachable and compute() yielded a Delayed, not a result. The
    validation and processing are performed directly here instead.
    """
    try:
        if data is None:
            raise ValueError("Data cannot be None")
        return process_data(data)
    except ValueError as e:
        return f"Error: {e}"

# Errors are raised during compute()
data = delayed(None)
result = safe_wrapper(data)
output = result.compute()  # Returns error message

Usage Examples

Basic Delayed Workflow

from dask.delayed import delayed
import pandas as pd

@delayed
def load_data(filename):
    """Lazily read a CSV file into a DataFrame."""
    return pd.read_csv(filename)

@delayed
def clean_data(df):
    """Drop rows with missing values and renumber the index."""
    return df.dropna().reset_index(drop=True)

@delayed
def analyze_data(df):
    """Return summary statistics (count, mean, std, quantiles) for df."""
    return df.describe()

@delayed
def save_results(analysis, filename):
    """Write the analysis DataFrame to CSV and report the destination."""
    analysis.to_csv(filename)
    # Bug fix: the f-string had no placeholder ("Saved to (unknown)" —
    # a metadata-substitution artifact); interpolate the target filename.
    return f"Saved to {filename}"

# Build computation graph — each call below only adds a node; no file is
# read or written yet.
filename = 'data.csv'
raw_data = load_data(filename)
clean_data_result = clean_data(raw_data)
analysis = analyze_data(clean_data_result)
save_status = save_results(analysis, 'results.csv')

# Execute computation (the whole pipeline runs here)
result = save_status.compute()
print(result)

Parallel Processing Pipeline

from dask.delayed import delayed
import pandas as pd

@delayed
def process_file(filename):
    """Process single file: per-category sum of the 'value' column."""
    df = pd.read_csv(filename)
    # Complex processing logic
    # NOTE(review): attribute access assumes a column literally named
    # 'value'; df.groupby('category')['value'].sum() is the safer spelling.
    return df.groupby('category').value.sum()

@delayed
def combine_results(results):
    """Combine results from multiple files into one frame, zero-filled."""
    return pd.concat(results, axis=1).fillna(0)

# Process multiple files in parallel — one independent task per file
files = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
processed = [process_file(f) for f in files]

# Combine all results and execute the whole graph
combined = combine_results(processed)
final_result = combined.compute()

Dynamic Task Graphs

from dask.delayed import delayed

@delayed
def generate_data(seed, size):
    """Generate `size` uniform random samples, deterministically by seed."""
    import numpy as np
    np.random.seed(seed)
    return np.random.random(size)

@delayed
def process_batch(data_batch, method='mean'):
    """Reduce a batch with the chosen statistic ('mean', 'sum', else std)."""
    if method == 'mean':
        return data_batch.mean()
    elif method == 'sum':
        return data_batch.sum()
    else:
        return data_batch.std()

@delayed
def aggregate_results(results):
    """Aggregate all batch results into their arithmetic mean."""
    return sum(results) / len(results)

# Dynamic graph based on parameters — the graph's width (number of
# parallel branches) is decided at build time by plain Python values.
n_batches = 5
batch_size = 1000
method = 'mean'

# Generate data batches (one task per batch, seeded by batch index)
data_batches = [generate_data(i, batch_size) for i in range(n_batches)]

# Process each batch
batch_results = [process_batch(batch, method) for batch in data_batches]

# Aggregate final result and execute the whole graph
final_result = aggregate_results(batch_results)
answer = final_result.compute()

Custom Class with Delayed Methods

from dask.delayed import delayed

class DelayedProcessor:
    """Class with delayed methods for data processing.

    NOTE(review): decorating *methods* with @delayed is questionable —
    delayed() returns a Delayed object rather than a plain function, so
    normal descriptor binding of `self` may not happen when the method is
    accessed through an instance. Verify against dask.delayed; the common
    pattern is to wrap at the call site: delayed(obj.method)(...).
    """
    
    def __init__(self, params):
        # Configuration is itself wrapped lazily
        self.params = delayed(params)
    
    @delayed
    def load_data(self, source):
        """Load data from source."""
        # Implementation
        # NOTE(review): `loaded_data` is an undefined placeholder.
        return loaded_data
    
    @delayed  
    def preprocess(self, data):
        """Preprocess the data."""
        # Use self.params for configuration
        return preprocessed_data
    
    @delayed
    def train_model(self, data):
        """Train model on data."""
        return trained_model
    
    @delayed
    def evaluate(self, model, test_data):
        """Evaluate model performance."""
        return evaluation_metrics

# Use delayed class methods
processor = DelayedProcessor({'param1': 10, 'param2': 'setting'})

# Build computation pipeline (training branch)
raw_data = processor.load_data('data_source')
clean_data = processor.preprocess(raw_data)
model = processor.train_model(clean_data)

# Evaluation branch reuses the same lazy methods
test_data = processor.load_data('test_source')
clean_test = processor.preprocess(test_data)
metrics = processor.evaluate(model, clean_test)

# Execute entire pipeline
final_metrics = metrics.compute()

Integration with Other Collections

import dask.array as da
import dask.dataframe as dd
from dask.delayed import delayed

@delayed
def custom_analysis(array_data, df_data):
    """Custom analysis combining array and dataframe.

    Returns a dict with basic statistics for each input.
    """
    array_stats = {
        'mean': array_data.mean(),
        'std': array_data.std()
    }
    
    df_stats = {
        'row_count': len(df_data),
        'col_count': len(df_data.columns)
    }
    
    return {'array': array_stats, 'dataframe': df_stats}

# Create collections
array = da.random.random((10000, 100), chunks=(1000, 100))
dataframe = dd.read_csv('large_file.csv')

# Convert to delayed for custom processing (take the first chunk/partition)
# NOTE(review): compute_chunk_sizes() is for arrays with *unknown* chunk
# sizes; a freshly created random array already knows them — likely redundant.
delayed_array = array.compute_chunk_sizes().to_delayed().flatten()[0]
delayed_df = dataframe.to_delayed()[0]

# Custom analysis
analysis = custom_analysis(delayed_array, delayed_df)
result = analysis.compute()

Install with Tessl CLI

npx tessl i tessl/pypi-dask

docs

arrays.md

bags.md

configuration.md

core-functions.md

dataframes.md

delayed.md

diagnostics.md

index.md

tile.json