Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Managed pools of worker threads for executing multiple tasks concurrently. Thread pools are ideal for I/O-bound tasks and provide fine-grained control over worker lifecycle, task scheduling, and resource management. They offer better resource utilization and management compared to creating individual threads.
A managed pool of worker threads that can execute multiple tasks concurrently with automatic worker lifecycle management, task queuing, and optional worker restart capabilities.
class ThreadPool:
    def __init__(
        self,
        # NOTE(review): this default is evaluated once at class-definition time,
        # so the worker count is fixed to the CPU count of the defining machine.
        max_workers: int = multiprocessing.cpu_count(),
        max_tasks: int = 0,
        initializer: Callable = None,
        # Annotated as tuple to match the () default; pebble accepts any sequence.
        initargs: tuple = ()
    ):
        """
        Create a thread pool for concurrent task execution.

        Parameters:
        - max_workers: Maximum number of worker threads (defaults to CPU count)
        - max_tasks: Maximum tasks per worker before restart (0 = no limit)
        - initializer: Function called when each worker thread starts
        - initargs: Arguments passed to initializer function
        """
from pebble import ThreadPool
import time
# Create pool with default settings
pool = ThreadPool()
# Create pool with custom configuration
pool = ThreadPool(max_workers=4, max_tasks=10)
def io_task(duration, message):
time.sleep(duration)
return f"Completed: {message}"
# Schedule tasks
future1 = pool.schedule(io_task, args=(1, "Task 1"))
future2 = pool.schedule(io_task, args=(2, "Task 2"))
# Get results
result1 = future1.result()
result2 = future2.result()
print(f"Results: {result1}, {result2}")
# Always clean up
pool.close()
pool.join()Schedule individual tasks for execution by worker threads:
def schedule(
    self,
    function: Callable,
    args: tuple = (),
    # NOTE(review): mutable default mirrors pebble's published signature; the
    # pool should not mutate it, but callers should still pass their own dict.
    kwargs: dict = {}
) -> concurrent.futures.Future:
    """
    Schedule a function for execution in the thread pool.

    Parameters:
    - function: The function to execute
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function

    Returns:
    concurrent.futures.Future object for retrieving the result
    """
def submit(
    self,
    function: Callable,
    *args,
    **kwargs
) -> concurrent.futures.Future:
    """
    Submit a function for execution (compatibility with concurrent.futures).

    Unlike schedule(), arguments are passed variadically, matching the
    concurrent.futures.Executor.submit call convention.

    Parameters:
    - function: The function to execute
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function

    Returns:
    concurrent.futures.Future object for retrieving the result
    """
from pebble import ThreadPool
import requests
import time
def fetch_url(url, timeout=10):
response = requests.get(url, timeout=timeout)
return {"url": url, "status": response.status_code, "size": len(response.content)}
def process_data(data, multiplier=1):
# Simulate data processing
time.sleep(0.1)
return [x * multiplier for x in data]
# Create and use thread pool
with ThreadPool(max_workers=8) as pool:
# Schedule multiple HTTP requests
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/json",
"https://httpbin.org/user-agent"
]
# Using schedule method
fetch_futures = []
for url in urls:
future = pool.schedule(fetch_url, args=(url,), kwargs={"timeout": 5})
fetch_futures.append(future)
# Using submit method (concurrent.futures style)
data_futures = []
datasets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
for dataset in datasets:
future = pool.submit(process_data, dataset, multiplier=2)
data_futures.append(future)
# Collect results
fetch_results = [f.result() for f in fetch_futures]
data_results = [f.result() for f in data_futures]
print("Fetch results:", fetch_results)
print("Data results:", data_results)Execute a function across multiple inputs using the map interface:
def map(
    self,
    function: Callable,
    *iterables,
    chunksize: int = None,
    timeout: float = None
) -> MapFuture:
    """
    Apply function to every item of iterables in parallel.

    NOTE(review): the usage examples below iterate results positionally,
    which assumes results are delivered in input order — confirm against
    the pebble documentation.

    Parameters:
    - function: Function to apply to each item
    - iterables: One or more iterables to process
    - chunksize: Number of items per chunk (None for automatic sizing)
    - timeout: Maximum time to wait for all results

    Returns:
    MapFuture object that yields results as they become available
    """
from pebble import ThreadPool
import math
import time
def compute_sqrt(x):
time.sleep(0.01) # Simulate some work
return math.sqrt(x)
def fetch_user_data(user_id):
# Simulate API call
time.sleep(0.1)
return {"id": user_id, "name": f"User {user_id}", "active": user_id % 2 == 0}
# Using map for batch processing
with ThreadPool(max_workers=6) as pool:
# Process numbers
numbers = range(100)
sqrt_results = pool.map(compute_sqrt, numbers, chunksize=10)
# Process as results become available
print("Square roots:")
for i, result in enumerate(sqrt_results):
print(f"sqrt({i}) = {result:.3f}")
# Fetch user data for multiple users
user_ids = range(1, 21)
user_futures = pool.map(fetch_user_data, user_ids, timeout=30)
# Get all results at once
users = list(user_futures)
active_users = [user for user in users if user["active"]]
print(f"Active users: {len(active_users)}/{len(users)}")Initialize worker threads with shared resources or configuration:
from pebble import ThreadPool
import logging
import threading
# Setup function for each worker thread
def worker_init(db_config, log_level):
# Configure logging for this thread
logging.basicConfig(level=log_level)
logger = logging.getLogger(f"worker-{threading.current_thread().ident}")
# Initialize database connection (stored in thread-local storage)
thread_local = threading.local()
thread_local.db_connection = create_db_connection(db_config)
thread_local.logger = logger
logger.info("Worker thread initialized")
def create_db_connection(config):
# Simulate database connection
return {"host": config["host"], "connected": True}
def process_record(record_id):
# Access thread-local resources
thread_local = threading.local()
if hasattr(thread_local, 'logger'):
thread_local.logger.info(f"Processing record {record_id}")
# Simulate work with database
time.sleep(0.1)
return f"Processed {record_id}"
# Create pool with worker initialization
db_config = {"host": "localhost", "port": 5432}
pool = ThreadPool(
max_workers=4,
initializer=worker_init,
initargs=(db_config, logging.INFO)
)
try:
# Schedule work
records = range(1, 11)
futures = [pool.schedule(process_record, args=(record_id,)) for record_id in records]
# Get results
results = [f.result() for f in futures]
print("Results:", results)
finally:
pool.close()
pool.join()Properly manage pool resources and handle shutdown:
def close(self):
    """
    Prevent new tasks from being submitted to the pool.
    Currently running tasks will continue to completion.
    """

def stop(self):
    """
    Stop the pool immediately, cancelling pending tasks.
    Running tasks may be interrupted.
    """

def join(self, timeout: float = None):
    """
    Wait for all worker threads to complete.

    NOTE(review): call close() or stop() before join() — the examples below
    always do; confirm against the pebble documentation whether joining a
    running pool is permitted.

    Parameters:
    - timeout: Maximum time to wait in seconds (None for no timeout)
    """
from pebble import ThreadPool
import time
import signal
import sys
def long_running_task(task_id, duration):
print(f"Task {task_id} starting (duration: {duration}s)")
time.sleep(duration)
print(f"Task {task_id} completed")
return f"Result {task_id}"
# Graceful shutdown example
def graceful_shutdown_demo():
pool = ThreadPool(max_workers=3)
try:
# Schedule some long-running tasks
futures = []
for i in range(5):
future = pool.schedule(long_running_task, args=(i, 2))
futures.append(future)
# Simulate running for a while
time.sleep(3)
print("Initiating graceful shutdown...")
pool.close() # No new tasks accepted
# Wait for completion with timeout
pool.join(timeout=10)
# Collect results from completed tasks
for i, future in enumerate(futures):
try:
result = future.result(timeout=0) # Don't wait
print(f"Task {i} result: {result}")
except Exception as e:
print(f"Task {i} failed or incomplete: {e}")
except KeyboardInterrupt:
print("Interrupt received, stopping pool...")
pool.stop() # Force stop
pool.join(timeout=5)
# Context manager usage (recommended)
def context_manager_demo():
with ThreadPool(max_workers=4) as pool:
# Pool is automatically closed and joined when exiting context
futures = []
for i in range(10):
future = pool.schedule(long_running_task, args=(i, 1))
futures.append(future)
# Get results
results = [f.result() for f in futures]
print("All tasks completed:", len(results))
print("Pool automatically cleaned up")
# Signal handling for clean shutdown
def signal_handler(signum, frame):
print(f"Signal {signum} received, shutting down...")
# Handle pool cleanup here
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Run examples
graceful_shutdown_demo()
context_manager_demo()Configure pools for specific use cases and performance requirements:
from pebble import ThreadPool
import time
# High-throughput pool with worker recycling
high_throughput_pool = ThreadPool(
max_workers=20, # Many workers for high concurrency
max_tasks=100 # Recycle workers every 100 tasks
)
# Resource-constrained pool
constrained_pool = ThreadPool(
max_workers=2, # Limited workers for resource constraints
max_tasks=0 # No worker recycling
)
# Specialized pool with custom initialization
def init_worker_with_cache():
import threading
thread_local = threading.local()
thread_local.cache = {}
thread_local.request_count = 0
def cached_operation(key, value):
import threading
thread_local = threading.local()
if not hasattr(thread_local, 'cache'):
thread_local.cache = {}
thread_local.request_count = 0
thread_local.request_count += 1
if key in thread_local.cache:
return thread_local.cache[key]
# Simulate expensive operation
time.sleep(0.1)
result = value * 2
thread_local.cache[key] = result
return result
# Use specialized pool
specialized_pool = ThreadPool(
max_workers=4,
initializer=init_worker_with_cache
)
try:
# Test caching behavior
test_data = [(f"key_{i % 5}", i) for i in range(20)] # Repeated keys
futures = [
specialized_pool.schedule(cached_operation, args=(key, value))
for key, value in test_data
]
results = [f.result() for f in futures]
print(f"Processed {len(results)} items with caching")
finally:
specialized_pool.close()
specialized_pool.join()Install with Tessl CLI
npx tessl i tessl/pypi-pebble