tessl/pypi-multitasking

Non-blocking Python methods using decorators

Monitoring and Control

Functions for monitoring task execution, waiting for completion, and controlling the execution lifecycle. These provide essential synchronization and shutdown capabilities.

Capabilities

Task Synchronization

Wait for all background tasks to complete before continuing program execution.

def wait_for_tasks(sleep: float = 0) -> bool:
    """
    Block until all background tasks complete execution.
    
    This is the primary synchronization mechanism. It prevents new
    tasks from being created and waits for existing ones to finish.
    Essential for ensuring all work is done before program exit.
    
    Args:
        sleep: Seconds to sleep between checks. 0 means busy-wait.
               Higher values reduce CPU usage but increase latency.
    
    Returns:
        Always returns True when all tasks are complete
    
    Note:
        Sets KILL_RECEIVED=True during execution to prevent new tasks,
        then resets it to False when done.
    """

Usage Example:

import multitasking
import time

@multitasking.task
def background_work(task_id, duration):
    print(f"Task {task_id} starting...")
    time.sleep(duration)
    print(f"Task {task_id} completed!")

# Start multiple tasks
for i in range(5):
    background_work(i, i + 1)

print("All tasks started, waiting for completion...")

# Wait for all tasks (with CPU-friendly polling)
multitasking.wait_for_tasks(sleep=0.1)

print("All tasks completed, continuing program...")
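
The trade-off behind the sleep argument can be illustrated without the library. The sketch below is a stand-in built on plain threading.Thread objects, not multitasking's own implementation; the helper name wait_for_threads is hypothetical. It polls until every thread has finished and pauses between checks only when sleep is positive.

```python
import threading
import time


def wait_for_threads(threads, sleep=0.0):
    """Poll until no thread in `threads` is alive, then return True.

    Hypothetical stand-in for illustration; not multitasking's code.
    """
    while any(t.is_alive() for t in threads):
        if sleep > 0:
            # Yield the CPU between checks. Completion is noticed
            # up to `sleep` seconds late.
            time.sleep(sleep)
    return True


# Three short-lived workers standing in for background tasks.
workers = [threading.Thread(target=time.sleep, args=(0.05,)) for _ in range(3)]
for w in workers:
    w.start()

done = wait_for_threads(workers, sleep=0.01)
print("all finished:", done)
```

With sleep=0 the loop spins continuously and reacts as soon as the last thread ends; a small positive value trades a little latency for far less CPU use.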

Active Task Monitoring

Monitor the number and status of currently running tasks.

def get_active_tasks() -> List[Union[Thread, Process]]:
    """
    Retrieve only the currently running tasks.
    
    Filters the complete task list to show only tasks that are still
    executing. This is more useful than get_list_of_tasks() for
    monitoring current system load.
    
    Returns:
        List of Thread/Process objects that are still running
    """

Progress Monitoring Example:

import multitasking
import time

@multitasking.task
def data_processing(batch_id, size):
    print(f"Processing batch {batch_id} ({size} items)...")
    time.sleep(size * 0.5)  # Simulate variable processing time
    print(f"Batch {batch_id} complete")

# Start tasks with different processing times
for i in range(10):
    data_processing(i, i + 1)

# Monitor progress with detailed reporting
start_time = time.time()
while multitasking.get_active_tasks():
    active = multitasking.get_active_tasks()
    total = multitasking.get_list_of_tasks()
    completed = len(total) - len(active)
    
    progress = (completed / len(total)) * 100
    elapsed = time.time() - start_time
    
    print(f"Progress: {completed}/{len(total)} ({progress:.1f}%) - "
          f"Active: {len(active)} - Elapsed: {elapsed:.1f}s")
    
    time.sleep(1)

print("All processing complete!")
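
The percentage arithmetic in the loop above divides by the size of the task list, which is safe there only because the loop runs while tasks exist. A minimal sketch of a guarded version follows; progress_snapshot is a hypothetical helper, not part of the library, and it works on plain counts so it can be checked in isolation.

```python
def progress_snapshot(total_count, active_count):
    """Return (completed, percent) for the given task counts.

    Hypothetical helper: returns 0.0 percent when no tasks exist
    instead of raising ZeroDivisionError.
    """
    completed = max(total_count - active_count, 0)
    percent = (100 * completed / total_count) if total_count else 0.0
    return completed, percent


print(progress_snapshot(10, 4))  # 6 of 10 tasks done
print(progress_snapshot(0, 0))   # nothing scheduled yet
```

In a monitoring loop the two arguments would presumably be len(multitasking.get_list_of_tasks()) and len(multitasking.get_active_tasks()).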

Emergency Shutdown

Immediately terminate the entire program. Reserve this for emergencies in which a clean shutdown is not possible.

def killall(self: Any = None, cls: Any = None) -> None:
    """
    Emergency shutdown function that terminates the entire program.
    
    This is a last-resort function that immediately exits the program,
    potentially leaving tasks in an inconsistent state. It tries
    sys.exit() first, then os._exit() as a final measure.
    
    Args:
        self: Unused parameter kept for backward compatibility
        cls: Unused parameter kept for backward compatibility
    
    Warning:
        This function does NOT wait for tasks to complete cleanly.
        Use wait_for_tasks() for graceful shutdown instead.
    """

Signal Handling Example:

import multitasking
import signal
import sys
import time

# Register only ONE of the handlers below: a later signal.signal() call
# for the same signal replaces the earlier handler.

# Option 1: Emergency shutdown (immediate termination)
# signal.signal(signal.SIGINT, multitasking.killall)

# Option 2: Graceful shutdown (recommended)
def graceful_shutdown(signum, frame):
    print("\nShutting down gracefully...")
    multitasking.wait_for_tasks()
    print("All tasks completed. Exiting.")
    sys.exit(0)

signal.signal(signal.SIGINT, graceful_shutdown)

@multitasking.task
def long_running_task(task_id):
    for i in range(10):
        print(f"Task {task_id}: step {i + 1}/10")
        time.sleep(1)

# Start some long-running tasks
for i in range(3):
    long_running_task(i)

print("Tasks started. Press Ctrl+C to test shutdown...")
multitasking.wait_for_tasks()
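
The two options can also be combined: wait for a bounded time, then fall back to a hard exit. As documented above, wait_for_tasks() takes no timeout, so the sketch below polls a caller-supplied function instead. The helper drain_with_deadline and its parameters are hypothetical, and the demo uses plain threads in place of library tasks.

```python
import threading
import time


def drain_with_deadline(get_active, timeout, poll=0.05):
    """Wait until get_active() returns an empty list or `timeout` elapses.

    Returns True if everything finished in time, False otherwise.
    Hypothetical helper; `get_active` is any zero-argument callable
    that returns the tasks still running.
    """
    deadline = time.monotonic() + timeout
    while get_active():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll)
    return True


# Demo: a plain thread stands in for a background task.
quick = threading.Thread(target=time.sleep, args=(0.05,))
quick.start()
finished = drain_with_deadline(
    lambda: [t for t in [quick] if t.is_alive()],
    timeout=2.0,
    poll=0.01,
)
print("drained in time:", finished)
```

In a signal handler, one might pass multitasking.get_active_tasks as get_active and call multitasking.killall() when the result is False. Unlike wait_for_tasks(), this sketch does nothing to stop new tasks from starting while it waits.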

Advanced Monitoring Patterns

Task Lifecycle Tracking

Monitor task creation, execution, and completion:

import multitasking
import time
from threading import Lock

# Thread-safe counters
stats_lock = Lock()
task_stats = {"created": 0, "completed": 0, "failed": 0}

def update_stats(key):
    with stats_lock:
        task_stats[key] += 1

@multitasking.task
def monitored_task(task_id):
    # Counted when the task body starts running, not when it is scheduled
    update_stats("created")
    try:
        # Simulate work
        time.sleep(1)
        print(f"Task {task_id} succeeded")
        update_stats("completed")
    except Exception as e:
        print(f"Task {task_id} failed: {e}")
        update_stats("failed")

# Start tasks and monitor
for i in range(20):
    monitored_task(i)

# Real-time monitoring
while multitasking.get_active_tasks():
    with stats_lock:
        print(f"Created: {task_stats['created']}, "
              f"Completed: {task_stats['completed']}, "
              f"Failed: {task_stats['failed']}, "
              f"Active: {len(multitasking.get_active_tasks())}")
    time.sleep(0.5)

print(f"Final stats: {task_stats}")
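
The counting logic above can be pulled out of the task body into a reusable decorator. This is a minimal sketch under stated assumptions: tracked is a hypothetical helper, not part of the library, and the demo calls the wrapped function synchronously so the counts are deterministic.

```python
import functools
from threading import Lock

stats_lock = Lock()
task_stats = {"started": 0, "completed": 0, "failed": 0}


def tracked(func):
    """Hypothetical helper: count starts, successes and failures of func."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with stats_lock:
            task_stats["started"] += 1
        try:
            result = func(*args, **kwargs)
        except Exception:
            with stats_lock:
                task_stats["failed"] += 1
            raise  # let the caller (or thread) still see the error
        with stats_lock:
            task_stats["completed"] += 1
        return result
    return wrapper


@tracked
def parse_number(text):
    return int(text)


parse_number("42")
try:
    parse_number("not a number")
except ValueError:
    pass

print(task_stats)  # {'started': 2, 'completed': 1, 'failed': 1}
```

To run such a function in the background, the assumption is that @multitasking.task would be stacked above @tracked, so the counters are updated inside the worker.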

Resource Usage Monitoring

Track system resource usage during task execution:

import multitasking
import psutil
import time

def monitor_resources():
    """Monitor CPU and memory usage during task execution."""
    print(f"CPU: {psutil.cpu_percent():.1f}%, "
          f"Memory: {psutil.virtual_memory().percent:.1f}%, "
          f"Active tasks: {len(multitasking.get_active_tasks())}")

@multitasking.task
def cpu_intensive_task(task_id):
    # Simulate CPU-intensive work
    total = 0
    for i in range(1000000):
        total += i * i
    print(f"Task {task_id} result: {total}")

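# Start CPU-intensive tasks
for i in range(5):
    cpu_intensive_task(i)

# Run the monitor as a task too. It appears in get_active_tasks()
# itself, so it loops only while some other task is still active;
# testing the list for truthiness would never let it exit.
@multitasking.task
def resource_monitor():
    while len(multitasking.get_active_tasks()) > 1:
        monitor_resources()
        time.sleep(2)

resource_monitor()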

multitasking.wait_for_tasks()
print("All tasks and monitoring complete!")
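
An alternative is to keep the monitor out of the task list altogether and stop it with an explicit signal. The sketch below uses a plain daemon thread and a threading.Event; run_monitor and its sample callback are hypothetical names, and a short time.sleep() stands in for the real work so the example runs without the library or psutil.

```python
import threading
import time


def run_monitor(stop_event, sample, interval=2.0):
    """Call sample() every `interval` seconds until stop_event is set."""
    while not stop_event.is_set():
        sample()
        # Event.wait() doubles as a sleep that ends early once
        # the event is set.
        stop_event.wait(interval)


samples = []
stop = threading.Event()
monitor = threading.Thread(
    target=run_monitor,
    args=(stop, lambda: samples.append(time.monotonic()), 0.01),
    daemon=True,
)
monitor.start()

time.sleep(0.05)  # stand-in for multitasking.wait_for_tasks()
stop.set()
monitor.join(timeout=1.0)
print(f"collected {len(samples)} samples")
```

Because the monitor is not a library task in this sketch, waiting for tasks does not wait for the monitor, and the monitor stops promptly once the event is set.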
