or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.mdindex.mdmonitoring-control.mdpool-management.mdtask-management.md
tile.json

tessl/pypi-multitasking

Non-blocking Python methods using decorators

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/multitasking@0.0.x

To install, run

npx @tessl/cli install tessl/pypi-multitasking@0.0.0

index.mddocs/

MultiTasking

A lightweight Python library that enables developers to convert synchronous Python methods into asynchronous, non-blocking methods using simple decorators. It offers thread and process-based execution engines with automatic pool management, semaphore-controlled concurrency limiting, and comprehensive task monitoring capabilities.

Package Information

  • Package Name: multitasking
  • Language: Python
  • Installation: pip install multitasking

Core Imports

import multitasking

For specific functionality:

from multitasking import task, wait_for_tasks, set_max_threads, createPool

Basic Usage

import multitasking
import time

# Convert any function to asynchronous using the @task decorator
@multitasking.task
def fetch_data(url_id):
    # Simulate API call or I/O operation
    time.sleep(2)
    print(f"Fetched data from URL {url_id}")
    return f"Data from {url_id}"

# These calls run concurrently, not sequentially
for i in range(5):
    fetch_data(i)

# Wait for all tasks to complete
multitasking.wait_for_tasks()
print("All tasks completed!")

Architecture

MultiTasking uses a pool-based architecture for managing concurrent task execution:

  • Global Configuration: Central config dictionary tracks pools, tasks, and settings
  • Execution Pools: Named pools with semaphore-controlled concurrency limits
  • Task Decorator: @task decorator converts functions to async execution
  • Engine Selection: Choose between threading (I/O-bound) or multiprocessing (CPU-bound)
  • Lifecycle Management: Automatic task tracking and graceful shutdown capabilities

Capabilities

Task Decoration and Execution

Core functionality for converting synchronous functions to asynchronous tasks using the main @task decorator.

def task(callee: Callable[..., Any]) -> Callable[..., Optional[Union[Thread, Process]]]:
    """
    Decorator that converts a function into an asynchronous task.
    
    Returns:
        Decorated function that returns Thread/Process object or None
    """

Task Management

Pool Configuration and Management

Create and configure execution pools with specific thread counts and engine types for different workloads.

def createPool(name: str = "main", threads: Optional[int] = None, engine: Optional[str] = None) -> None:
    """Create a new execution pool with specified configuration."""

def getPool(name: Optional[str] = None) -> Dict[str, Union[str, int]]:
    """Retrieve information about an execution pool."""

Pool Management

Global Configuration

Configure the maximum concurrent threads and execution engine for all new pools and tasks.

def set_max_threads(threads: Optional[int] = None) -> None:
    """Configure the maximum number of concurrent threads/processes."""

def set_engine(kind: str = "") -> None:
    """Configure the execution engine for new pools."""

Configuration

Task Monitoring and Control

Monitor active tasks, wait for completion, and control execution lifecycle.

def get_active_tasks() -> List[Union[Thread, Process]]:
    """Retrieve only the currently running tasks."""

def wait_for_tasks(sleep: float = 0) -> bool:
    """Block until all background tasks complete execution."""

def killall(self: Any = None, cls: Any = None) -> None:
    """Emergency shutdown function that terminates the entire program."""

Monitoring and Control

Types

class PoolConfig(TypedDict):
    """Type definition for execution pool configuration."""
    pool: Optional[Semaphore]  # Controls concurrent task execution
    engine: Union[type[Thread], type[Process]]  # Execution engine
    name: str  # Human-readable pool identifier
    threads: int  # Maximum concurrent tasks (0 = unlimited)

class Config(TypedDict):
    """Type definition for global multitasking configuration."""
    CPU_CORES: int  # Number of CPU cores detected
    ENGINE: str  # Default engine type ("thread" or "process")
    MAX_THREADS: int  # Global maximum thread/process count
    KILL_RECEIVED: bool  # Signal to stop accepting new tasks
    TASKS: List[Union[Thread, Process]]  # All created tasks
    POOLS: Dict[str, PoolConfig]  # Named execution pools
    POOL_NAME: str  # Currently active pool name

# Global configuration instance
config: Config

Constants

__version__: str = "0.0.12"