Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Synchronous decorators for thread and process-based execution that return concurrent.futures.Future objects. These decorators provide the simplest way to execute functions concurrently with minimal code changes.
Executes the decorated function in a separate thread and returns a concurrent.futures.Future object. Ideal for I/O-bound tasks that can benefit from concurrent execution without the overhead of process creation.
def thread(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    pool: Optional[ThreadPool] = None
) -> Callable[..., Future]:
    """
    Decorator for thread-based concurrent execution.

    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Thread name for identification and debugging
    - daemon: Whether thread runs as daemon (doesn't prevent program exit)
    - pool: Existing ThreadPool instance to use instead of creating new thread

    Returns:
    Decorated function that returns concurrent.futures.Future when called
    """

# --- Example usage ---
from pebble.concurrent import thread
import time
# Simple usage without parameters
@thread
def io_task(url):
# Simulate I/O operation
time.sleep(1)
return f"Data from {url}"
# Usage with parameters
@thread(name="background-worker", daemon=False)
def background_task(data):
# Process data in background
return len(data) * 2
# Using with existing pool
from pebble import ThreadPool
pool = ThreadPool(max_workers=4)
@thread(pool=pool)
def pooled_task(x):
return x ** 2
# Calling decorated functions
future1 = io_task("https://api.example.com")
future2 = background_task([1, 2, 3, 4])
future3 = pooled_task(5)
# Get results
result1 = future1.result() # Blocks until complete
result2 = future2.result(timeout=10) # With timeout
result3 = future3.result()
print(f"Results: {result1}, {result2}, {result3}")Executes the decorated function in a separate process and returns a ProcessFuture object. Perfect for CPU-intensive tasks that can benefit from true parallelism by bypassing Python's Global Interpreter Lock (GIL).
def process(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    timeout: Optional[float] = None,
    mp_context: Optional[multiprocessing.context.BaseContext] = None,
    pool: Optional[ProcessPool] = None
) -> Callable[..., ProcessFuture]:
    """
    Decorator for process-based concurrent execution.

    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Process name for identification and debugging
    - daemon: Whether process runs as daemon (doesn't prevent program exit)
    - timeout: Maximum execution time in seconds (raises TimeoutError if exceeded)
    - mp_context: Multiprocessing context for process creation (spawn, fork, forkserver)
    - pool: Existing ProcessPool instance to use instead of creating new process

    Returns:
    Decorated function that returns ProcessFuture when called
    """

# --- Example usage ---
from pebble.concurrent import process
import multiprocessing
import time
# Simple usage for CPU-intensive task
@process
def cpu_intensive(n):
total = 0
for i in range(n):
total += i ** 2
return total
# Usage with timeout
@process(timeout=5.0)
def timed_task(duration):
time.sleep(duration)
return "Completed"
# Usage with custom multiprocessing context
ctx = multiprocessing.get_context('spawn')
@process(mp_context=ctx, name="spawned-worker")
def spawned_task(data):
return sum(data)
# Using with existing pool
from pebble import ProcessPool
pool = ProcessPool(max_workers=2)
@process(pool=pool)
def pooled_cpu_task(x, y):
return x * y + (x ** y)
# Calling decorated functions
future1 = cpu_intensive(1000000)
future2 = timed_task(2) # Will complete within timeout
future3 = spawned_task([1, 2, 3, 4, 5])
future4 = pooled_cpu_task(3, 4)
try:
# Get results
result1 = future1.result()
result2 = future2.result()
result3 = future3.result()
result4 = future4.result()
print(f"Results: {result1}, {result2}, {result3}, {result4}")
except TimeoutError:
print("Task exceeded timeout")
except Exception as e:
print(f"Task failed: {e}")Both decorators preserve exception tracebacks and provide comprehensive error information:
from pebble.concurrent import process
from pebble import ProcessExpired
@process(timeout=1.0)
def failing_task():
raise ValueError("Something went wrong")
@process
def timeout_task():
import time
time.sleep(10) # Will timeout if timeout is set
return "Never reached"
# Handle various error conditions
future1 = failing_task()
future2 = timeout_task()
try:
result1 = future1.result()
except ValueError as e:
print(f"Function error: {e}")
try:
result2 = future2.result(timeout=2.0)
except TimeoutError:
print("Task timed out")
except ProcessExpired as e:
print(f"Process died unexpectedly: {e}")Functions decorated with concurrent decorators must be:
# Good: Pure function, easily serializable
@process
def calculate_fibonacci(n):
if n <= 1:
return n
return calculate_fibonacci(n-1) + calculate_fibonacci(n-2)
# Avoid: Depends on global state
global_counter = 0
@process # This could cause issues
def increment_global():
global global_counter
global_counter += 1
return global_counterInstall with Tessl CLI
npx tessl i tessl/pypi-pebble