tessl/pypi-pebble

Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.

Concurrent Decorators

Synchronous decorators for thread and process-based execution that return concurrent.futures.Future objects. These decorators provide the simplest way to execute functions concurrently with minimal code changes.

Capabilities

Thread Decorator

Executes the decorated function in a separate thread and returns a concurrent.futures.Future object. Ideal for I/O-bound tasks that can benefit from concurrent execution without the overhead of process creation.

def thread(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    pool: Optional[ThreadPool] = None
) -> Callable[..., Future]:
    """
    Decorator for thread-based concurrent execution.
    
    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Thread name for identification and debugging
    - daemon: Whether thread runs as daemon (doesn't prevent program exit)
    - pool: Existing ThreadPool instance to use instead of creating new thread
    
    Returns:
    Decorated function that returns concurrent.futures.Future when called
    """

Usage Examples

from pebble.concurrent import thread
import time

# Simple usage without parameters
@thread
def io_task(url):
    # Simulate I/O operation
    time.sleep(1)
    return f"Data from {url}"

# Usage with parameters
@thread(name="background-worker", daemon=False)
def background_task(data):
    # Process data in background
    return len(data) * 2

# Using with existing pool
from pebble import ThreadPool

pool = ThreadPool(max_workers=4)

@thread(pool=pool)
def pooled_task(x):
    return x ** 2

# Calling decorated functions
future1 = io_task("https://api.example.com")
future2 = background_task([1, 2, 3, 4])
future3 = pooled_task(5)

# Get results
result1 = future1.result()  # Blocks until complete
result2 = future2.result(timeout=10)  # With timeout
result3 = future3.result()

print(f"Results: {result1}, {result2}, {result3}")
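
To make the mechanics concrete, the following is a rough, standard-library-only sketch of how a decorator of this shape could hand back a Future. It is not Pebble's implementation: `thread_sketch`, `double`, and `fail` are hypothetical names, and pool support is left out. It only illustrates the two call forms (with and without parameters) and how a result or an exception ends up on the future.

```python
import functools
import threading
from concurrent.futures import Future


def thread_sketch(func=None, *, name=None, daemon=True):
    """Hypothetical stand-in for a thread decorator; not Pebble's code."""

    def decorate(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            future = Future()

            def run():
                # Mark the future as running; bail out if it was cancelled first.
                if not future.set_running_or_notify_cancel():
                    return
                try:
                    future.set_result(function(*args, **kwargs))
                except BaseException as error:
                    # Store the exception so result() re-raises it in the caller.
                    future.set_exception(error)

            threading.Thread(target=run, name=name, daemon=daemon).start()
            return future

        return wrapper

    # Support both @thread_sketch and @thread_sketch(name=...)
    return decorate if func is None else decorate(func)


@thread_sketch
def double(x):
    return x * 2


@thread_sketch(name="sketch-worker")
def fail():
    raise ValueError("boom")


double_result = double(21).result(timeout=5)
fail_error = fail().exception(timeout=5)
```

The caller never touches the thread directly; everything it needs (result, exception, completion state) is reachable through the returned future.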

Process Decorator

Executes the decorated function in a separate process and returns a ProcessFuture object. Well suited to CPU-bound tasks: each process runs its own interpreter, so the work is not limited by the parent's Global Interpreter Lock (GIL) and can run in true parallel.

def process(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    timeout: Optional[float] = None,
    mp_context: Optional[multiprocessing.context.BaseContext] = None,
    pool: Optional[ProcessPool] = None
) -> Callable[..., ProcessFuture]:
    """
    Decorator for process-based concurrent execution.
    
    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Process name for identification and debugging
    - daemon: Whether process runs as daemon (doesn't prevent program exit)
    - timeout: Maximum execution time in seconds; if exceeded, the worker process is stopped and result() raises TimeoutError
    - mp_context: Multiprocessing context for process creation (spawn, fork, forkserver)
    - pool: Existing ProcessPool instance to use instead of creating new process
    
    Returns:
    Decorated function that returns ProcessFuture when called
    """

Usage Examples

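# concurrent.futures.TimeoutError is only an alias of the builtin since Python 3.11
from concurrent.futures import TimeoutError
from pebble.concurrent import process
import multiprocessing
import time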

# Simple usage for CPU-intensive task
@process
def cpu_intensive(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Usage with timeout
@process(timeout=5.0)
def timed_task(duration):
    time.sleep(duration)
    return "Completed"

# Usage with custom multiprocessing context
ctx = multiprocessing.get_context('spawn')

@process(mp_context=ctx, name="spawned-worker")
def spawned_task(data):
    return sum(data)

# Using with existing pool
from pebble import ProcessPool

pool = ProcessPool(max_workers=2)

@process(pool=pool)
def pooled_cpu_task(x, y):
    return x * y + (x ** y)

# Calling decorated functions
# The __main__ guard keeps this code from running again when a child
# process started with 'spawn' re-imports the module
if __name__ == "__main__":
    future1 = cpu_intensive(1000000)
    future2 = timed_task(2)  # Will complete within timeout
    future3 = spawned_task([1, 2, 3, 4, 5])
    future4 = pooled_cpu_task(3, 4)

    try:
        # Get results
        result1 = future1.result()
        result2 = future2.result()
        result3 = future3.result()
        result4 = future4.result()
        print(f"Results: {result1}, {result2}, {result3}, {result4}")
    except TimeoutError:
        print("Task exceeded timeout")
    except Exception as e:
        print(f"Task failed: {e}")
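
When several calls are in flight, waiting on each future in submission order can leave finished results idle. Assuming the returned futures behave like ordinary concurrent.futures.Future objects, as the descriptions above state, the standard `as_completed` helper can collect results in completion order. The sketch below uses a `ThreadPoolExecutor` as a stand-in so it runs without Pebble installed; `square` is a hypothetical function.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(x):
    return x * x


with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to its input so results can be labelled.
    futures = {executor.submit(square, n): n for n in (2, 3, 4)}

    results = {}
    # Yields each future as soon as it finishes, regardless of submit order.
    for future in as_completed(futures, timeout=10):
        results[futures[future]] = future.result()
```

Keeping a future-to-input mapping is a common way to attribute results, since completion order is not guaranteed to match submission order.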

Error Handling

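Both decorators preserve exception tracebacks: an exception raised inside the decorated function is re-raised when result() is called on the future. With the process decorator, result() can also raise TimeoutError if the task exceeds its timeout, or ProcessExpired if the worker process dies unexpectedly: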

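# concurrent.futures.TimeoutError is only an alias of the builtin since Python 3.11
from concurrent.futures import TimeoutError
from pebble.concurrent import process
from pebble import ProcessExpired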

@process
def failing_task():
    raise ValueError("Something went wrong")

@process(timeout=1.0)
def timeout_task():
    import time
    time.sleep(10)  # Exceeds the 1-second timeout set on the decorator
    return "Never reached"

# Handle various error conditions
future1 = failing_task()
future2 = timeout_task()

try:
    result1 = future1.result()
except ValueError as e:
    print(f"Function error: {e}")

try:
    result2 = future2.result(timeout=2.0)
except TimeoutError:
    print("Task timed out")
except ProcessExpired as e:
    print(f"Process died unexpectedly: {e}")
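
Two different timeouts are in play in this section, and they are easy to confuse. For any concurrent.futures.Future, the `timeout` passed to `result()` only bounds how long the caller waits; on its own it does not stop the task. The decorator-level `timeout` described earlier is the one that limits the task itself. The sketch below demonstrates the first half of that distinction with the standard library alone (a `ThreadPoolExecutor` stand-in and a hypothetical `slow` function), so it runs without Pebble.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow():
    time.sleep(0.5)
    return "finished"


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow)

    try:
        # Give up waiting after 50 ms; the task itself keeps running.
        future.result(timeout=0.05)
        wait_timed_out = False
    except TimeoutError:
        wait_timed_out = True

    still_running = not future.done()

    # Waiting again without a timeout still yields the value.
    final_value = future.result()
```

If a task must actually be stopped after a deadline, a wait timeout on `result()` is not enough by itself.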

Function Requirements

Functions decorated with concurrent decorators must be:

  1. Serializable: For the process decorator, the function, its arguments, and its return value must be picklable (not required for the thread decorator)
  2. Pure or thread-safe: Should not depend on shared global state that could cause race conditions
  3. Import-accessible: For the process decorator, the function must be defined at the top level of a module so the worker process can import it by name

# Good: Pure function, easily serializable
# (iterative, because calling a decorated function returns a future, not a number)
@process
def calculate_fibonacci(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

# Avoid: Depends on global state
global_counter = 0

@process  # Runs in a separate process, so the parent's global_counter is never updated
def increment_global():
    global global_counter
    global_counter += 1
    return global_counter
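
The serializability requirement can be checked up front. The helper below is a small sketch using only the standard `pickle` module; `is_picklable` is a hypothetical name, not part of Pebble. It attempts to serialize an object and reports whether that worked, which is a quick way to spot lambdas, locks, and similar objects before handing them to a process-based worker.

```python
import math
import pickle
import threading


def is_picklable(obj):
    """Return True if obj can be serialized with pickle, else False."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # Lambdas, nested functions, locks, and similar objects land here.
        return False
    return True


checks = {
    "builtin function": is_picklable(math.sqrt),
    "plain data": is_picklable([1, 2, 3]),
    "lambda": is_picklable(lambda x: x),
    "lock": is_picklable(threading.Lock()),
}
```

A passing check is necessary rather than sufficient: the worker process must also be able to import whatever the pickle refers to.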

Install with Tessl CLI

npx tessl i tessl/pypi-pebble
