Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Utility functions and decorators for thread synchronization, signal handling, and waiting operations. These tools help coordinate concurrent execution, handle edge cases, and provide fine-grained control over synchronization in multi-threaded and multi-process environments.
A decorator that prevents concurrent execution of decorated functions using locks, preventing race conditions and ensuring thread-safe access to shared resources.
def synchronized(lock=None):
"""
Decorator that synchronizes function execution using a lock.
Parameters:
- lock: Optional Lock, RLock, or Semaphore object for synchronization.
If None, uses a shared Lock for all @synchronized functions.
Returns:
Decorated function that executes atomically
"""from pebble import synchronized
import threading
import time
# Shared resource
counter = 0
shared_data = []
# Using default shared lock
@synchronized
def increment_counter():
global counter
current = counter
time.sleep(0.001) # Simulate some work
counter = current + 1
@synchronized
def append_data(value):
shared_data.append(value)
time.sleep(0.001)
# Using custom lock
custom_lock = threading.RLock()
@synchronized(custom_lock)
def complex_operation(value):
shared_data.append(f"start-{value}")
time.sleep(0.01)
shared_data.append(f"end-{value}")
# Test synchronization
def test_synchronization():
threads = []
# Test counter increment
for i in range(100):
thread = threading.Thread(target=increment_counter)
threads.append(thread)
# Test data appending
for i in range(50):
thread = threading.Thread(target=append_data, args=(i,))
threads.append(thread)
# Test complex operation
for i in range(10):
thread = threading.Thread(target=complex_operation, args=(i,))
threads.append(thread)
# Start all threads
for thread in threads:
thread.start()
# Wait for completion
for thread in threads:
thread.join()
print(f"Final counter value: {counter}")
print(f"Shared data length: {len(shared_data)}")
print(f"Complex operations: {[item for item in shared_data if 'start' in item]}")
test_synchronization()from pebble import synchronized
import threading
import time
import queue
# Different types of locks for different use cases
read_write_lock = threading.RLock() # Allows recursive locking
resource_semaphore = threading.Semaphore(3) # Limit concurrent access
condition_lock = threading.Condition()
class ThreadSafeCounter:
def __init__(self):
self._value = 0
self._lock = threading.Lock()
@synchronized # Uses shared lock
def get_global_count(self):
# This method shares a lock with all other @synchronized methods
return self._value
@synchronized(lambda self: self._lock) # Uses instance lock
def increment(self):
self._value += 1
@synchronized(lambda self: self._lock)
def decrement(self):
self._value -= 1
@synchronized(lambda self: self._lock)
def get_value(self):
return self._value
# Resource pool with semaphore
@synchronized(resource_semaphore)
def use_limited_resource(resource_id, duration):
print(f"Using resource {resource_id}")
time.sleep(duration)
print(f"Released resource {resource_id}")
return f"Result from resource {resource_id}"
# Producer-consumer with condition
shared_queue = queue.Queue()
@synchronized(condition_lock)
def producer(items):
for item in items:
shared_queue.put(item)
print(f"Produced: {item}")
condition_lock.notify_all() # Wake up consumers
time.sleep(0.1)
@synchronized(condition_lock)
def consumer(consumer_id, timeout=5):
while True:
try:
item = shared_queue.get(timeout=timeout)
print(f"Consumer {consumer_id} consumed: {item}")
shared_queue.task_done()
except queue.Empty:
print(f"Consumer {consumer_id} timed out")
break
# Test advanced patterns
def test_advanced_patterns():
# Test thread-safe counter
counter = ThreadSafeCounter()
def worker():
for _ in range(100):
counter.increment()
if counter.get_value() > 50:
counter.decrement()
threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Final counter value: {counter.get_value()}")
# Test resource pool
resource_threads = []
for i in range(10):
thread = threading.Thread(
target=use_limited_resource,
args=(i, 1.0)
)
resource_threads.append(thread)
for t in resource_threads:
t.start()
for t in resource_threads:
t.join()
# Test producer-consumer
producer_thread = threading.Thread(
target=producer,
args=(list(range(20)),)
)
consumer_threads = [
threading.Thread(target=consumer, args=(i,))
for i in range(3)
]
producer_thread.start()
for t in consumer_threads:
t.start()
producer_thread.join()
for t in consumer_threads:
t.join()
test_advanced_patterns()A decorator for setting up signal handlers to manage process lifecycle and handle system signals gracefully.
def sighandler(signals):
"""
Decorator that sets the decorated function as a signal handler.
Parameters:
- signals: Single signal or list/tuple of signals to handle
Returns:
Decorated function that will be called when specified signals are received
"""from pebble import sighandler
import signal
import time
import sys
import os
# Global state for signal handling
shutdown_requested = False
received_signals = []
# Handle single signal
@sighandler(signal.SIGINT)
def handle_interrupt(signum, frame):
global shutdown_requested
print(f"\nReceived SIGINT (Ctrl+C). Initiating graceful shutdown...")
shutdown_requested = True
# Handle multiple signals
@sighandler([signal.SIGTERM, signal.SIGUSR1])
def handle_multiple_signals(signum, frame):
global received_signals
signal_names = {
signal.SIGTERM: "SIGTERM",
signal.SIGUSR1: "SIGUSR1"
}
signal_name = signal_names.get(signum, f"Signal {signum}")
print(f"Received {signal_name}")
received_signals.append((signum, time.time()))
if signum == signal.SIGTERM:
print("Termination requested")
sys.exit(0)
elif signum == signal.SIGUSR1:
print("User signal 1 - logging status")
print(f"Received signals so far: {len(received_signals)}")
# Complex signal handler with state management
class SignalAwareWorker:
def __init__(self):
self.running = True
self.tasks_completed = 0
self.setup_signal_handlers()
@sighandler(signal.SIGINT)
def handle_shutdown(self, signum, frame):
print(f"\nShutdown signal received. Completed {self.tasks_completed} tasks.")
self.running = False
@sighandler(signal.SIGUSR2)
def handle_status_request(self, signum, frame):
print(f"Status: Running={self.running}, Tasks completed={self.tasks_completed}")
def setup_signal_handlers(self):
# Signal handlers are already set up by decorators
pass
def work(self):
print("Worker started. Send SIGINT to stop, SIGUSR2 for status.")
while self.running:
# Simulate work
time.sleep(1)
self.tasks_completed += 1
if self.tasks_completed % 5 == 0:
print(f"Completed {self.tasks_completed} tasks...")
print("Worker shutting down gracefully.")
# Signal handling in multiprocessing context
def worker_process():
"""Worker process with signal handling"""
@sighandler(signal.SIGTERM)
def worker_shutdown(signum, frame):
print(f"Worker process {os.getpid()} received SIGTERM")
sys.exit(0)
@sighandler(signal.SIGUSR1)
def worker_status(signum, frame):
print(f"Worker process {os.getpid()} is alive")
print(f"Worker process {os.getpid()} started")
# Simulate work
for i in range(100):
time.sleep(0.5)
if i % 10 == 0:
print(f"Worker {os.getpid()} progress: {i}/100")
# Example usage
def signal_handling_demo():
print("Signal handling demo. Process PID:", os.getpid())
print("Try: kill -SIGUSR1", os.getpid())
print("Or: kill -SIGTERM", os.getpid())
worker = SignalAwareWorker()
try:
worker.work()
except KeyboardInterrupt:
print("Caught KeyboardInterrupt in main")
# Uncomment to run demo (be careful in production environments)
# signal_handling_demo()Functions for waiting on multiple threads to complete, providing fine-grained control over thread synchronization.
def waitforthreads(threads, timeout=None):
"""
Wait for one or more threads to complete.
Parameters:
- threads: List of threading.Thread objects to wait for
- timeout: Maximum time to wait in seconds (None for no timeout)
Returns:
Filter object containing threads that have completed
"""from pebble import waitforthreads
import threading
import time
import random
def worker_task(task_id, duration):
print(f"Task {task_id} starting (duration: {duration}s)")
time.sleep(duration)
print(f"Task {task_id} completed")
return f"Result {task_id}"
def test_thread_waiting():
# Create multiple threads with different durations
threads = []
for i in range(5):
duration = random.uniform(1, 5)
thread = threading.Thread(
target=worker_task,
args=(i, duration)
)
threads.append(thread)
# Start all threads
for thread in threads:
thread.start()
print("All threads started. Waiting for completion...")
# Wait for threads with timeout
completed = waitforthreads(threads, timeout=3.0)
completed_list = list(completed)
print(f"After 3 seconds, {len(completed_list)} threads completed")
# Wait for remaining threads
remaining = [t for t in threads if t.is_alive()]
if remaining:
print(f"Waiting for {len(remaining)} remaining threads...")
final_completed = waitforthreads(remaining, timeout=10.0)
final_completed_list = list(final_completed)
print(f"Finally, {len(final_completed_list)} more threads completed")
# Clean up any remaining threads
for thread in threads:
if thread.is_alive():
print(f"Thread {thread.name} is still running")
thread.join(timeout=1.0)
# Advanced thread waiting patterns
def advanced_thread_waiting():
# Producer threads
producer_threads = []
shared_queue = []
def producer(producer_id, item_count):
for i in range(item_count):
item = f"item-{producer_id}-{i}"
shared_queue.append(item)
time.sleep(0.1)
print(f"Producer {producer_id} finished")
# Consumer threads
consumer_threads = []
def consumer(consumer_id):
while True:
if shared_queue:
item = shared_queue.pop(0)
print(f"Consumer {consumer_id} processed {item}")
time.sleep(0.05)
else:
time.sleep(0.01)
# Start producers
for i in range(3):
thread = threading.Thread(target=producer, args=(i, 10))
producer_threads.append(thread)
thread.start()
# Start consumers
for i in range(2):
thread = threading.Thread(target=consumer, args=(i))
thread.daemon = True # Will exit when main program exits
consumer_threads.append(thread)
thread.start()
# Wait for producers to finish
print("Waiting for producers to finish...")
completed_producers = waitforthreads(producer_threads, timeout=20.0)
completed_count = len(list(completed_producers))
print(f"{completed_count} producers completed")
# Give consumers time to process remaining items
time.sleep(2)
print(f"Final queue size: {len(shared_queue)}")
test_thread_waiting()
advanced_thread_waiting()Functions for waiting on multiple queues to have data available, enabling efficient queue-based coordination.
def waitforqueues(queues, timeout=None):
"""
Wait for one or more queues to have data available.
Parameters:
- queues: List of queue.Queue objects to monitor
- timeout: Maximum time to wait in seconds (None for no timeout)
Returns:
Filter object containing queues that have data available
"""from pebble import waitforqueues
import queue
import threading
import time
import random
def test_queue_waiting():
# Create multiple queues
queues = [queue.Queue() for _ in range(4)]
queue_names = [f"Queue-{i}" for i in range(4)]
def producer(q, name, delay):
time.sleep(delay)
q.put(f"Data from {name}")
print(f"{name} produced data after {delay}s")
# Start producers with different delays
producers = []
for i, (q, name) in enumerate(zip(queues, queue_names)):
delay = random.uniform(1, 5)
producer_thread = threading.Thread(
target=producer,
args=(q, name, delay)
)
producers.append(producer_thread)
producer_thread.start()
print("Waiting for queues to have data...")
# Wait for queues to have data
ready_queues = waitforqueues(queues, timeout=3.0)
ready_list = list(ready_queues)
print(f"After 3 seconds, {len(ready_list)} queues have data")
# Process ready queues
for q in ready_list:
try:
data = q.get_nowait()
print(f"Got data: {data}")
except queue.Empty:
print("Queue became empty")
# Wait for remaining queues
remaining = [q for q in queues if q.empty()]
if remaining:
print(f"Waiting for {len(remaining)} more queues...")
final_ready = waitforqueues(remaining, timeout=10.0)
final_ready_list = list(final_ready)
for q in final_ready_list:
try:
data = q.get_nowait()
print(f"Got final data: {data}")
except queue.Empty:
print("Final queue became empty")
# Clean up
for thread in producers:
thread.join()
# Advanced queue coordination
def queue_coordination_example():
# Different types of queues
priority_queue = queue.PriorityQueue()
lifo_queue = queue.LifoQueue() # Stack
fifo_queue = queue.Queue() # Regular queue
queues = [priority_queue, lifo_queue, fifo_queue]
queue_types = ["Priority", "LIFO", "FIFO"]
def priority_producer():
for priority in [3, 1, 2]: # Lower numbers = higher priority
time.sleep(random.uniform(0.5, 1.5))
priority_queue.put((priority, f"Priority-{priority} item"))
print(f"Added priority {priority} item")
def lifo_producer():
for i in range(3):
time.sleep(random.uniform(0.5, 1.5))
lifo_queue.put(f"LIFO-item-{i}")
print(f"Added LIFO item {i}")
def fifo_producer():
for i in range(3):
time.sleep(random.uniform(0.5, 1.5))
fifo_queue.put(f"FIFO-item-{i}")
print(f"Added FIFO item {i}")
# Start producers
producers = [
threading.Thread(target=priority_producer),
threading.Thread(target=lifo_producer),
threading.Thread(target=fifo_producer)
]
for producer in producers:
producer.start()
# Monitor queues as they get data
processed_items = 0
target_items = 9 # 3 items per queue
while processed_items < target_items:
print("Checking for queue updates...")
ready_queues = waitforqueues(queues, timeout=2.0)
ready_list = list(ready_queues)
if ready_list:
print(f"Found {len(ready_list)} queues with data")
for i, q in enumerate(queues):
if q in ready_list:
queue_type = queue_types[i]
try:
if isinstance(q, queue.PriorityQueue):
priority, item = q.get_nowait()
print(f" {queue_type}: {item} (priority {priority})")
else:
item = q.get_nowait()
print(f" {queue_type}: {item}")
processed_items += 1
except queue.Empty:
pass
else:
print("No queues ready, continuing to wait...")
# Wait for producers to finish
for producer in producers:
producer.join()
print(f"Processed all {processed_items} items")
test_queue_waiting()
queue_coordination_example()Combining synchronization utilities for complex coordination patterns:
from pebble import synchronized, waitforthreads, waitforqueues
import threading
import queue
import time
class CoordinatedWorkerPool:
def __init__(self, worker_count=3):
self.worker_count = worker_count
self.task_queue = queue.Queue()
self.result_queues = [queue.Queue() for _ in range(worker_count)]
self.workers = []
self.active = False
self._lock = threading.Lock()
@synchronized
def start(self):
if self.active:
return
self.active = True
for i in range(self.worker_count):
worker = threading.Thread(
target=self._worker_loop,
args=(i, self.result_queues[i])
)
worker.daemon = True
self.workers.append(worker)
worker.start()
@synchronized
def stop(self):
self.active = False
# Add stop signals to task queue
for _ in range(self.worker_count):
self.task_queue.put(None)
def submit_task(self, task_func, *args, **kwargs):
if not self.active:
raise RuntimeError("Pool not started")
task = (task_func, args, kwargs)
self.task_queue.put(task)
def _worker_loop(self, worker_id, result_queue):
print(f"Worker {worker_id} started")
while self.active:
try:
task = self.task_queue.get(timeout=1.0)
if task is None: # Stop signal
break
task_func, args, kwargs = task
try:
result = task_func(*args, **kwargs)
result_queue.put(('success', result))
except Exception as e:
result_queue.put(('error', str(e)))
self.task_queue.task_done()
except queue.Empty:
continue
print(f"Worker {worker_id} stopped")
def get_results(self, timeout=None):
# Wait for result queues to have data
ready_queues = waitforqueues(self.result_queues, timeout=timeout)
ready_list = list(ready_queues)
results = []
for q in ready_list:
try:
while True:
result_type, result_data = q.get_nowait()
results.append((result_type, result_data))
except queue.Empty:
pass
return results
def wait_for_completion(self, timeout=None):
# Wait for all workers to finish
completed_workers = waitforthreads(self.workers, timeout=timeout)
return list(completed_workers)
# Test coordinated worker pool
def test_coordinated_pool():
def sample_task(task_id, duration):
time.sleep(duration)
return f"Task {task_id} completed in {duration}s"
pool = CoordinatedWorkerPool(worker_count=3)
pool.start()
try:
# Submit tasks
for i in range(10):
pool.submit_task(sample_task, i, random.uniform(0.1, 1.0))
# Monitor results as they come in
total_results = 0
while total_results < 10:
results = pool.get_results(timeout=2.0)
for result_type, result_data in results:
if result_type == 'success':
print(f"Success: {result_data}")
else:
print(f"Error: {result_data}")
total_results += 1
if not results:
print("No results yet, waiting...")
print("All tasks completed")
finally:
pool.stop()
completed = pool.wait_for_completion(timeout=5.0)
print(f"Pool stopped, {len(completed)} workers completed")
test_coordinated_pool()Install with Tessl CLI
npx tessl i tessl/pypi-pebble