CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pebble

Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

Pebble

Threading and multiprocessing eye-candy providing a clean, decorator-based API for concurrent execution. Pebble offers thread and process pools with advanced features like timeouts, worker restart capabilities, and error handling with traceback preservation.

Package Information

  • Package Name: pebble
  • Language: Python
  • Installation: pip install pebble

Core Imports

import pebble

Common imports for decorators:

from pebble.concurrent import thread, process
from pebble.asynchronous import thread, process

Common imports for pools:

from pebble import ThreadPool, ProcessPool

Common imports for utilities:

from pebble import synchronized, waitforthreads, waitforqueues

Basic Usage

from pebble.concurrent import process
from pebble import ProcessPool
import time

# Using decorators for simple concurrent execution
@process
def cpu_intensive_task(n):
    # Simulate CPU-intensive work
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Call returns a Future
future = cpu_intensive_task(1000000)
result = future.result()  # Blocks until complete
print(f"Result: {result}")

# Using pools for managing multiple workers
with ProcessPool() as pool:
    # Schedule multiple tasks
    futures = []
    for i in range(5):
        future = pool.schedule(cpu_intensive_task, args=[100000 * (i + 1)])
        futures.append(future)
    
    # Collect results
    results = [f.result() for f in futures]
    print(f"Results: {results}")

Architecture

Pebble's architecture centers around two execution models:

  • Decorator-based execution: Simple decorators that wrap functions for concurrent execution, returning Future objects
  • Pool-based execution: Managed worker pools that provide fine-grained control over resource allocation and lifecycle

Key components:

  • Concurrent decorators: Synchronous decorators returning concurrent.futures.Future
  • Asynchronous decorators: AsyncIO-compatible decorators returning asyncio.Future
  • Pool classes: Managed pools of workers with scheduling, timeout, and lifecycle management
  • Future types: Enhanced Future objects with process-specific capabilities
  • Utility functions: Helper functions for synchronization and waiting operations

Capabilities

Concurrent Decorators

Synchronous decorators for thread and process-based execution that return concurrent.futures.Future objects. These decorators provide the simplest way to execute functions concurrently with minimal code changes.

@thread(name=None, daemon=True, pool=None)
def decorated_function(): ...

@process(name=None, daemon=True, timeout=None, mp_context=None, pool=None)
def decorated_function(): ...

Concurrent Decorators

Asynchronous Decorators

AsyncIO-compatible decorators for thread and process-based execution that return asyncio.Future objects. Perfect for integration with async/await patterns and AsyncIO applications.

@thread(name=None, daemon=True, pool=None)
async def decorated_function(): ...

@process(name=None, daemon=True, timeout=None, mp_context=None, pool=None)
async def decorated_function(): ...

Asynchronous Decorators

Thread Pools

Managed pools of worker threads for executing multiple tasks concurrently. Thread pools are ideal for I/O-bound tasks and provide fine-grained control over worker lifecycle and task scheduling.

class ThreadPool:
    def __init__(self, max_workers=multiprocessing.cpu_count(), max_tasks=0, initializer=None, initargs=[]): ...
    def schedule(self, function, args=(), kwargs={}): ...
    def submit(self, function, *args, **kwargs): ...
    def map(self, function, *iterables, **kwargs): ...
    def close(self): ...
    def stop(self): ...
    def join(self): ...

Thread Pools

Process Pools

Managed pools of worker processes for executing CPU-intensive tasks. Process pools bypass Python's GIL and provide true parallelism with advanced features like timeouts and automatic worker restart.

class ProcessPool:
    def __init__(self, max_workers=multiprocessing.cpu_count(), max_tasks=0, initializer=None, initargs=[], context=multiprocessing): ...
    def schedule(self, function, args=(), kwargs={}, timeout=None): ...
    def submit(self, function, timeout, /, *args, **kwargs): ...
    def map(self, function, *iterables, **kwargs): ...
    def close(self): ...
    def stop(self): ...
    def join(self): ...

Process Pools

Synchronization Utilities

Utility functions and decorators for thread synchronization, signal handling, and waiting operations. These tools help coordinate concurrent execution and handle edge cases.

@synchronized(lock=None)
def decorated_function(): ...

@sighandler(signals)
def signal_handler(): ...

def waitforthreads(threads, timeout=None): ...
def waitforqueues(queues, timeout=None): ...

Synchronization Utilities

Future Types and Exceptions

Enhanced Future objects and exception types for handling concurrent execution results, timeouts, and error conditions specific to Pebble's execution model.

class ProcessFuture(concurrent.futures.Future):
    def cancel(self): ...
    def result(self, timeout=None): ...
    def exception(self, timeout=None): ...

class ProcessExpired(OSError):
    def __init__(self, msg, code=0, pid=None): ...

class MapFuture(concurrent.futures.Future): ...
class ProcessMapFuture(concurrent.futures.Future): ...

Future Types and Exceptions

Types

# Core execution types
from typing import Callable, Optional, Any, List, Dict
from concurrent.futures import Future
from multiprocessing.context import BaseContext
from threading import Lock, RLock, Semaphore
import signal

# Function signatures for decorators
CallableType = Callable[..., Any]
ThreadDecoratorReturnType = Callable[..., Future]
ProcessDecoratorReturnType = Callable[..., 'ProcessFuture']
AsyncIODecoratorReturnType = Callable[..., 'asyncio.Future']

# Parameter types
ThreadDecoratorParams = {
    'name': Optional[str],
    'daemon': bool,
    'pool': Optional['ThreadPool']
}

ProcessDecoratorParams = {
    'name': Optional[str], 
    'daemon': bool,
    'timeout': Optional[float],
    'mp_context': Optional[BaseContext],
    'pool': Optional['ProcessPool']
}

# Synchronization types
SynchronizationLock = Lock | RLock | Semaphore
SignalType = int | List[int]

Install with Tessl CLI

npx tessl i tessl/pypi-pebble
Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/pebble@5.1.x
Publish Source
CLI
Badge
tessl/pypi-pebble badge