Non-blocking Python methods using decorators
—
Functions for monitoring task execution, waiting for completion, and controlling the execution lifecycle. These provide essential synchronization and shutdown capabilities.
Wait for all background tasks to complete before continuing program execution.
def wait_for_tasks(sleep: float = 0) -> bool:
"""
Block until all background tasks complete execution.
This is the primary synchronization mechanism. It prevents new
tasks from being created and waits for existing ones to finish.
Essential for ensuring all work is done before program exit.
Args:
sleep: Seconds to sleep between checks. 0 means busy-wait.
Higher values reduce CPU usage but increase latency.
Returns:
Always returns True when all tasks are complete
Note:
Sets KILL_RECEIVED=True during execution to prevent new tasks,
then resets it to False when done.
"""

Usage Example:
import multitasking
import time
@multitasking.task
def background_work(task_id, duration):
print(f"Task {task_id} starting...")
time.sleep(duration)
print(f"Task {task_id} completed!")
# Start multiple tasks
for i in range(5):
background_work(i, i + 1)
print("All tasks started, waiting for completion...")
# Wait for all tasks (with CPU-friendly polling)
multitasking.wait_for_tasks(sleep=0.1)
print("All tasks completed, continuing program...")

Monitor the number and status of currently running tasks.
def get_active_tasks() -> List[Union[Thread, Process]]:
"""
Retrieve only the currently running tasks.
Filters the complete task list to show only tasks that are still
executing. This is more useful than get_list_of_tasks() for
monitoring current system load.
Returns:
List of Thread/Process objects that are still running
"""

Progress Monitoring Example:
import multitasking
import time
@multitasking.task
def data_processing(batch_id, size):
print(f"Processing batch {batch_id} ({size} items)...")
time.sleep(size * 0.5) # Simulate variable processing time
print(f"Batch {batch_id} complete")
# Start tasks with different processing times
for i in range(10):
data_processing(i, i + 1)
# Monitor progress with detailed reporting
start_time = time.time()
while multitasking.get_active_tasks():
active = multitasking.get_active_tasks()
total = multitasking.get_list_of_tasks()
completed = len(total) - len(active)
progress = (completed / len(total)) * 100
elapsed = time.time() - start_time
print(f"Progress: {completed}/{len(total)} ({progress:.1f}%) - "
f"Active: {len(active)} - Elapsed: {elapsed:.1f}s")
time.sleep(1)
print("All processing complete!")

Immediately terminate the program, typically used for emergency situations.
def killall(self: Any = None, cls: Any = None) -> None:
"""
Emergency shutdown function that terminates the entire program.
This is a last-resort function that immediately exits the program,
potentially leaving tasks in an inconsistent state. It tries
sys.exit() first, then os._exit() as a final measure.
Args:
self: Unused parameter kept for backward compatibility
cls: Unused parameter kept for backward compatibility
Warning:
This function does NOT wait for tasks to complete cleanly.
Use wait_for_tasks() for graceful shutdown instead.
"""

Signal Handling Example:
import multitasking
import signal
import time
# Option 1: Emergency shutdown (immediate termination)
signal.signal(signal.SIGINT, multitasking.killall)
# Option 2: Graceful shutdown (recommended)
def graceful_shutdown(signum, frame):
print("\nShutting down gracefully...")
multitasking.wait_for_tasks()
print("All tasks completed. Exiting.")
exit(0)
signal.signal(signal.SIGINT, graceful_shutdown)
@multitasking.task
def long_running_task(task_id):
for i in range(10):
print(f"Task {task_id}: step {i}/10")
time.sleep(1)
# Start some long-running tasks
for i in range(3):
long_running_task(i)
print("Tasks started. Press Ctrl+C to test shutdown...")
multitasking.wait_for_tasks()

Monitor task creation, execution, and completion:
import multitasking
import time
from threading import Lock
# Thread-safe counters
stats_lock = Lock()
task_stats = {"created": 0, "completed": 0, "failed": 0}
def update_stats(key):
with stats_lock:
task_stats[key] += 1
@multitasking.task
def monitored_task(task_id):
update_stats("created")
try:
# Simulate work
time.sleep(1)
print(f"Task {task_id} succeeded")
update_stats("completed")
except Exception as e:
print(f"Task {task_id} failed: {e}")
update_stats("failed")
# Start tasks and monitor
for i in range(20):
monitored_task(i)
# Real-time monitoring
while multitasking.get_active_tasks():
with stats_lock:
print(f"Created: {task_stats['created']}, "
f"Completed: {task_stats['completed']}, "
f"Failed: {task_stats['failed']}, "
f"Active: {len(multitasking.get_active_tasks())}")
time.sleep(0.5)
print(f"Final stats: {task_stats}")

Track system resource usage during task execution:
import multitasking
import psutil
import time
def monitor_resources():
"""Monitor CPU and memory usage during task execution."""
print(f"CPU: {psutil.cpu_percent():.1f}%, "
f"Memory: {psutil.virtual_memory().percent:.1f}%, "
f"Active tasks: {len(multitasking.get_active_tasks())}")
@multitasking.task
def cpu_intensive_task(task_id):
# Simulate CPU-intensive work
total = 0
for i in range(1000000):
total += i * i
print(f"Task {task_id} result: {total}")
# Start monitoring
@multitasking.task
def resource_monitor():
while multitasking.get_active_tasks():
monitor_resources()
time.sleep(2)
# Start the monitor
resource_monitor()
# Start CPU-intensive tasks
for i in range(5):
cpu_intensive_task(i)
multitasking.wait_for_tasks()
print("All tasks and monitoring complete!")

Install with Tessl CLI
npx tessl i tessl/pypi-multitasking