tessl/pypi-pebble

Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/pebble@5.1.x

To install, run

npx @tessl/cli install tessl/pypi-pebble@5.1.0

Pebble

Pebble is threading and multiprocessing eye-candy: it provides a clean, decorator-based API for concurrent execution. It offers thread and process pools with timeouts, worker restart, and error handling that preserves tracebacks.

Package Information

  • Package Name: pebble
  • Language: Python
  • Installation: pip install pebble

Core Imports

import pebble

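Common imports for decorators. Both modules export thread and process, so importing the names from both would make the second import shadow the first; import the modules instead:

from pebble import concurrent, asynchronous

# Used as @concurrent.thread, @concurrent.process,
# @asynchronous.thread, @asynchronous.process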

Common imports for pools:

from pebble import ThreadPool, ProcessPool

Common imports for utilities:

from pebble import synchronized, waitforthreads, waitforqueues

Basic Usage

from pebble.concurrent import process
from pebble import ProcessPool


def sum_of_squares(n):
    # Simulate CPU-intensive work
    return sum(i ** 2 for i in range(n))


# Using a decorator for simple concurrent execution
@process
def sum_of_squares_in_process(n):
    return sum_of_squares(n)


# The guard is needed on platforms that spawn child processes (Windows, macOS)
if __name__ == "__main__":
    # Calling the decorated function returns a Future
    future = sum_of_squares_in_process(1000000)
    result = future.result()  # Blocks until complete
    print(f"Result: {result}")

    # Using a pool to manage multiple workers.
    # Schedule the plain function: the pool already runs it in a worker
    # process, so the decorated version should not be passed here.
    with ProcessPool() as pool:
        futures = [
            pool.schedule(sum_of_squares, args=[100000 * (i + 1)])
            for i in range(5)
        ]

        # Collect results
        results = [future.result() for future in futures]
        print(f"Results: {results}")

Architecture

Pebble's architecture centers around two execution models:

  • Decorator-based execution: Simple decorators that wrap functions for concurrent execution, returning Future objects
  • Pool-based execution: Managed worker pools that provide fine-grained control over resource allocation and lifecycle

Key components:

  • Concurrent decorators: Synchronous decorators returning concurrent.futures.Future
  • Asynchronous decorators: AsyncIO-compatible decorators returning asyncio.Future
  • Pool classes: Managed pools of workers with scheduling, timeout, and lifecycle management
  • Future types: Enhanced Future objects with process-specific capabilities
  • Utility functions: Helper functions for synchronization and waiting operations
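The decorator-based model described above can be illustrated with a minimal sketch that uses only the standard library. This is not Pebble's implementation; the decorator name `in_thread` and the example functions are hypothetical. It only shows how wrapping a function can turn a plain call into one that returns a `concurrent.futures.Future`.

```python
import functools
import threading
from concurrent.futures import Future


def in_thread(function):
    """Hypothetical decorator: run each call in a new thread, return a Future."""

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        future = Future()

        def target():
            # Mark the future as running; skip the work if it was cancelled.
            if not future.set_running_or_notify_cancel():
                return
            try:
                future.set_result(function(*args, **kwargs))
            except BaseException as error:
                # Store the error so the caller sees it via result()/exception().
                future.set_exception(error)

        threading.Thread(target=target, daemon=True).start()
        return future

    return wrapper


@in_thread
def add(a, b):
    return a + b


@in_thread
def fail():
    raise ValueError("boom")


print(add(2, 3).result(timeout=5))        # blocks until the thread finishes
print(type(fail().exception(timeout=5)))  # the error is stored in the Future
```

The caller never touches the thread directly; the Future is the only handle to the result or the error, which is the same shape of interface the decorators in this document expose.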

Capabilities

Concurrent Decorators

Synchronous decorators for thread and process-based execution that return concurrent.futures.Future objects. These decorators provide the simplest way to execute functions concurrently with minimal code changes.

@thread(name=None, daemon=True, pool=None)
def decorated_function(): ...

@process(name=None, daemon=True, timeout=None, mp_context=None, pool=None)
def decorated_function(): ...

Asynchronous Decorators

asyncio-compatible decorators for thread and process-based execution that return asyncio.Future objects. They integrate with async/await code and asyncio applications.

@thread(name=None, daemon=True, pool=None)
async def decorated_function(): ...

@process(name=None, daemon=True, timeout=None, mp_context=None, pool=None)
async def decorated_function(): ...
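As a rough illustration of what "returning an asyncio.Future" means for the caller, the sketch below builds a thread-backed awaitable from the standard library alone. It is not Pebble's implementation: `awaitable_in_thread`, `blocking_square`, and the module-level executor are hypothetical, and the sketch decorates a plain function for simplicity.

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

# Hypothetical shared executor that runs the blocking calls.
_executor = ThreadPoolExecutor(max_workers=2)


def awaitable_in_thread(function):
    """Hypothetical decorator: run a blocking function in a thread, return an asyncio.Future."""

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(function, *args, **kwargs)
        # run_in_executor returns an asyncio.Future bound to the running loop.
        return loop.run_in_executor(_executor, call)

    return wrapper


@awaitable_in_thread
def blocking_square(n):
    return n * n


async def main():
    # Both calls run in worker threads while the event loop stays free.
    return await asyncio.gather(blocking_square(3), blocking_square(4))


results = asyncio.run(main())
print(results)
```

Because the returned object is awaitable, it composes with `asyncio.gather`, `asyncio.wait_for`, and similar helpers without any blocking `result()` call on the event loop thread.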

Thread Pools

Managed pools of worker threads for executing multiple tasks concurrently. Thread pools are ideal for I/O-bound tasks and provide fine-grained control over worker lifecycle and task scheduling.

class ThreadPool:
    def __init__(self, max_workers=multiprocessing.cpu_count(), max_tasks=0, initializer=None, initargs=[]): ...
    def schedule(self, function, args=(), kwargs={}): ...
    def submit(self, function, *args, **kwargs): ...
    def map(self, function, *iterables, **kwargs): ...
    def close(self): ...
    def stop(self): ...
    def join(self): ...
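The listing above shows two calling conventions: `schedule` takes the arguments packed as `args` and `kwargs`, while `submit` takes them directly. The sketch below contrasts the two styles using the standard library's `ThreadPoolExecutor` as a stand-in, not `pebble.ThreadPool`; the `schedule` helper and the `power` function are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def schedule(executor, function, args=(), kwargs=None):
    """Hypothetical helper mimicking the schedule(function, args, kwargs) calling style."""
    return executor.submit(function, *args, **(kwargs or {}))


def power(base, exponent=2):
    return base ** exponent


with ThreadPoolExecutor(max_workers=2) as executor:
    # schedule-style: arguments are packed into a sequence and a dict.
    scheduled = schedule(executor, power, args=(3,), kwargs={"exponent": 3})
    # submit-style: arguments are passed through directly.
    submitted = executor.submit(power, 3, exponent=3)
    print(scheduled.result(), submitted.result())
```

The packed style keeps the task's own arguments separate from any options the pool method accepts, which is why a pool can add parameters to `schedule` without clashing with keyword arguments meant for the task.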

Process Pools

Managed pools of worker processes for executing CPU-intensive tasks. Process pools bypass Python's GIL and provide true parallelism with advanced features like timeouts and automatic worker restart.

class ProcessPool:
    def __init__(self, max_workers=multiprocessing.cpu_count(), max_tasks=0, initializer=None, initargs=[], context=multiprocessing): ...
    def schedule(self, function, args=(), kwargs={}, timeout=None): ...
    def submit(self, function, timeout, /, *args, **kwargs): ...
    def map(self, function, *iterables, **kwargs): ...
    def close(self): ...
    def stop(self): ...
    def join(self): ...

Synchronization Utilities

Utility functions and decorators for thread synchronization, signal handling, and waiting operations. These tools help coordinate concurrent execution and handle edge cases.

@synchronized(lock=None)
def decorated_function(): ...

@sighandler(signals)
def signal_handler(): ...

def waitforthreads(threads, timeout=None): ...
def waitforqueues(queues, timeout=None): ...
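To show the idea behind a lock-based synchronization decorator, here is a minimal standard-library sketch. It is a hypothetical stand-in named `synchronized_sketch`, not Pebble's `synchronized`, and its calling convention is an assumption made for the example.

```python
import functools
import threading


def synchronized_sketch(lock=None):
    """Hypothetical stand-in: serialize calls to the decorated function with a lock."""
    lock = lock or threading.Lock()

    def decorator(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            with lock:
                return function(*args, **kwargs)
        return wrapper

    return decorator


counter = {"value": 0}


@synchronized_sketch()
def increment():
    # Read-modify-write is only safe because the lock serializes callers.
    current = counter["value"]
    counter["value"] = current + 1


def worker():
    for _ in range(1000):
        increment()


threads = [threading.Thread(target=worker) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(counter["value"])
```

Passing an explicit lock lets several functions share one critical section, while omitting it gives each decorated function its own lock.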

Future Types and Exceptions

Enhanced Future objects and exception types for handling concurrent execution results, timeouts, and error conditions specific to Pebble's execution model.

class ProcessFuture(concurrent.futures.Future):
    def cancel(self): ...
    def result(self, timeout=None): ...
    def exception(self, timeout=None): ...

class ProcessExpired(OSError):
    def __init__(self, msg, code=0, pid=None): ...

class MapFuture(concurrent.futures.Future): ...
class ProcessMapFuture(concurrent.futures.Future): ...
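Since the future types above derive from `concurrent.futures.Future`, results and errors can be inspected through the standard Future interface. The sketch below uses plain `Future` objects completed by hand, with `OSError` as a stand-in for a worker failure; the `outcome` helper is hypothetical and no Pebble class is involved.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeoutError


def outcome(future, timeout=None):
    """Hypothetical helper: return ('ok', value), ('error', exc) or ('pending', None)."""
    try:
        return ("ok", future.result(timeout=timeout))
    except FutureTimeoutError:
        # The result was not available within `timeout` seconds.
        return ("pending", None)
    except Exception as error:
        # Exceptions raised by the task are re-raised by result().
        return ("error", error)


done = Future()
done.set_result(42)

failed = Future()
failed.set_exception(OSError("worker died"))

pending = Future()  # never completed, so result() times out

print(outcome(done))
print(outcome(failed)[0])
print(outcome(pending, timeout=0.01))
```

The timeout branch is listed first because on recent Python versions the futures timeout error is the built-in `TimeoutError`, which is itself an `OSError`; a handler for the broader class placed earlier would swallow it.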

Types

# Core execution types
import asyncio
import signal
from concurrent.futures import Future
from multiprocessing.context import BaseContext
from threading import Lock, RLock, Semaphore
from typing import Any, Callable, List, Optional, Union

# Function signatures for decorators
CallableType = Callable[..., Any]
ThreadDecoratorReturnType = Callable[..., Future]
ProcessDecoratorReturnType = Callable[..., 'ProcessFuture']
AsyncIODecoratorReturnType = Callable[..., asyncio.Future]

# Parameter types
ThreadDecoratorParams = {
    'name': Optional[str],
    'daemon': bool,
    'pool': Optional['ThreadPool']
}

ProcessDecoratorParams = {
    'name': Optional[str],
    'daemon': bool,
    'timeout': Optional[float],
    'mp_context': Optional[BaseContext],
    'pool': Optional['ProcessPool']
}

# Synchronization types
# Union is used instead of the | operator because threading.Lock and
# threading.RLock are factory functions on some Python versions,
# and functions do not support |.
SynchronizationLock = Union[Lock, RLock, Semaphore]
SignalType = Union[int, List[int]]