CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-joblib

Lightweight pipelining with Python functions for disk-caching, parallel computing, and fast compressed persistence

Pending
Overview
Eval results
Files

parallel-processing.mddocs/

Parallel Processing

Embarrassingly parallel computing with readable list comprehension syntax. Supports multiple backends (threading, multiprocessing, loky, dask) with automatic backend selection, comprehensive configuration options, and optimizations for NumPy arrays and scientific computing workflows.

Capabilities

Parallel Execution

Main class for parallel computations using familiar list comprehension patterns with automatic load balancing and error handling.

class Parallel(Logger):
    def __init__(self, n_jobs=None, backend=None, return_as="list", verbose=0, timeout=None, 
                 pre_dispatch="2 * n_jobs", batch_size="auto", temp_folder=None, 
                 max_nbytes="1M", mmap_mode="r", prefer=None, require=None, **backend_kwargs):
        """
        Create parallel execution context.

        Parameters:
        - n_jobs: int or None, number of jobs (None uses default config, -1 for all CPUs, 1 for sequential)
        - backend: str or None, execution backend (None uses default, "threading", "multiprocessing", "loky", "sequential", "dask")
        - return_as: str, return format ("list", "generator", "generator_unordered")
        - verbose: int, verbosity level (0=silent; higher values print progress messages more often; above 10 every completed task is reported)
        - timeout: float, timeout in seconds for each individual task to complete (a TimeoutError is raised if exceeded; only applied when n_jobs != 1)
        - pre_dispatch: int or str, number of batches to pre-dispatch
        - batch_size: int or "auto", size of batches for parallel execution
        - temp_folder: str or None, temporary folder for memory mapping large arrays
        - max_nbytes: str or int, memory threshold for automatic memory mapping
        - mmap_mode: str, memory mapping mode ("r", "r+", "w+", "c")
        - prefer: str or None, backend preference hint ("threads", "processes")
        - require: str or None, backend requirement ("sharedmem")
        - **backend_kwargs: additional backend-specific parameters
        """

    def __call__(self, iterable):
        """
        Execute parallel computation.

        Parameters:
        - iterable: iterable of delayed objects or callables

        Returns:
        List of results or generator (based on return_as parameter)
        """

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with cleanup."""

Usage Examples:

from joblib import Parallel, delayed
import numpy as np

# Basic parallel execution
def square(x):
    return x ** 2

# Process numbers in parallel
results = Parallel(n_jobs=4)(delayed(square)(i) for i in range(10))

# With progress tracking
results = Parallel(n_jobs=4, verbose=10)(delayed(square)(i) for i in range(100))

# Context manager usage
with Parallel(n_jobs=4) as parallel:
    batch1 = parallel(delayed(square)(i) for i in range(10))
    batch2 = parallel(delayed(square)(i) for i in range(10, 20))

# Memory mapping for large arrays
def process_array(arr):
    return np.sum(arr)

large_arrays = [np.random.random(10000) for _ in range(10)]
results = Parallel(n_jobs=4, max_nbytes='100M', mmap_mode='r')(
    delayed(process_array)(arr) for arr in large_arrays
)

# Backend-specific configuration
results = Parallel(n_jobs=4, backend='multiprocessing', 
                  temp_folder='/tmp/joblib')(
    delayed(expensive_function)(i) for i in range(100)
)

Delayed Function Wrapper

Decorator to capture function arguments for deferred parallel execution.

def delayed(function):
    """
    Decorator to capture function and arguments for parallel execution.

    Parameters:
    - function: callable, function to wrap for delayed execution

    Returns:
    A wrapped function that, when called with arguments, returns a (function, args, kwargs) tuple for Parallel to consume as a delayed task
    """

Usage Examples:

from joblib import Parallel, delayed

# Basic delayed usage
@delayed
def process_item(item, multiplier=2):
    return item * multiplier

# Create delayed tasks
tasks = [process_item(i, multiplier=3) for i in range(10)]

# Execute in parallel
results = Parallel(n_jobs=4)(tasks)

# Alternative syntax without decorator
def compute(x, y):
    return x + y

tasks = [delayed(compute)(i, i*2) for i in range(10)]
results = Parallel(n_jobs=4)(tasks)

# Method calls
class DataProcessor:
    def process(self, data):
        return data ** 2

processor = DataProcessor()
tasks = [delayed(processor.process)(data) for data in datasets]
results = Parallel(n_jobs=4)(tasks)

CPU and Job Management

Utilities for determining optimal parallelization settings and hardware capabilities.

def cpu_count(only_physical_cores=False):
    """
    Return number of CPUs available.

    Parameters:
    - only_physical_cores: bool, count only physical cores (not hyperthreaded)

    Returns:
    int: Number of available CPUs
    """

def effective_n_jobs(n_jobs=-1):
    """
    Determine actual number of parallel jobs that will be used.

    Parameters:
    - n_jobs: int, requested number of jobs (-1 for all CPUs)

    Returns:
    int: Actual number of jobs that will be used
    """

Usage Examples:

from joblib import cpu_count, effective_n_jobs

# Check available CPUs
total_cpus = cpu_count()
physical_cpus = cpu_count(only_physical_cores=True)

print(f"Total CPUs: {total_cpus}, Physical: {physical_cpus}")

# Determine effective job count
actual_jobs = effective_n_jobs(-1)  # All CPUs
half_jobs = effective_n_jobs(cpu_count() // 2)  # Half CPUs

# Use in Parallel configuration
optimal_jobs = min(len(data_batches), cpu_count())
results = Parallel(n_jobs=optimal_jobs)(tasks)

Configuration Context Managers

Context managers for configuring parallel execution settings globally or locally.

class parallel_config:
    def __init__(self, backend=None, *, n_jobs=None, verbose=0, temp_folder=None, 
                 max_nbytes="1M", mmap_mode="r", prefer=None, require=None, 
                 inner_max_num_threads=None, **backend_params):
        """
        Context manager to configure parallel execution globally.

        Parameters:
        - backend: str or None, default backend for Parallel objects (None uses system default)  
        - n_jobs: int or None, default number of jobs (None uses system default)
        - verbose: int, default verbosity level (default: 0)
        - temp_folder: str or None, default temporary folder (None uses system default)
        - max_nbytes: str or int, default memory threshold (default: "1M")
        - mmap_mode: str, default memory mapping mode (default: "r")
        - prefer: str or None, backend preference hint (None uses system default)
        - require: str or None, backend requirement (None uses system default)
        - inner_max_num_threads: int or None, maximum threads for inner parallelism
        - **backend_params: additional backend parameters
        """

class parallel_backend(parallel_config):
    def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None, **backend_params):
        """
        Context manager to change default parallel backend.

        Parameters:
        - backend: str or backend instance, parallel backend to use
        - n_jobs: int, number of jobs for this backend
        - inner_max_num_threads: int, thread limit for inner parallelism
        - **backend_params: additional backend-specific parameters
        """

Usage Examples:

from joblib import Parallel, delayed, parallel_config, parallel_backend

# Global configuration
with parallel_config(backend='multiprocessing', n_jobs=4, verbose=10):
    # All Parallel calls use these settings
    result1 = Parallel()(delayed(func)(i) for i in range(10))
    result2 = Parallel()(delayed(func)(i) for i in range(20, 30))

# Backend-specific configuration
with parallel_backend('threading', n_jobs=2):
    result = Parallel()(delayed(io_bound_task)(i) for i in range(10))

with parallel_backend('multiprocessing', n_jobs=4):
    result = Parallel()(delayed(cpu_bound_task)(i) for i in range(10))

# Nested configuration
with parallel_config(verbose=10):
    with parallel_backend('loky', n_jobs=4):
        result = Parallel()(delayed(func)(i) for i in range(100))

Backend Registration

Register custom parallel execution backends for specialized computing environments.

def register_parallel_backend(name, factory, make_default=False):
    """
    Register a new parallel backend factory.

    Parameters:
    - name: str, backend name identifier
    - factory: callable, factory function returning backend instance
    - make_default: bool, whether to make this the default backend

    Note:
    Registering a name that already exists replaces the previous factory for that name.
    """

Usage Examples:

from joblib import register_parallel_backend, Parallel, delayed
from joblib._parallel_backends import ParallelBackendBase

class CustomBackend(ParallelBackendBase):
    """Custom parallel backend implementation."""
    
    def effective_n_jobs(self, n_jobs):
        return min(n_jobs, 8)  # Limit to 8 jobs
    
    def submit(self, func, callback=None):
        # Custom job submission logic
        pass
    
    def retrieve_result(self, futures, timeout=None):
        # Custom result retrieval logic
        pass

# Register custom backend
register_parallel_backend('custom', CustomBackend)

# Use custom backend
with parallel_backend('custom'):
    results = Parallel()(delayed(func)(i) for i in range(10))

# Register external backend (e.g., Ray)
def create_ray_backend(**kwargs):
    from ray.util.joblib import register_ray
    return register_ray()

register_parallel_backend('ray', create_ray_backend)

Advanced Parallel Patterns

Error Handling and Debugging

from joblib import Parallel, delayed

def may_fail(x):
    if x == 5:
        raise ValueError(f"Failed on {x}")
    return x ** 2

# Sequential execution for debugging
results = Parallel(n_jobs=1)(delayed(may_fail)(i) for i in range(10))

# Verbose output for monitoring
results = Parallel(n_jobs=4, verbose=50)(delayed(may_fail)(i) for i in range(10))

Memory Management with Large Data

import numpy as np
from joblib import Parallel, delayed

def process_large_array(arr):
    return np.mean(arr)

# Automatic memory mapping for large arrays
large_arrays = [np.random.random(1000000) for _ in range(10)]

results = Parallel(
    n_jobs=4,
    max_nbytes='1G',  # Trigger memory mapping above 1GB
    mmap_mode='r',    # Read-only memory mapping
    temp_folder='/fast-storage/tmp'
)(delayed(process_large_array)(arr) for arr in large_arrays)

Backend Selection Strategies

from joblib import Parallel, delayed

# I/O bound tasks - use threading
def download_file(url):
    return requests.get(url).content

urls = ['http://example.com/file{}.txt'.format(i) for i in range(10)]
results = Parallel(n_jobs=4, prefer='threads')(
    delayed(download_file)(url) for url in urls
)

# CPU bound tasks - use processes
def cpu_intensive(data):
    return np.fft.fft(data)

data_batches = [np.random.random(10000) for _ in range(10)]
results = Parallel(n_jobs=4, prefer='processes')(
    delayed(cpu_intensive)(batch) for batch in data_batches
)

# Shared memory requirement
def shared_memory_task(shared_array, index):
    return shared_array[index] * 2

results = Parallel(n_jobs=4, require='sharedmem')(
    delayed(shared_memory_task)(array, i) for i in range(len(array))
)

Install with Tessl CLI

npx tessl i tessl/pypi-joblib

docs

index.md

memory-caching.md

parallel-processing.md

persistence-serialization.md

utilities-infrastructure.md

tile.json