tessl/pypi-pebble

Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.

Asynchronous Decorators

AsyncIO-compatible decorators for thread- and process-based execution that return asyncio.Future objects. They integrate with async/await code, letting AsyncIO applications mix concurrent execution of blocking or CPU-bound functions with asynchronous programming without blocking the event loop.
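The core pattern these decorators provide, running a synchronous function on a worker and exposing the result as an awaitable asyncio.Future, can be sketched with the standard library alone. This is an illustration of the idea, not pebble's actual implementation; `thread_sketch` and `blocking_square` are made-up names:

```python
import asyncio
import concurrent.futures
import functools

_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)

def thread_sketch(func):
    """Minimal thread decorator: submit the call to an executor and wrap
    the resulting concurrent.futures.Future into an asyncio.Future."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        cf_future = _executor.submit(func, *args, **kwargs)
        # Awaitable inside the running event loop
        return asyncio.wrap_future(cf_future)
    return wrapper

@thread_sketch
def blocking_square(x):
    return x * x

async def main():
    return await blocking_square(7)

print(asyncio.run(main()))  # 49
```

pebble's decorators follow the same shape but add worker management, naming, and timeout handling on top.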

Capabilities

AsyncIO Thread Decorator

Executes the decorated function in a separate thread and returns an asyncio.Future object. Ideal for I/O-bound tasks in AsyncIO applications where you need to call synchronous functions without blocking the event loop.

def thread(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    pool: Optional[ThreadPool] = None
) -> Callable[..., asyncio.Future]:
    """
    AsyncIO decorator for thread-based concurrent execution.
    
    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Thread name for identification and debugging
    - daemon: Whether thread runs as daemon (doesn't prevent program exit)
    - pool: Existing ThreadPool instance to use instead of creating new thread
    
    Returns:
    Decorated function that returns asyncio.Future when called
    """

Usage Examples

import asyncio
from pebble.asynchronous import thread
import time
import requests

# Simple usage for blocking I/O
@thread
def fetch_data(url):
    # Synchronous I/O that would block event loop
    response = requests.get(url)
    return response.json()

# Usage with parameters
@thread(name="file-processor", daemon=False)
def process_file(filename):
    with open(filename, 'r') as f:
        return len(f.read())

# Using with existing pool
from pebble import ThreadPool

pool = ThreadPool(max_workers=4)

@thread(pool=pool)
def cpu_task(n):
    return sum(i ** 2 for i in range(n))

# AsyncIO application
async def main():
    # Schedule multiple concurrent operations
    tasks = [
        fetch_data("https://api.example.com/data1"),
        fetch_data("https://api.example.com/data2"),
        process_file("large_file.txt"),
        cpu_task(1000)
    ]
    
    # Wait for all to complete
    results = await asyncio.gather(*tasks)
    print(f"Results: {results}")
    
    # Alternatively, consume results in completion order
    # (here the futures are already done after gather)
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Completed: {result}")

# Run the AsyncIO application
asyncio.run(main())

AsyncIO Process Decorator

Executes the decorated function in a separate process and returns an asyncio.Future object. Suited to CPU-intensive tasks in AsyncIO applications that need true parallelism without blocking the event loop.

def process(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    timeout: Optional[float] = None,
    mp_context: Optional[multiprocessing.context.BaseContext] = None,
    pool: Optional[ProcessPool] = None
) -> Callable[..., asyncio.Future]:
    """
    AsyncIO decorator for process-based concurrent execution.
    
    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Process name for identification and debugging  
    - daemon: Whether process runs as daemon (doesn't prevent program exit)
    - timeout: Maximum execution time in seconds (raises TimeoutError if exceeded)
    - mp_context: Multiprocessing context for process creation
    - pool: Existing ProcessPool instance to use instead of creating new process
    
    Returns:
    Decorated function that returns asyncio.Future when called
    """

Usage Examples

import asyncio
import multiprocessing
from pebble.asynchronous import process

# CPU-intensive task
@process
def heavy_computation(data):
    # Simulate heavy CPU work
    result = 0
    for item in data:
        result += item ** 3
    return result

# With timeout for long-running tasks
@process(timeout=30.0)
def data_analysis(dataset):
    # Simulate data analysis that might take a while
    import time
    time.sleep(5)  # Simulate processing
    return {"mean": sum(dataset) / len(dataset), "size": len(dataset)}

# Using custom multiprocessing context
ctx = multiprocessing.get_context('spawn')

@process(mp_context=ctx, name="isolated-worker")
def isolated_task(config):
    # Task that needs process isolation
    return config["value"] * 2

# AsyncIO application with CPU-intensive tasks
async def data_pipeline():
    # Generate data
    datasets = [
        list(range(1000 * i, 1000 * (i + 1)))
        for i in range(5)
    ]
    
    # Process datasets concurrently
    computation_tasks = [
        heavy_computation(dataset)
        for dataset in datasets
    ]
    
    analysis_tasks = [
        data_analysis(dataset)
        for dataset in datasets
    ]
    
    # Wait for computations
    print("Starting heavy computations...")
    computation_results = await asyncio.gather(*computation_tasks)
    
    # Wait for analysis
    print("Starting data analysis...")
    analysis_results = await asyncio.gather(*analysis_tasks)
    
    print(f"Computation results: {computation_results}")
    print(f"Analysis results: {analysis_results}")
    
    # Mix with other async operations
    isolated_result = await isolated_task({"value": 42})
    print(f"Isolated result: {isolated_result}")

# Run the pipeline
asyncio.run(data_pipeline())

Integration with AsyncIO Patterns

The asynchronous decorators integrate seamlessly with AsyncIO patterns and utilities:

import asyncio
from pebble.asynchronous import thread, process

@thread
def sync_io_operation(url):
    import requests
    return requests.get(url).json()

@process
def cpu_bound_task(n):
    return sum(i * i for i in range(n))

async def advanced_patterns():
    # Using asyncio.wait with timeout
    tasks = [
        sync_io_operation("https://api1.example.com"),
        sync_io_operation("https://api2.example.com"),
        cpu_bound_task(1000000)
    ]
    
    done, pending = await asyncio.wait(
        tasks, 
        timeout=10.0,
        return_when=asyncio.FIRST_COMPLETED
    )
    
    # Cancel pending tasks
    for task in pending:
        task.cancel()
    
    # Process completed tasks
    for task in done:
        try:
            result = await task
            print(f"Completed: {result}")
        except Exception as e:
            print(f"Failed: {e}")

# Error handling with AsyncIO
# Note: process-decorated functions must be defined at module level so the
# worker process can unpickle them; defining them inside another function
# would break pickling.
@process(timeout=2.0)
def might_timeout():
    import time
    time.sleep(5)  # Exceeds the 2-second timeout
    return "Done"

@thread
def might_fail():
    raise ValueError("Something went wrong")

async def error_handling_example():
    import concurrent.futures

    try:
        result1 = await might_timeout()
    except concurrent.futures.TimeoutError:
        # Alias of the builtin TimeoutError since Python 3.11
        print("Process timed out")

    try:
        result2 = await might_fail()
    except ValueError as e:
        print(f"Thread failed: {e}")

asyncio.run(advanced_patterns())
asyncio.run(error_handling_example())
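Exceptions raised in the worker travel through the future and re-raise at the await site. The mechanism can be demonstrated with a stdlib analogue (a sketch of the propagation path, not pebble internals; `failing` is a made-up function):

```python
import asyncio
import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

def failing():
    raise ValueError("boom")

async def main():
    future = asyncio.wrap_future(executor.submit(failing))
    try:
        await future
    except ValueError as e:
        # The worker's exception, message intact, re-raises here
        return f"caught: {e}"

print(asyncio.run(main()))  # caught: boom
```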

AsyncIO Context Management

Using asynchronous decorators with AsyncIO context managers and resource management:

import asyncio
from pebble.asynchronous import thread
from contextlib import asynccontextmanager

@thread
def database_query(query):
    # Simulate database query
    import time
    time.sleep(1)
    return f"Result for: {query}"

@asynccontextmanager
async def database_session():
    print("Opening database session")
    try:
        yield "session"
    finally:
        print("Closing database session")

async def database_operations():
    async with database_session() as session:
        # Execute multiple queries concurrently
        queries = [
            database_query("SELECT * FROM users"),
            database_query("SELECT * FROM orders"),
            database_query("SELECT * FROM products")
        ]
        
        results = await asyncio.gather(*queries)
        return results

# Resource cleanup with asyncio
async def resource_management():
    tasks = []
    
    try:
        # Start multiple background tasks
        for i in range(5):
            task = database_query(f"Query {i}")
            tasks.append(task)
        
        # Wait for all with timeout
        results = await asyncio.wait_for(
            asyncio.gather(*tasks),
            timeout=10.0
        )
        
        return results
        
    except asyncio.TimeoutError:
        print("Operations timed out, cleaning up...")
        # wait_for cancels the pending futures; note that worker threads
        # already running are not interrupted
        return None

asyncio.run(database_operations())
asyncio.run(resource_management())

Install with Tessl CLI

npx tessl i tessl/pypi-pebble
