Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
AsyncIO-compatible decorators for thread and process-based execution that return asyncio.Future objects. Perfect for integration with async/await patterns and AsyncIO applications, allowing seamless mixing of concurrent execution with asynchronous programming.
Executes the decorated function in a separate thread and returns an asyncio.Future object. Ideal for I/O-bound tasks in AsyncIO applications where you need to call synchronous functions without blocking the event loop.
def thread(
func: Callable = None,
*,
name: Optional[str] = None,
daemon: bool = True,
pool: Optional[ThreadPool] = None
) -> Callable[..., asyncio.Future]:
"""
AsyncIO decorator for thread-based concurrent execution.
Parameters:
- func: Function to decorate (when used without parameters)
- name: Thread name for identification and debugging
- daemon: Whether thread runs as daemon (doesn't prevent program exit)
- pool: Existing ThreadPool instance to use instead of creating new thread
Returns:
Decorated function that returns asyncio.Future when called
"""import asyncio
from pebble.asynchronous import thread
import time
import requests
# Simple usage for blocking I/O
@thread
def fetch_data(url):
# Synchronous I/O that would block event loop
response = requests.get(url)
return response.json()
# Usage with parameters
@thread(name="file-processor", daemon=False)
def process_file(filename):
with open(filename, 'r') as f:
return len(f.read())
# Using with existing pool
from pebble import ThreadPool
pool = ThreadPool(max_workers=4)
@thread(pool=pool)
def cpu_task(n):
return sum(i ** 2 for i in range(n))
# AsyncIO application
async def main():
# Schedule multiple concurrent operations
tasks = [
fetch_data("https://api.example.com/data1"),
fetch_data("https://api.example.com/data2"),
process_file("large_file.txt"),
cpu_task(1000)
]
# Wait for all to complete
results = await asyncio.gather(*tasks)
print(f"Results: {results}")
# Or process as they complete
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Completed: {result}")
# Run the AsyncIO application
asyncio.run(main())Executes the decorated function in a separate process and returns an asyncio.Future object. Perfect for CPU-intensive tasks in AsyncIO applications that need true parallelism without blocking the event loop.
def process(
func: Callable = None,
*,
name: Optional[str] = None,
daemon: bool = True,
timeout: Optional[float] = None,
mp_context: Optional[multiprocessing.context.BaseContext] = None,
pool: Optional[ProcessPool] = None
) -> Callable[..., asyncio.Future]:
"""
AsyncIO decorator for process-based concurrent execution.
Parameters:
- func: Function to decorate (when used without parameters)
- name: Process name for identification and debugging
- daemon: Whether process runs as daemon (doesn't prevent program exit)
- timeout: Maximum execution time in seconds (raises TimeoutError if exceeded)
- mp_context: Multiprocessing context for process creation
- pool: Existing ProcessPool instance to use instead of creating new process
Returns:
Decorated function that returns asyncio.Future when called
"""import asyncio
import multiprocessing
from pebble.asynchronous import process
# CPU-intensive task
@process
def heavy_computation(data):
# Simulate heavy CPU work
result = 0
for item in data:
result += item ** 3
return result
# With timeout for long-running tasks
@process(timeout=30.0)
def data_analysis(dataset):
# Simulate data analysis that might take a while
import time
time.sleep(5) # Simulate processing
return {"mean": sum(dataset) / len(dataset), "size": len(dataset)}
# Using custom multiprocessing context
ctx = multiprocessing.get_context('spawn')
@process(mp_context=ctx, name="isolated-worker")
def isolated_task(config):
# Task that needs process isolation
return config["value"] * 2
# AsyncIO application with CPU-intensive tasks
async def data_pipeline():
# Generate data
datasets = [
list(range(1000 * i, 1000 * (i + 1)))
for i in range(5)
]
# Process datasets concurrently
computation_tasks = [
heavy_computation(dataset)
for dataset in datasets
]
analysis_tasks = [
data_analysis(dataset)
for dataset in datasets
]
# Wait for computations
print("Starting heavy computations...")
computation_results = await asyncio.gather(*computation_tasks)
# Wait for analysis
print("Starting data analysis...")
analysis_results = await asyncio.gather(*analysis_tasks)
print(f"Computation results: {computation_results}")
print(f"Analysis results: {analysis_results}")
# Mix with other async operations
isolated_result = await isolated_task({"value": 42})
print(f"Isolated result: {isolated_result}")
# Run the pipeline
asyncio.run(data_pipeline())The asynchronous decorators integrate seamlessly with AsyncIO patterns and utilities:
import asyncio
from pebble.asynchronous import thread, process
@thread
def sync_io_operation(url):
import requests
return requests.get(url).json()
@process
def cpu_bound_task(n):
return sum(i * i for i in range(n))
async def advanced_patterns():
# Using asyncio.wait with timeout
tasks = [
sync_io_operation("https://api1.example.com"),
sync_io_operation("https://api2.example.com"),
cpu_bound_task(1000000)
]
done, pending = await asyncio.wait(
tasks,
timeout=10.0,
return_when=asyncio.FIRST_COMPLETED
)
# Cancel pending tasks
for task in pending:
task.cancel()
# Process completed tasks
for task in done:
try:
result = await task
print(f"Completed: {result}")
except Exception as e:
print(f"Failed: {e}")
# Error handling with AsyncIO
async def error_handling_example():
@process(timeout=2.0)
def might_timeout():
import time
time.sleep(5) # Will timeout
return "Done"
@thread
def might_fail():
raise ValueError("Something went wrong")
try:
result1 = await might_timeout()
except asyncio.TimeoutError:
print("Process timed out")
try:
result2 = await might_fail()
except ValueError as e:
print(f"Thread failed: {e}")
asyncio.run(advanced_patterns())
asyncio.run(error_handling_example())Using asynchronous decorators with AsyncIO context managers and resource management:
import asyncio
from pebble.asynchronous import thread
from contextlib import asynccontextmanager
@thread
def database_query(query):
# Simulate database query
import time
time.sleep(1)
return f"Result for: {query}"
@asynccontextmanager
async def database_session():
print("Opening database session")
try:
yield "session"
finally:
print("Closing database session")
async def database_operations():
async with database_session() as session:
# Execute multiple queries concurrently
queries = [
database_query("SELECT * FROM users"),
database_query("SELECT * FROM orders"),
database_query("SELECT * FROM products")
]
results = await asyncio.gather(*queries)
return results
# Resource cleanup with asyncio
async def resource_management():
tasks = []
try:
# Start multiple background tasks
for i in range(5):
task = database_query(f"Query {i}")
tasks.append(task)
# Wait for all with timeout
results = await asyncio.wait_for(
asyncio.gather(*tasks),
timeout=10.0
)
return results
except asyncio.TimeoutError:
print("Operations timed out, cleaning up...")
# Tasks are automatically cancelled by wait_for
return None
asyncio.run(database_operations())
asyncio.run(resource_management())Install with Tessl CLI
npx tessl i tessl/pypi-pebble