Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.
npx @tessl/cli install tessl/pypi-pebble@5.1.0Threading and multiprocessing eye-candy providing a clean, decorator-based API for concurrent execution. Pebble offers thread and process pools with advanced features like timeouts, worker restart capabilities, and error handling with traceback preservation.
pip install pebbleimport pebbleCommon imports for decorators:
from pebble.concurrent import thread, process
from pebble.asynchronous import thread, processCommon imports for pools:
from pebble import ThreadPool, ProcessPoolCommon imports for utilities:
from pebble import synchronized, waitforthreads, waitforqueuesfrom pebble.concurrent import process
from pebble import ProcessPool
import time
# Using decorators for simple concurrent execution
@process
def cpu_intensive_task(n):
# Simulate CPU-intensive work
total = 0
for i in range(n):
total += i ** 2
return total
# Call returns a Future
future = cpu_intensive_task(1000000)
result = future.result() # Blocks until complete
print(f"Result: {result}")
# Using pools for managing multiple workers
with ProcessPool() as pool:
# Schedule multiple tasks
futures = []
for i in range(5):
future = pool.schedule(cpu_intensive_task, args=[100000 * (i + 1)])
futures.append(future)
# Collect results
results = [f.result() for f in futures]
print(f"Results: {results}")Pebble's architecture centers around two execution models:
Key components:
concurrent.futures.Futureasyncio.FutureSynchronous decorators for thread and process-based execution that return concurrent.futures.Future objects. These decorators provide the simplest way to execute functions concurrently with minimal code changes.
@thread(name=None, daemon=True, pool=None)
def decorated_function(): ...
@process(name=None, daemon=True, timeout=None, mp_context=None, pool=None)
def decorated_function(): ...AsyncIO-compatible decorators for thread and process-based execution that return asyncio.Future objects. Perfect for integration with async/await patterns and AsyncIO applications.
@thread(name=None, daemon=True, pool=None)
async def decorated_function(): ...
@process(name=None, daemon=True, timeout=None, mp_context=None, pool=None)
async def decorated_function(): ...Managed pools of worker threads for executing multiple tasks concurrently. Thread pools are ideal for I/O-bound tasks and provide fine-grained control over worker lifecycle and task scheduling.
class ThreadPool:
def __init__(self, max_workers=multiprocessing.cpu_count(), max_tasks=0, initializer=None, initargs=[]): ...
def schedule(self, function, args=(), kwargs={}): ...
def submit(self, function, *args, **kwargs): ...
def map(self, function, *iterables, **kwargs): ...
def close(self): ...
def stop(self): ...
def join(self): ...Managed pools of worker processes for executing CPU-intensive tasks. Process pools bypass Python's GIL and provide true parallelism with advanced features like timeouts and automatic worker restart.
class ProcessPool:
def __init__(self, max_workers=multiprocessing.cpu_count(), max_tasks=0, initializer=None, initargs=[], context=multiprocessing): ...
def schedule(self, function, args=(), kwargs={}, timeout=None): ...
def submit(self, function, timeout, /, *args, **kwargs): ...
def map(self, function, *iterables, **kwargs): ...
def close(self): ...
def stop(self): ...
def join(self): ...Utility functions and decorators for thread synchronization, signal handling, and waiting operations. These tools help coordinate concurrent execution and handle edge cases.
@synchronized(lock=None)
def decorated_function(): ...
@sighandler(signals)
def signal_handler(): ...
def waitforthreads(threads, timeout=None): ...
def waitforqueues(queues, timeout=None): ...Enhanced Future objects and exception types for handling concurrent execution results, timeouts, and error conditions specific to Pebble's execution model.
class ProcessFuture(concurrent.futures.Future):
def cancel(self): ...
def result(self, timeout=None): ...
def exception(self, timeout=None): ...
class ProcessExpired(OSError):
def __init__(self, msg, code=0, pid=None): ...
class MapFuture(concurrent.futures.Future): ...
class ProcessMapFuture(concurrent.futures.Future): ...# Core execution types
from typing import Callable, Optional, Any, List, Dict
from concurrent.futures import Future
from multiprocessing.context import BaseContext
from threading import Lock, RLock, Semaphore
import signal
# Function signatures for decorators
CallableType = Callable[..., Any]
ThreadDecoratorReturnType = Callable[..., Future]
ProcessDecoratorReturnType = Callable[..., 'ProcessFuture']
AsyncIODecoratorReturnType = Callable[..., 'asyncio.Future']
# Parameter types
ThreadDecoratorParams = {
'name': Optional[str],
'daemon': bool,
'pool': Optional['ThreadPool']
}
ProcessDecoratorParams = {
'name': Optional[str],
'daemon': bool,
'timeout': Optional[float],
'mp_context': Optional[BaseContext],
'pool': Optional['ProcessPool']
}
# Synchronization types
SynchronizationLock = Lock | RLock | Semaphore
SignalType = int | List[int]