CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pebble

Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/thread-pools.md

Thread Pools

Managed pools of worker threads for executing multiple tasks concurrently. Thread pools are ideal for I/O-bound tasks and provide fine-grained control over worker lifecycle, task scheduling, and resource management. They offer better resource utilization and management compared to creating individual threads.

Capabilities

ThreadPool Class

A managed pool of worker threads that can execute multiple tasks concurrently with automatic worker lifecycle management, task queuing, and optional worker restart capabilities.

class ThreadPool:
    def __init__(
        self,
        max_workers: int = multiprocessing.cpu_count(),
        max_tasks: int = 0,
        initializer: Callable | None = None,
        initargs: tuple = ()
    ):
        """
        Create a thread pool for concurrent task execution.

        Parameters:
        - max_workers: Maximum number of worker threads (defaults to CPU count)
        - max_tasks: Maximum tasks each worker runs before it is restarted
          (0 = no limit, workers are never recycled)
        - initializer: Optional function called once in each worker thread
          when it starts; useful for setting up per-thread resources
        - initargs: Tuple of positional arguments passed to the initializer
        """

Basic Usage

from pebble import ThreadPool
import time

# Create (and immediately release) a pool with default settings.
# NOTE(review): the original example rebound this pool without closing it,
# leaking its worker threads; close/join it before creating the next pool.
default_pool = ThreadPool()
default_pool.close()
default_pool.join()

# Create pool with custom configuration
pool = ThreadPool(max_workers=4, max_tasks=10)

def io_task(duration, message):
    """Sleep for `duration` seconds, then report completion of `message`."""
    time.sleep(duration)
    return f"Completed: {message}"

try:
    # Schedule tasks
    future1 = pool.schedule(io_task, args=(1, "Task 1"))
    future2 = pool.schedule(io_task, args=(2, "Task 2"))

    # Get results
    result1 = future1.result()
    result2 = future2.result()

    print(f"Results: {result1}, {result2}")
finally:
    # Always clean up, even if a task raised.
    pool.close()
    pool.join()

Task Scheduling

Schedule individual tasks for execution by worker threads:

def schedule(
    self,
    function: Callable,
    args: tuple = (),
    kwargs: dict = {}
) -> concurrent.futures.Future:
    """
    Schedule a function for execution in the thread pool.

    Parameters:
    - function: The function to execute
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function
      (NOTE(review): the `{}` default is a single shared mutable object;
      always pass a fresh dict, never mutate the default)

    Returns:
    concurrent.futures.Future object for retrieving the result
    """

def submit(
    self,
    function: Callable,
    *args,
    **kwargs
) -> concurrent.futures.Future:
    """
    Submit a function for execution (compatibility with concurrent.futures).

    Parameters:
    - function: The function to execute
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function

    Returns:
    concurrent.futures.Future object for retrieving the result
    """

Usage Examples

from pebble import ThreadPool
import requests
import time

def fetch_url(url, timeout=10):
    """GET *url* and return a summary dict (url, HTTP status, body size)."""
    response = requests.get(url, timeout=timeout)
    summary = {
        "url": url,
        "status": response.status_code,
        "size": len(response.content),
    }
    return summary

def process_data(data, multiplier=1):
    """Return a new list with every element of *data* scaled by *multiplier*."""
    time.sleep(0.1)  # Simulate data processing
    scaled = []
    for value in data:
        scaled.append(value * multiplier)
    return scaled

# Create and use thread pool
with ThreadPool(max_workers=8) as pool:
    # Endpoints exercised by the fetch demo.
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/json",
        "https://httpbin.org/user-agent"
    ]

    # Using schedule method
    fetch_futures = [
        pool.schedule(fetch_url, args=(target,), kwargs={"timeout": 5})
        for target in urls
    ]

    # Using submit method (concurrent.futures style)
    datasets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    data_futures = [
        pool.submit(process_data, dataset, multiplier=2)
        for dataset in datasets
    ]

    # Collect results
    fetch_results = [future.result() for future in fetch_futures]
    data_results = [future.result() for future in data_futures]

    print("Fetch results:", fetch_results)
    print("Data results:", data_results)

Bulk Operations with Map

Execute a function across multiple inputs using the map interface:

def map(
    self,
    function: Callable,
    *iterables,
    chunksize: int | None = None,
    timeout: float | None = None
) -> MapFuture:
    """
    Apply function to every item of iterables in parallel.

    Parameters:
    - function: Function to apply to each item
    - iterables: One or more iterables to process
    - chunksize: Number of items per chunk handed to a worker
      (None for automatic sizing)
    - timeout: Maximum time in seconds to wait for all results
      (None to wait indefinitely)

    Returns:
    MapFuture object that yields results as they become available
    """

Map Usage Examples

from pebble import ThreadPool
import math
import time

def compute_sqrt(x):
    """Return the square root of *x*, after a short delay to simulate work."""
    time.sleep(0.01)
    return math.sqrt(x)

def fetch_user_data(user_id):
    """Simulate an API call that returns a user record for *user_id*."""
    time.sleep(0.1)  # pretend network latency
    is_active = (user_id % 2 == 0)
    return {"id": user_id, "name": f"User {user_id}", "active": is_active}

# Using map for batch processing
with ThreadPool(max_workers=6) as pool:
    # Process numbers
    numbers = range(100)
    # chunksize=10 hands each worker 10 items at a time.
    sqrt_results = pool.map(compute_sqrt, numbers, chunksize=10)
    
    # Process as results become available
    # NOTE(review): printing `sqrt({i})` assumes results arrive in input
    # order — confirm against pebble's map ordering guarantees.
    print("Square roots:")
    for i, result in enumerate(sqrt_results):
        print(f"sqrt({i}) = {result:.3f}")
    
    # Fetch user data for multiple users
    user_ids = range(1, 21)
    user_futures = pool.map(fetch_user_data, user_ids, timeout=30)
    
    # Get all results at once
    users = list(user_futures)
    active_users = [user for user in users if user["active"]]
    print(f"Active users: {len(active_users)}/{len(users)}")

Worker Initialization

Initialize worker threads with shared resources or configuration:

from pebble import ThreadPool
import logging
import threading

# Setup function for each worker thread
# NOTE(review): the original created a *new* threading.local() inside each
# function, so the state stored by worker_init was discarded immediately and
# process_record's hasattr() check could never succeed. A single shared
# module-level threading.local gives each worker thread its own namespace
# that persists across calls within that thread.
thread_local = threading.local()

def worker_init(db_config, log_level):
    """Initialize per-thread resources (logger + DB connection).

    Intended to run once per worker thread via ThreadPool's `initializer`.
    """
    # Configure logging for this thread
    logging.basicConfig(level=log_level)
    logger = logging.getLogger(f"worker-{threading.current_thread().ident}")

    # Store per-thread resources on the shared threading.local object.
    thread_local.db_connection = create_db_connection(db_config)
    thread_local.logger = logger

    logger.info("Worker thread initialized")

def create_db_connection(config):
    """Simulate opening a database connection described by *config*."""
    return {"host": config["host"], "connected": True}

def process_record(record_id):
    """Process one record using this thread's initialized resources."""
    import time  # local import: this example snippet never imports time at top level

    # Access thread-local resources (populated by worker_init for this thread).
    if hasattr(thread_local, 'logger'):
        thread_local.logger.info(f"Processing record {record_id}")

    # Simulate work with database
    time.sleep(0.1)
    return f"Processed {record_id}"

# Create pool with worker initialization
db_config = {"host": "localhost", "port": 5432}
pool = ThreadPool(
    max_workers=4,
    initializer=worker_init,
    initargs=(db_config, logging.INFO)
)

# NOTE(review): as written above, worker_init stores its resources on a
# threading.local() created inside the function, so process_record cannot
# see them — verify the thread-local object is shared at module level.
try:
    # Schedule work
    records = range(1, 11)
    futures = [pool.schedule(process_record, args=(record_id,)) for record_id in records]
    
    # Get results
    results = [f.result() for f in futures]
    print("Results:", results)
    
finally:
    # finally guarantees the pool is shut down even if result() raises.
    pool.close()
    pool.join()

Pool Lifecycle Management

Properly manage pool resources and handle shutdown:

def close(self):
    """
    Prevent new tasks from being submitted to the pool.
    Currently running tasks will continue to completion.
    Follow with join() to wait for the workers to finish.
    """

def stop(self):
    """
    Stop the pool immediately, cancelling pending tasks.
    Running tasks may be interrupted.
    """

def join(self, timeout: float | None = None):
    """
    Wait for all worker threads to complete.

    Parameters:
    - timeout: Maximum time to wait in seconds (None for no timeout)
    """

Lifecycle Management Examples

from pebble import ThreadPool
import time
import signal
import sys

def long_running_task(task_id, duration):
    """Sleep for *duration* seconds, logging start and end, then return a tag."""
    print(f"Task {task_id} starting (duration: {duration}s)")
    time.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Result {task_id}"

# Graceful shutdown example
def graceful_shutdown_demo():
    """Demonstrate close()-then-join() shutdown, with stop() on interrupt."""
    pool = ThreadPool(max_workers=3)
    
    try:
        # Schedule some long-running tasks
        futures = []
        for i in range(5):
            future = pool.schedule(long_running_task, args=(i, 2))
            futures.append(future)
        
        # Simulate running for a while
        time.sleep(3)
        
        print("Initiating graceful shutdown...")
        pool.close()  # No new tasks accepted
        
        # Wait for completion with timeout
        pool.join(timeout=10)
        
        # Collect results from completed tasks
        for i, future in enumerate(futures):
            try:
                result = future.result(timeout=0)  # Don't wait
                print(f"Task {i} result: {result}")
            except Exception as e:
                print(f"Task {i} failed or incomplete: {e}")
                
    except KeyboardInterrupt:
        print("Interrupt received, stopping pool...")
        pool.stop()  # Force stop
        pool.join(timeout=5)
    # NOTE(review): if an exception other than KeyboardInterrupt escapes the
    # try block, the pool is never stopped or joined — consider a finally
    # clause (or the context-manager form shown below) in real code.

# Context manager usage (recommended)
def context_manager_demo():
    """Run tasks inside a `with` block; the pool is cleaned up on exit."""
    with ThreadPool(max_workers=4) as pool:
        # Pool is automatically closed and joined when exiting context
        futures = [
            pool.schedule(long_running_task, args=(n, 1))
            for n in range(10)
        ]

        # Get results
        results = [f.result() for f in futures]
        print("All tasks completed:", len(results))

    print("Pool automatically cleaned up")

# Signal handling for clean shutdown
def signal_handler(signum, frame):
    """Exit the process cleanly when a termination signal arrives."""
    print(f"Signal {signum} received, shutting down...")
    # Handle pool cleanup here
    sys.exit(0)

# Register the handler for interactive interrupts (Ctrl-C) and termination.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Run examples
graceful_shutdown_demo()
context_manager_demo()

Advanced Configuration

Configure pools for specific use cases and performance requirements:

from pebble import ThreadPool
import time

# High-throughput pool with worker recycling
high_throughput_pool = ThreadPool(
    max_workers=20,        # Many workers for high concurrency
    max_tasks=100          # Recycle workers every 100 tasks
)

# Resource-constrained pool
constrained_pool = ThreadPool(
    max_workers=2,         # Limited workers for resource constraints
    max_tasks=0            # No worker recycling
)
# NOTE(review): these demo pools are never closed or joined in this example;
# real code should close()/join() them or use a `with` block.

# Specialized pool with custom initialization
# NOTE(review): the original built a fresh threading.local() inside each
# function, so the initializer's state was discarded immediately and
# cached_operation's hasattr() check never saw a populated cache — every
# call recomputed. A single shared module-level threading.local fixes both:
# each worker thread gets its own persistent cache and counter.
import threading

cache_local = threading.local()

def init_worker_with_cache():
    """Initializer: give this worker thread an empty cache and request counter."""
    cache_local.cache = {}
    cache_local.request_count = 0

def cached_operation(key, value):
    """Return value * 2, memoized per-thread under *key*.

    Creates the per-thread cache lazily so the function also works in
    threads that did not run the initializer.
    """
    if not hasattr(cache_local, 'cache'):
        cache_local.cache = {}
        cache_local.request_count = 0

    cache_local.request_count += 1

    if key in cache_local.cache:
        return cache_local.cache[key]

    # Simulate expensive operation
    time.sleep(0.1)
    result = value * 2
    cache_local.cache[key] = result

    return result

# Use specialized pool
specialized_pool = ThreadPool(
    max_workers=4,
    initializer=init_worker_with_cache
)

try:
    # Test caching behavior: only 5 distinct keys among 20 items, so most
    # scheduled calls can be served from a worker's per-thread cache.
    test_data = [(f"key_{i % 5}", i) for i in range(20)]  # Repeated keys
    futures = [
        specialized_pool.schedule(cached_operation, args=(key, value))
        for key, value in test_data
    ]
    
    results = [f.result() for f in futures]
    print(f"Processed {len(results)} items with caching")
    
finally:
    # finally guarantees the pool is shut down even if result() raises.
    specialized_pool.close()
    specialized_pool.join()

Install with Tessl CLI

npx tessl i tessl/pypi-pebble

docs

asynchronous-decorators.md

concurrent-decorators.md

future-types-exceptions.md

index.md

process-pools.md

synchronization-utilities.md

thread-pools.md

tile.json