Non-blocking Python methods using decorators
npx @tessl/cli install tessl/pypi-multitasking@0.0.0

A lightweight Python library that enables developers to convert synchronous Python methods into asynchronous, non-blocking methods using simple decorators. It offers thread and process-based execution engines with automatic pool management, semaphore-controlled concurrency limiting, and comprehensive task monitoring capabilities.
pip install multitasking

import multitasking

For specific functionality:
from multitasking import task, wait_for_tasks, set_max_threads, createPool

import multitasking
import time
# Convert any function to asynchronous using the @task decorator
@multitasking.task
def fetch_data(url_id):
# Simulate API call or I/O operation
time.sleep(2)
print(f"Fetched data from URL {url_id}")
return f"Data from {url_id}"
# These calls run concurrently, not sequentially
for i in range(5):
fetch_data(i)
# Wait for all tasks to complete
multitasking.wait_for_tasks()
print("All tasks completed!")

MultiTasking uses a pool-based architecture for managing concurrent task execution:
- config dictionary tracks pools, tasks, and settings
- @task decorator converts functions to async execution

Core functionality for converting synchronous functions to asynchronous tasks using the main @task decorator.
def task(callee: Callable[..., Any]) -> Callable[..., Optional[Union[Thread, Process]]]:
"""
Decorator that converts a function into an asynchronous task.
Returns:
Decorated function that returns Thread/Process object or None
"""

Create and configure execution pools with specific thread counts and engine types for different workloads.
def createPool(name: str = "main", threads: Optional[int] = None, engine: Optional[str] = None) -> None:
"""Create a new execution pool with specified configuration."""
def getPool(name: Optional[str] = None) -> Dict[str, Union[str, int]]:
"""Retrieve information about an execution pool."""

Configure the maximum concurrent threads and execution engine for all new pools and tasks.
def set_max_threads(threads: Optional[int] = None) -> None:
"""Configure the maximum number of concurrent threads/processes."""
def set_engine(kind: str = "") -> None:
"""Configure the execution engine for new pools."""

Monitor active tasks, wait for completion, and control execution lifecycle.
def get_active_tasks() -> List[Union[Thread, Process]]:
"""Retrieve only the currently running tasks."""
def wait_for_tasks(sleep: float = 0) -> bool:
"""Block until all background tasks complete execution."""
def killall(self: Any = None, cls: Any = None) -> None:
"""Emergency shutdown function that terminates the entire program."""

class PoolConfig(TypedDict):
"""Type definition for execution pool configuration."""
pool: Optional[Semaphore] # Controls concurrent task execution
engine: Union[type[Thread], type[Process]] # Execution engine
name: str # Human-readable pool identifier
threads: int # Maximum concurrent tasks (0 = unlimited)
class Config(TypedDict):
"""Type definition for global multitasking configuration."""
CPU_CORES: int # Number of CPU cores detected
ENGINE: str # Default engine type ("thread" or "process")
MAX_THREADS: int # Global maximum thread/process count
KILL_RECEIVED: bool # Signal to stop accepting new tasks
TASKS: List[Union[Thread, Process]] # All created tasks
POOLS: Dict[str, PoolConfig] # Named execution pools
POOL_NAME: str # Currently active pool name
# Global configuration instance
config: Config

__version__: str = "0.0.12"