CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-joblib

Lightweight pipelining with Python functions for disk-caching, parallel computing, and fast compressed persistence

Pending
Overview
Eval results
Files

docs/utilities-infrastructure.md

Utilities and Infrastructure

Core utilities including object hashing, logging with timing, backend infrastructure, and compression management for extending joblib's functionality and integrating with scientific computing workflows.

Capabilities

Object Hashing

Fast hash calculation for Python objects to create unique identifiers, used internally by Memory caching and available for custom caching implementations.

def hash(obj, hash_name="md5", coerce_mmap=False):
    """
    Quick calculation of hash to identify Python objects uniquely.

    Objects with equal content (e.g. an array and its copy) produce the
    same digest, which makes the result usable as a cache key.

    Parameters:
    - obj: any Python object to hash
    - hash_name: str, hashing algorithm ("md5" or "sha1"); both are fast
      but not collision-resistant -- suitable for caching, not security
    - coerce_mmap: bool, treat memory-mapped arrays as regular in-memory
      arrays so both forms hash identically

    Returns:
    str: hexadecimal hash string

    Raises:
    ValueError: if hash_name is not supported
    """

Usage Examples:

from joblib import hash
import numpy as np
import os

# Hash simple objects
hash_int = hash(42)
hash_str = hash("hello world")
hash_list = hash([1, 2, 3, 4])

print(f"Integer hash: {hash_int}")
print(f"String hash: {hash_str}")

# Hash NumPy arrays
array = np.random.random(1000)
array_hash = hash(array)
print(f"Array hash: {array_hash}")

# Different arrays with same content have same hash
array2 = array.copy()
assert hash(array) == hash(array2)

# Different hash algorithms
md5_hash = hash(array, hash_name="md5")    # Default
sha1_hash = hash(array, hash_name="sha1")  # Alternative digest.
# NOTE: neither MD5 nor SHA-1 is collision-resistant; they are fine for
# cache keys but must not be used for security-sensitive fingerprinting.

# Memory-mapped arrays
mmap_array = np.memmap('temp.dat', dtype='float32', mode='w+', shape=(1000,))
mmap_array[:] = array[:]

# coerce_mmap=True hashes the memmap as if it were a regular ndarray
regular_hash = hash(mmap_array, coerce_mmap=True)
mmap_hash = hash(mmap_array, coerce_mmap=False)

# Clean up the file backing the memmap (np.memmap leaves it on disk)
del mmap_array
os.remove('temp.dat')

# Complex objects
complex_obj = {
    'data': np.random.random((100, 50)),
    'params': {'learning_rate': 0.01, 'epochs': 100},
    'metadata': ['training', 'validation']
}
complex_hash = hash(complex_obj)

Non-Picklable Object Wrapping

Utilities for handling non-serializable objects in parallel processing contexts.

def wrap_non_picklable_objects(obj, keep_wrapper=True):
    """
    Wrap non-picklable objects to enable parallel processing.

    NOTE(review): the wrapper presumably relies on cloudpickle, which
    handles lambdas, closures and interactively defined classes, but NOT
    inherently unserializable resources such as open sockets or database
    connections -- confirm against the joblib documentation.

    Parameters:
    - obj: object to wrap (may contain non-picklable elements)
    - keep_wrapper: bool, whether to keep wrapper for round-trip compatibility

    Returns:
    Wrapped object that can be pickled and sent to parallel workers
    """

Usage Examples:

from joblib import wrap_non_picklable_objects, Parallel, delayed
import sqlite3

# Example with non-picklable database connection
def create_db_connection():
    """Open and return a fresh in-memory SQLite connection."""
    conn = sqlite3.connect(':memory:')
    return conn

def process_with_connection(data, connection):
    """Insert *data* into a temp table and return the table's rows.

    Parameters:
    - data: value to store (adapted to SQLite INTEGER)
    - connection: an open sqlite3 connection

    Returns:
    list of (value,) tuples currently stored in the temp table
    """
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS temp (value INTEGER)")
    cursor.execute("INSERT INTO temp VALUES (?)", (data,))
    # BUG FIX: fetchall() right after an INSERT always returns [];
    # query the table so the caller actually sees the stored rows.
    cursor.execute("SELECT value FROM temp")
    return cursor.fetchall()

# Wrap non-picklable connection.
# NOTE(review): sqlite3 connections hold OS-level resources and are not
# serializable even via cloudpickle, so wrapping alone does not make them
# shippable to workers; in practice each worker should open its own
# connection (e.g. by calling create_db_connection inside the task).
connection = create_db_connection()
wrapped_connection = wrap_non_picklable_objects(connection)

# Use in parallel processing
data_items = [1, 2, 3, 4, 5]
results = Parallel(n_jobs=2)(
    delayed(process_with_connection)(item, wrapped_connection)
    for item in data_items
)

# Custom objects with lambda functions or other non-picklable elements
class ProcessorWithLambda:
    def __init__(self):
        self.transform = lambda x: x ** 2  # not picklable by stdlib pickle

processor = ProcessorWithLambda()
wrapped_processor = wrap_non_picklable_objects(processor)

Logging and Timing

Logging utilities with built-in timing capabilities for monitoring performance and debugging computational workflows.

class Logger:
    """Base logging class offering severity-tagged messages and object
    formatting for monitoring computational workflows."""

    def __init__(self, depth=3, name=None):
        """
        Base logging class with formatting and timing capabilities.

        Parameters:
        - depth: int, call stack depth used to derive the logging context
        - name: str, logger name (None to auto-generate one)
        """

    def warn(self, msg):
        """
        Log a warning message.

        Parameters:
        - msg: str, warning message to log
        """

    def info(self, msg):
        """
        Log an informational message.

        Parameters:
        - msg: str, info message to log
        """

    def debug(self, msg):
        """
        Log a debug message.

        Parameters:
        - msg: str, debug message to log
        """

    def format(self, obj, indent=0):
        """
        Return formatted representation of object.

        Useful for pretty-printing nested containers in log output.

        Parameters:
        - obj: object to format
        - indent: int, indentation level

        Returns:
        str: formatted object representation
        """

class PrintTime:
    # NOTE(review): joblib's PrintTime instances are invoked like
    # functions, e.g. ``timer("message")``, printing the elapsed time
    # since the previous call -- confirm against the joblib docs.
    def __init__(self, logfile=None, logdir=None):
        """
        Print and log messages with execution time tracking.

        Parameters:
        - logfile: str, path to log file (None to print to stdout)
        - logdir: str, directory in which to create the log file;
          supply either logfile or logdir, not both
        """

Usage Examples:

from joblib import Logger, PrintTime
import time
import numpy as np

# Basic logging
logger = Logger(name="DataProcessor")

def process_data(data):
    """Double every item in *data*, logging progress along the way."""
    logger.info(f"Processing {len(data)} items")

    if len(data) == 0:
        logger.warn("Empty data provided")
        return []

    logger.debug(f"Data type: {type(data)}")

    # Simulate processing
    result = [x * 2 for x in data]

    logger.info("Processing complete")
    return result

# Process with logging
data = [1, 2, 3, 4, 5]
result = process_data(data)

# Time tracking with PrintTime: an instance is callable and prints the
# message together with the elapsed time since the previous call, so no
# manual time.time() bookkeeping is needed.  (BUG FIX: the timer was
# previously created but never used.)
timer = PrintTime()

print("Starting computation...")

# Simulate expensive computation
large_array = np.random.random((10000, 1000))
result = np.mean(large_array, axis=1)

timer("Computation completed")

# Custom formatting
logger = Logger()
complex_data = {
    'arrays': [np.random.random(100) for _ in range(3)],
    'config': {'param1': 0.1, 'param2': 'test'},
    'metadata': {'version': 1, 'created': time.time()}
}

formatted_output = logger.format(complex_data, indent=2)
print("Complex data structure:")
print(formatted_output)

Backend Infrastructure

Abstract base classes for implementing custom parallel execution and storage backends.

class ParallelBackendBase:
    """
    Abstract base class for parallel execution backends.

    Subclass this to implement custom parallel processing backends
    for specialized computing environments or frameworks.  Concrete
    subclasses override the capability flags below and implement the
    scheduling methods.
    """

    # Capability flags -- subclasses override these class attributes to
    # advertise what the backend supports.
    default_n_jobs = 1                       # jobs used when the caller gives no n_jobs
    supports_inner_max_num_threads = False   # can cap threads of nested parallel regions
    supports_retrieve_callback = False       # results can be delivered via callback
    supports_return_generator = False        # results can be yielded lazily
    supports_timeout = False                 # retrieve_result honors its timeout argument

    def effective_n_jobs(self, n_jobs):
        """
        Determine actual number of parallel jobs.

        Translates the caller's requested ``n_jobs`` into the number the
        backend will really use (e.g. clamping to available resources).

        Parameters:
        - n_jobs: int, requested number of jobs

        Returns:
        int: actual number of jobs to use
        """

    def submit(self, func, callback=None):
        """
        Schedule function execution.

        Parameters:
        - func: callable, function to execute
        - callback: callable, optional callback for result handling

        Returns:
        Future-like object representing the computation
        """

    def retrieve_result(self, futures, timeout=None):
        """
        Retrieve results from submitted computations.

        Parameters:
        - futures: list of future objects
        - timeout: float, timeout in seconds

        Returns:
        Generator yielding (future, result) pairs
        """

class StoreBackendBase:
    """
    Abstract base class for storage backends.

    Subclass this to implement custom storage solutions
    for Memory caching (e.g., cloud storage, databases).
    The underscore-prefixed methods form the low-level storage interface
    that subclasses provide; the ``clear*`` methods are the public
    cache-management API.
    """

    def _open_item(self, f, mode):
        """
        Open item in storage backend.

        Parameters:
        - f: file identifier
        - mode: str, file opening mode (e.g. "rb", "wb")

        Returns:
        File-like object
        """

    def _item_exists(self, location):
        """
        Check if item exists in storage.

        Parameters:
        - location: str, item location identifier

        Returns:
        bool: True if item exists
        """

    def _move_item(self, src, dst):
        """
        Move item within storage backend.

        Parameters:
        - src: str, source location
        - dst: str, destination location
        """

    def clear_item(self, call_id):
        """
        Clear single cached item.

        Parameters:
        - call_id: str, unique identifier for cached call
        """

    def clear_path(self, path):
        """
        Clear all items at specified path.

        Parameters:
        - path: str, path to clear
        """

    def clear(self):
        """Clear all items in storage backend."""

Usage Examples:

from joblib import ParallelBackendBase, StoreBackendBase, register_parallel_backend, register_store_backend

# Custom parallel backend example
class GPUBackend(ParallelBackendBase):
    """Example GPU computing backend."""

    supports_timeout = True
    default_n_jobs = 4  # Number of GPU streams

    def __init__(self, device_id=0):
        self.device_id = device_id

    def effective_n_jobs(self, n_jobs):
        # Limit to available GPU streams
        return min(n_jobs, 8)

    def submit(self, func, callback=None):
        # Submit computation to GPU and return a GPU future object
        pass

    def retrieve_result(self, futures, timeout=None):
        # Retrieve results from GPU
        for future in futures:
            yield future, future.result(timeout=timeout)

# Register custom backend
register_parallel_backend('gpu', GPUBackend)

# Custom storage backend example
class RedisStoreBackend(StoreBackendBase):
    """Example Redis storage backend for caching.

    NOTE(review): a production backend must implement the full abstract
    storage interface (including a working _open_item); this sketch only
    shows the shape of the integration.
    """

    def __init__(self, host='localhost', port=6379, db=0):
        import redis
        self.redis_client = redis.Redis(host=host, port=port, db=db)

    def _item_exists(self, location):
        return self.redis_client.exists(location)

    def _open_item(self, f, mode):
        # Implement Redis-based file-like object
        pass

    def clear_item(self, call_id):
        self.redis_client.delete(call_id)

    def clear(self):
        self.redis_client.flushdb()

# Register custom storage backend
register_store_backend('redis', RedisStoreBackend)

# Use custom backends
from joblib import Memory, Parallel, delayed, parallel_backend

# Use custom storage.  BUG FIX: Memory's first argument is the cache
# *location*; omitting it leaves caching disabled entirely.
mem = Memory('./redis_cache', backend='redis', backend_options={'host': 'cache-server'})

def gpu_function(i):
    # Placeholder so the example below is runnable end-to-end.
    return i * i

# Use custom parallel backend
with parallel_backend('gpu', device_id=1):
    results = Parallel(n_jobs=4)(delayed(gpu_function)(i) for i in range(100))

Compression Management

Registration and management of compression algorithms for persistence operations.

def register_compressor(compressor_name, compressor, force=False):
    """
    Register a new compressor for use with dump/load operations.

    Once registered, the name can be passed as ``compress=<name>`` to
    ``dump``.

    Parameters:
    - compressor_name: str, name to identify the compressor
    - compressor: compressor object implementing required interface
      (NOTE(review): joblib expects a CompressorWrapper-style object
      exposing a file-object factory -- confirm against the joblib docs)
    - force: bool, whether to overwrite existing compressor with same name

    Raises:
    ValueError: if compressor_name already exists and force=False
    """

Usage Examples:

from joblib import register_compressor, dump, load

# Example custom compressor (simplified)
class CustomCompressor:
    """Example custom compression algorithm.

    BUG FIX: the original methods returned undefined names
    (``compressed_data`` / ``original_data``) and raised NameError when
    run; this version uses stdlib zlib so the example actually works.
    """

    def compress(self, data):
        """Return *data* (bytes) compressed with zlib."""
        import zlib  # stdlib; imported locally to keep the snippet self-contained
        return zlib.compress(data)

    def decompress(self, compressed_data):
        """Invert compress(), returning the original bytes."""
        import zlib
        return zlib.decompress(compressed_data)

import numpy as np  # BUG FIX: np was used below without being imported

# Register custom compressor
custom_comp = CustomCompressor()
register_compressor('custom', custom_comp)

# Use custom compressor
data = {'large_array': np.random.random(100000)}
dump(data, 'data_custom.pkl', compress='custom')
loaded_data = load('data_custom.pkl')

# Register external compressor library
try:
    import snappy

    class SnappyCompressor:
        def compress(self, data):
            return snappy.compress(data)

        def decompress(self, compressed_data):
            return snappy.decompress(compressed_data)

    register_compressor('snappy', SnappyCompressor())

    # Use snappy compression for fast compression/decompression
    dump(data, 'data_snappy.pkl', compress='snappy')

except ImportError:
    print("Snappy not available")

Advanced Infrastructure Patterns

Custom Caching Strategy

# BUG FIX: register_store_backend is used below but was never imported.
from joblib import Memory, hash, register_store_backend
from joblib._store_backends import StoreBackendBase
import time

class TimeBasedCacheBackend(StoreBackendBase):
    """Cache backend with automatic expiration.

    Wraps another store backend and treats any item older than
    ``ttl_seconds`` as missing, evicting it on access.

    NOTE(review): Memory's backend_options cannot supply the required
    ``base_backend`` argument, so registering the class directly as done
    below needs a factory/partial in practice -- confirm against
    joblib's store-backend registration docs.
    """

    def __init__(self, base_backend, ttl_seconds=3600):
        self.base_backend = base_backend
        self.ttl_seconds = ttl_seconds
        # Per-location write timestamps used for the expiry check.
        self.timestamps = {}

    def _item_exists(self, location):
        """Return True only if the item exists and has not expired."""
        if not self.base_backend._item_exists(location):
            return False

        # Evict and report missing once the TTL has elapsed.
        timestamp = self.timestamps.get(location, 0)
        if time.time() - timestamp > self.ttl_seconds:
            self.clear_item(location)
            return False

        return True

    def _open_item(self, f, mode):
        if 'w' in mode:
            # Record timestamp when writing
            self.timestamps[f] = time.time()
        return self.base_backend._open_item(f, mode)

# Use time-based caching
register_store_backend('ttl', TimeBasedCacheBackend)
mem = Memory('./cache', backend='ttl', backend_options={'ttl_seconds': 1800})

Performance Monitoring

from joblib import Logger, Parallel, delayed
import time
import psutil

class PerformanceLogger(Logger):
    """Logger that reports elapsed time, memory delta and CPU usage."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Baselines captured by start_monitoring(); None until then.
        self.start_time = None
        self.start_memory = None

    def start_monitoring(self):
        """Record the current time and memory usage as baselines."""
        self.start_time = time.time()
        self.start_memory = psutil.virtual_memory().used
        self.info("Performance monitoring started")

    def log_performance(self, operation_name):
        """Log time, memory and CPU statistics since start_monitoring()."""
        if not self.start_time:
            return
        seconds = time.time() - self.start_time
        mem_delta = psutil.virtual_memory().used - self.start_memory

        self.info(f"{operation_name} completed:")
        self.info(f"  Time: {seconds:.2f} seconds")
        self.info(f"  Memory change: {mem_delta / 1024**2:.1f} MB")
        self.info(f"  CPU usage: {psutil.cpu_percent()}%")

# Monitor a parallel computation end-to-end.
perf_logger = PerformanceLogger(name="Computation")
perf_logger.start_monitoring()

results = Parallel(n_jobs=4)(delayed(expensive_function)(i) for i in range(100))

perf_logger.log_performance("Parallel computation")

Install with Tessl CLI

npx tessl i tessl/pypi-joblib

docs

index.md

memory-caching.md

parallel-processing.md

persistence-serialization.md

utilities-infrastructure.md

tile.json