Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Managed pools of worker processes for executing CPU-intensive tasks. Process pools bypass Python's Global Interpreter Lock (GIL) to provide true parallelism with advanced features like timeouts, automatic worker restart, and comprehensive error handling with process isolation.
A managed pool of worker processes that can execute multiple tasks concurrently with true parallelism, automatic process lifecycle management, and advanced error handling.
class ProcessPool:
    # API stub documenting pebble's ProcessPool constructor signature.
    def __init__(
        self,
        max_workers: int = multiprocessing.cpu_count(),  # NOTE(review): default is evaluated once, at definition time
        max_tasks: int = 0,
        initializer: Optional[Callable] = None,
        initargs: tuple = (),
        context: multiprocessing.context.BaseContext = multiprocessing  # NOTE(review): default is the module itself, which exposes the same API as a context object — matches the upstream pebble signature
    ):
        """
        Create a process pool for CPU-intensive concurrent task execution.

        Parameters:
        - max_workers: Maximum number of worker processes (defaults to CPU count)
        - max_tasks: Maximum tasks per worker before restart (0 = no limit)
        - initializer: Function called when each worker process starts
        - initargs: Arguments passed to initializer function
        - context: Multiprocessing context (spawn, fork, forkserver)
        """
from pebble import ProcessPool
import time

# Create pool with default settings
pool = ProcessPool()
# Create pool with custom configuration (replaces the default pool above)
pool = ProcessPool(max_workers=4, max_tasks=50)
def cpu_intensive_task(n, multiplier=1):
    """Sum the multiplier-scaled squares of 0..n-1.

    Pure CPU-bound computation that benefits from true parallelism.
    """
    return sum(i ** 2 * multiplier for i in range(n))
# Schedule tasks
future1 = pool.schedule(cpu_intensive_task, args=(100000,), kwargs={"multiplier": 2})
future2 = pool.schedule(cpu_intensive_task, args=(200000,))

# Get results (each .result() blocks until that task finishes)
result1 = future1.result()
result2 = future2.result()
print(f"Results: {result1}, {result2}")

# Always clean up
pool.close()
pool.join()

Schedule individual tasks with timeout protection to prevent runaway processes:
def schedule(
    self,
    function: Callable,
    args: tuple = (),
    kwargs: dict = {},  # NOTE(review): mutable default mirrors the upstream pebble signature; assumed never mutated by the implementation — confirm before copying this pattern
    timeout: Optional[float] = None
) -> ProcessFuture:
    """
    Schedule a function for execution in the process pool.

    Parameters:
    - function: The function to execute
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function
    - timeout: Maximum execution time in seconds (raises TimeoutError if exceeded)

    Returns:
    ProcessFuture object for retrieving the result
    """
def submit(
    self,
    function: Callable,
    timeout: Optional[float],
    /,  # function and timeout are positional-only, concurrent.futures-style
    *args,
    **kwargs
) -> ProcessFuture:
    """
    Submit a function for execution (compatibility with concurrent.futures).

    Parameters:
    - function: The function to execute
    - timeout: Maximum execution time in seconds (positional-only parameter)
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function

    Returns:
    ProcessFuture object for retrieving the result
    """
from pebble import ProcessPool, ProcessExpired
import time
import math
def prime_factorization(n):
    """Return the prime factors of n in ascending order, with multiplicity."""
    factors = []
    divisor = 2
    # Trial division: once divisor*divisor exceeds the remainder,
    # whatever is left (> 1) is itself prime.
    while divisor * divisor <= n:
        while n % divisor == 0:
            factors.append(divisor)
            n //= divisor
        divisor += 1
    if n > 1:
        factors.append(n)
    return factors
def monte_carlo_pi(iterations):
    """Estimate Pi by sampling `iterations` random points in the unit square.

    The fraction of points landing inside the quarter circle approximates
    pi/4. Draw order (x first, then y per sample) matches the original.
    """
    import random
    rand = random.random
    hits = sum(1 for _ in range(iterations) if rand() ** 2 + rand() ** 2 <= 1)
    return 4.0 * hits / iterations
def potentially_slow_task(delay):
    """Sleep for `delay` seconds, then report completion.

    Demonstrates pool-level timeouts: scheduling this with a timeout
    smaller than `delay` makes the future raise TimeoutError.
    """
    time.sleep(delay)
    return f"Completed after {delay} seconds"
# CPU-intensive work with timeouts
with ProcessPool(max_workers=4) as pool:
    # Schedule CPU-intensive tasks
    numbers = [982451653, 982451654, 982451655, 982451656]
    factor_futures = []
    for num in numbers:
        future = pool.schedule(
            prime_factorization,
            args=(num,),
            timeout=30.0 # 30 second timeout
        )
        factor_futures.append(future)
    # Monte Carlo Pi estimation
    pi_futures = []
    for iterations in [1000000, 2000000, 3000000]:
        future = pool.schedule(
            monte_carlo_pi,
            args=(iterations,),
            timeout=60.0 # 60 second timeout
        )
        pi_futures.append(future)
    # Schedule tasks that might timeout
    timeout_futures = [
        pool.schedule(potentially_slow_task, args=(1,), timeout=5.0), # Should complete
        pool.schedule(potentially_slow_task, args=(10,), timeout=5.0) # Should timeout
    ]
    # Collect results with error handling
    # NOTE(review): pebble raises concurrent.futures.TimeoutError; the bare
    # `except TimeoutError` below only matches it on Python 3.11+ (where the
    # two are aliases) — confirm the intended minimum Python version.
    print("Prime factorizations:")
    for i, future in enumerate(factor_futures):
        try:
            result = future.result()
            print(f" {numbers[i]} = {' × '.join(map(str, result))}")
        except TimeoutError:
            print(f" {numbers[i]} = TIMEOUT")
        except Exception as e:
            print(f" {numbers[i]} = ERROR: {e}")
    print("\nPi estimations:")
    for i, future in enumerate(pi_futures):
        try:
            pi_estimate = future.result()
            iterations = [1000000, 2000000, 3000000][i]
            error = abs(pi_estimate - math.pi)
            print(f" {iterations:,} iterations: π ≈ {pi_estimate:.6f} (error: {error:.6f})")
        except Exception as e:
            print(f" ERROR: {e}")
    print("\nTimeout examples:")
    for i, future in enumerate(timeout_futures):
        try:
            result = future.result()
            print(f" Task {i+1}: {result}")
        except TimeoutError:
            print(f" Task {i+1}: TIMEOUT")
        except ProcessExpired as e:
print(f" Task {i+1}: PROCESS DIED: {e}")

Execute a function across multiple inputs efficiently using process pools:
def map(
    self,
    function: Callable,
    *iterables,
    chunksize: Optional[int] = None,  # NOTE(review): None presumably lets the pool choose a chunk size — confirm against pebble docs
    timeout: Optional[float] = None
) -> ProcessMapFuture:
    """
    Apply function to every item of iterables in parallel using processes.

    Parameters:
    - function: Function to apply to each item
    - iterables: One or more iterables to process
    - chunksize: Number of items per chunk sent to each process
    - timeout: Maximum time to wait for all results

    Returns:
    ProcessMapFuture object that yields results as they become available
    """
from pebble import ProcessPool
import math
import time
def cpu_bound_function(x):
    """Simulate CPU-intensive work: accumulate sin(i)*cos(i) over x*1000 steps."""
    return sum(math.sin(i) * math.cos(i) for i in range(x * 1000))
def data_processing_pipeline(data_chunk):
    """Derive square, square root and a capped factorial for every item."""
    return [
        {
            'original': value,
            'squared': value ** 2,
            'sqrt': math.sqrt(abs(value)),
            'factorial': math.factorial(min(abs(value), 10)), # cap at 10! to prevent huge numbers
        }
        for value in data_chunk
    ]
def matrix_operation(matrix_row):
    """Map each entry x of the row to x**2 + sin(x)."""
    transformed = []
    for x in matrix_row:
        transformed.append(x ** 2 + math.sin(x))
    return transformed
# Efficient parallel processing with map
with ProcessPool(max_workers=6) as pool:
    # Process large dataset
    large_dataset = list(range(1, 101))
    print("Processing large dataset...")
    start_time = time.time()
    # Use map with optimal chunk size
    # NOTE(review): pebble's map() returns a ProcessMapFuture; results are
    # normally obtained by iterating future.result() — confirm that iterating
    # the future directly (as below) is supported by the pebble version in use.
    results = pool.map(
        cpu_bound_function,
        large_dataset,
        chunksize=10, # Process 10 items per chunk
        timeout=120 # 2 minute timeout for entire operation
    )
    # Convert to list to get all results
    processed_results = list(results)
    end_time = time.time()
    print(f"Processed {len(processed_results)} items in {end_time - start_time:.2f} seconds")
    print(f"Average result: {sum(processed_results) / len(processed_results):.2f}")
    # Data processing pipeline
    raw_data = [list(range(i*10, (i+1)*10)) for i in range(20)] # 20 chunks of 10 items each
    print("\nRunning data processing pipeline...")
    pipeline_results = pool.map(
        data_processing_pipeline,
        raw_data,
        chunksize=2, # 2 data chunks per process
        timeout=60
    )
    # Flatten results
    all_processed = []
    for chunk_result in pipeline_results:
        all_processed.extend(chunk_result)
    print(f"Processed {len(all_processed)} data items through pipeline")
    # Matrix operations
    matrix = [[i+j for j in range(100)] for i in range(50)] # 50x100 matrix
    print("\nPerforming matrix operations...")
    matrix_results = pool.map(
        matrix_operation,
        matrix,
        chunksize=5, # 5 rows per process
        timeout=30
    )
    processed_matrix = list(matrix_results)
print(f"Processed matrix with {len(processed_matrix)} rows")

Configure the multiprocessing context for different process creation methods:
import multiprocessing
from pebble import ProcessPool
def worker_task(data, worker_id=None):
    """Echo the task payload back, tagged with the executing process's identity."""
    import os
    pid = os.getpid()
    return dict(data=data, worker_pid=pid, worker_id=worker_id)
# Different multiprocessing contexts
def context_examples():
    """Build pools under spawn/fork/forkserver contexts and run a task in each."""
    # Spawn context (creates fresh Python interpreter)
    spawn_ctx = multiprocessing.get_context('spawn')
    spawn_pool = ProcessPool(max_workers=2, context=spawn_ctx)
    # Fork context (copies current process) - Unix only
    try:
        fork_ctx = multiprocessing.get_context('fork')
        fork_pool = ProcessPool(max_workers=2, context=fork_ctx)
    except RuntimeError:
        print("Fork context not available on this platform")
        fork_pool = None
    # Forkserver context (hybrid approach) - Unix only
    try:
        forkserver_ctx = multiprocessing.get_context('forkserver')
        forkserver_pool = ProcessPool(max_workers=2, context=forkserver_ctx)
    except RuntimeError:
        print("Forkserver context not available on this platform")
        forkserver_pool = None
    # Test different contexts
    test_data = list(range(10))
    print("Testing spawn context:")
    with spawn_pool:
        spawn_results = [
            spawn_pool.schedule(worker_task, args=(data, f"spawn-{data}"))
            for data in test_data
        ]
        for future in spawn_results:
            print(f" {future.result()}")
    # A pool object is truthy, so `if fork_pool:` distinguishes it from None
    if fork_pool:
        print("\nTesting fork context:")
        with fork_pool:
            fork_results = [
                fork_pool.schedule(worker_task, args=(data, f"fork-{data}"))
                for data in test_data
            ]
            for future in fork_results:
                print(f" {future.result()}")
    if forkserver_pool:
        print("\nTesting forkserver context:")
        with forkserver_pool:
            forkserver_results = [
                forkserver_pool.schedule(worker_task, args=(data, f"forkserver-{data}"))
                for data in test_data
            ]
            for future in forkserver_results:
                print(f" {future.result()}")
# Run context examples
context_examples()

Initialize worker processes with shared resources and handle cleanup:
from pebble import ProcessPool
import multiprocessing
import logging
import os
# Global state for worker processes (populated per-process by the initializer)
worker_state = {}

def init_worker_process(config, log_level):
    """Initialize each worker process: configure logging and seed worker_state."""
    global worker_state
    # Per-process logging so log lines can be attributed to a worker PID
    logging.basicConfig(
        level=log_level,
        format=f'PID-{os.getpid()}: %(levelname)s - %(message)s'
    )
    process_logger = logging.getLogger(__name__)
    worker_state = {
        'config': config,
        'logger': process_logger,
        'task_count': 0,
        'process_id': os.getpid()
    }
    process_logger.info(f"Worker process {os.getpid()} initialized with config: {config}")

def worker_task_with_state(task_data):
    """Scale task_data using the per-worker config and report worker identity."""
    global worker_state
    worker_state['task_count'] += 1
    log = worker_state['logger']
    log.info(f"Processing task {worker_state['task_count']}: {task_data}")
    # Work is driven by the config installed at initialization time
    multiplier = worker_state['config'].get('multiplier', 1)
    result = task_data * multiplier
    # Simulate some processing time
    import time
    time.sleep(0.1)
    log.info(f"Task completed. Result: {result}")
    return {
        'input': task_data,
        'result': result,
        'task_number': worker_state['task_count'],
        'process_id': worker_state['process_id']
    }
# Create pool with worker initialization
config = {'multiplier': 3, 'timeout': 30}
pool = ProcessPool(
    max_workers=3,
    max_tasks=5, # Restart workers every 5 tasks
    initializer=init_worker_process,
    initargs=(config, logging.INFO)
)
try:
    # Schedule multiple tasks to see worker behavior
    tasks = list(range(1, 16)) # 15 tasks
    futures = []
    for task in tasks:
        future = pool.schedule(worker_task_with_state, args=(task,))
        futures.append(future)
    # Collect results
    results = []
    for future in futures:
        try:
            result = future.result(timeout=10)
            results.append(result)
        except Exception as e:
            print(f"Task failed: {e}")
    # Print results showing worker process recycling
    print(f"\nProcessed {len(results)} tasks:")
    for result in results:
        print(f" Task {result['task_number']} in PID {result['process_id']}: "
              f"{result['input']} -> {result['result']}")
    # Group by process ID to see worker recycling (max_tasks=5 forces restarts)
    by_process = {}
    for result in results:
        pid = result['process_id']
        if pid not in by_process:
            by_process[pid] = []
        by_process[pid].append(result['task_number'])
    print(f"\nTasks by worker process:")
    for pid, task_numbers in by_process.items():
        print(f" PID {pid}: tasks {task_numbers}")
finally:
    pool.close()
pool.join()

Handle various error conditions specific to process-based execution:
from pebble import ProcessPool, ProcessExpired
import signal
import time
import os
def normal_task(x):
    """Well-behaved task: simply doubles its input."""
    return x * 2

def crashing_task():
    """Kill the worker instantly, bypassing Python-level cleanup."""
    os._exit(1) # Immediate process termination

def hanging_task():
    """Loop forever; only a pool-level timeout can stop this task."""
    while True:
        time.sleep(1)

def memory_intensive_task(size):
    """Allocate a size-element list; large sizes may exhaust memory."""
    return len([0] * size)

def signal_task():
    """Deliver SIGTERM to the worker's own process."""
    import signal
    os.kill(os.getpid(), signal.SIGTERM)
# Comprehensive error handling
with ProcessPool(max_workers=4) as pool:
    # Schedule various types of tasks
    futures = {
        'normal': pool.schedule(normal_task, args=(42,)),
        'crashing': pool.schedule(crashing_task),
        'hanging': pool.schedule(hanging_task),
        'memory': pool.schedule(memory_intensive_task, args=(10**9,)), # Huge allocation
        'timeout': pool.schedule(hanging_task, timeout=2.0), # Will timeout
        'signal': pool.schedule(signal_task)
    }
    # NOTE(review): the 'hanging' future has no pool-level timeout; exiting the
    # with-block joins the pool, which would block on that task — confirm this
    # example is meant to be illustrative rather than runnable as-is.
    # Handle each type of error
    for task_name, future in futures.items():
        try:
            if task_name == 'hanging':
                # Don't wait for hanging task
                result = future.result(timeout=1.0)
            else:
                result = future.result(timeout=10.0)
            print(f"{task_name}: SUCCESS - {result}")
        except TimeoutError:
            print(f"{task_name}: TIMEOUT - Task exceeded time limit")
        except ProcessExpired as e:
            print(f"{task_name}: PROCESS DIED - PID: {e.pid}, Exit code: {e.exitcode}")
        except MemoryError:
            print(f"{task_name}: MEMORY ERROR - Not enough memory")
        except OSError as e:
            print(f"{task_name}: OS ERROR - {e}")
        except Exception as e:
            print(f"{task_name}: UNEXPECTED ERROR - {type(e).__name__}: {e}")
print("\nAll error handling completed")

Configure pools for specific performance and reliability requirements:
from pebble import ProcessPool
import multiprocessing
import time
# High-performance pool for CPU-intensive work
def create_high_performance_pool():
    """Throughput-oriented pool: oversubscribed workers, no recycling."""
    return ProcessPool(
        context=multiprocessing.get_context('spawn'), # Clean process creation
        max_workers=multiprocessing.cpu_count() * 2, # Oversubscribe for mixed workloads
        max_tasks=0, # No worker recycling for maximum performance
    )

# Reliable pool with frequent worker recycling
def create_reliable_pool():
    """Reliability-oriented pool: one worker per CPU, recycled every 10 tasks."""
    return ProcessPool(
        context=multiprocessing.get_context('spawn'),
        max_workers=multiprocessing.cpu_count(),
        max_tasks=10, # Recycle workers frequently to prevent memory leaks
    )

# Memory-conscious pool
def create_memory_conscious_pool():
    """Low-footprint pool: half the CPUs (at least 1), aggressive recycling."""
    return ProcessPool(
        context=multiprocessing.get_context('spawn'),
        max_workers=max(1, multiprocessing.cpu_count() // 2), # Fewer workers
        max_tasks=5, # Frequent recycling to free memory
    )
def benchmark_task(iterations):
    """Benchmark task for testing pool performance: sum sin(i)*cos(i)."""
    import math
    return sum(math.sin(i) * math.cos(i) for i in range(iterations))
# Benchmark different pool configurations
configurations = {
    'high_performance': create_high_performance_pool(),
    'reliable': create_reliable_pool(),
    'memory_conscious': create_memory_conscious_pool()
}
for config_name, pool in configurations.items():
    print(f"\nTesting {config_name} configuration:")
    start_time = time.time()
    with pool:
        # Submit benchmark tasks
        futures = [
            pool.schedule(benchmark_task, args=(100000,))
            for _ in range(20)
        ]
        # Wait for completion
        results = [f.result() for f in futures]
    end_time = time.time()
    print(f" Completed {len(results)} tasks in {end_time - start_time:.2f} seconds")
print(f" Average result: {sum(results) / len(results):.2f}")

Install with Tessl CLI
npx tessl i tessl/pypi-pebble