CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pebble

Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

synchronization-utilities.mddocs/

Synchronization Utilities

Utility functions and decorators for thread synchronization, signal handling, and waiting operations. These tools help coordinate concurrent execution, handle edge cases, and provide fine-grained control over synchronization in multi-threaded and multi-process environments.

Capabilities

Synchronized Decorator

A decorator that prevents concurrent execution of decorated functions using locks, preventing race conditions and ensuring thread-safe access to shared resources.

def synchronized(lock=None):
    """
    Decorator that synchronizes function execution using a lock.
    
    Parameters:
    - lock: Optional Lock, RLock, or Semaphore object for synchronization.
           If None, uses a shared Lock for all @synchronized functions,
           so unrelated bare-@synchronized functions serialize against
           each other.
    
    Returns:
    Decorated function that executes atomically (its body runs while the
    lock is held)
    """

Basic Synchronization

from pebble import synchronized
import threading
import time

# Shared mutable state exercised by the demo below.
counter = 0
shared_data = []

# Bare @synchronized: all such functions share one module-level lock.
@synchronized
def increment_counter():
    """Read-modify-write the global counter under the shared lock."""
    global counter
    snapshot = counter
    time.sleep(0.001)  # widen the race window so the lock's effect is visible
    counter = snapshot + 1

@synchronized
def append_data(value):
    """Append a value to the shared list under the shared lock."""
    shared_data.append(value)
    time.sleep(0.001)

# A dedicated reentrant lock keeps this function independent of the shared one.
custom_lock = threading.RLock()

@synchronized(custom_lock)
def complex_operation(value):
    """Record a start/end marker pair atomically."""
    shared_data.append(f"start-{value}")
    time.sleep(0.01)
    shared_data.append(f"end-{value}")

def test_synchronization():
    """Spawn workers against every decorated function and report results."""
    workers = [threading.Thread(target=increment_counter) for _ in range(100)]
    workers += [threading.Thread(target=append_data, args=(n,)) for n in range(50)]
    workers += [threading.Thread(target=complex_operation, args=(n,)) for n in range(10)]

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    print(f"Final counter value: {counter}")
    print(f"Shared data length: {len(shared_data)}")
    print(f"Complex operations: {[item for item in shared_data if 'start' in item]}")

test_synchronization()

Advanced Synchronization Patterns

from pebble import synchronized
import threading
import time
import queue

# Primitives illustrating different synchronization strategies.
read_write_lock = threading.RLock()  # Allows recursive locking
resource_semaphore = threading.Semaphore(3)  # Limit concurrent access
condition_lock = threading.Condition()

# FIX: pebble's synchronized() accepts either a function (bare form) or a
# lock object.  The original passed `lambda self: self._lock`; a lambda is
# callable, so it is mis-detected as the function to decorate and the
# per-instance lock is never used -- the decoration breaks.  A module-level
# lock shared by all ThreadSafeCounter instances is used instead.
_counter_lock = threading.RLock()

class ThreadSafeCounter:
    """Counter whose mutating methods are serialized by _counter_lock."""

    def __init__(self):
        self._value = 0

    @synchronized  # Uses pebble's shared, process-wide lock
    def get_global_count(self):
        # This method shares a lock with all other bare-@synchronized functions
        return self._value

    @synchronized(_counter_lock)
    def increment(self):
        self._value += 1

    @synchronized(_counter_lock)
    def decrement(self):
        self._value -= 1

    @synchronized(_counter_lock)
    def get_value(self):
        return self._value

# Resource pool with semaphore: at most 3 threads run the body concurrently.
@synchronized(resource_semaphore)
def use_limited_resource(resource_id, duration):
    print(f"Using resource {resource_id}")
    time.sleep(duration)
    print(f"Released resource {resource_id}")
    return f"Result from resource {resource_id}"

# Producer-consumer.
# FIX: the original wrapped BOTH producer and consumer in
# @synchronized(condition_lock).  Whichever function ran first held the lock
# for its whole loop -- the consumer even blocked inside queue.get() while
# holding it -- so the other side could never make progress.  queue.Queue is
# internally thread-safe, so no outer lock is required at all.
shared_queue = queue.Queue()

def producer(items):
    """Feed items into the shared queue at a steady pace."""
    for item in items:
        shared_queue.put(item)
        print(f"Produced: {item}")
        time.sleep(0.1)

def consumer(consumer_id, timeout=5):
    """Drain the shared queue until no item arrives within `timeout` seconds."""
    while True:
        try:
            item = shared_queue.get(timeout=timeout)
            print(f"Consumer {consumer_id} consumed: {item}")
            shared_queue.task_done()
        except queue.Empty:
            print(f"Consumer {consumer_id} timed out")
            break

def test_advanced_patterns():
    """Exercise the counter, the semaphore-limited resource, and the
    producer/consumer pipeline."""
    # Thread-safe counter: 5 workers increment 100 times each, decrementing
    # whenever the observed value exceeds 50.
    counter = ThreadSafeCounter()

    def worker():
        for _ in range(100):
            counter.increment()
            if counter.get_value() > 50:
                counter.decrement()

    threads = [threading.Thread(target=worker) for _ in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(f"Final counter value: {counter.get_value()}")

    # Resource pool: 10 callers share 3 semaphore slots.
    resource_threads = []
    for i in range(10):
        thread = threading.Thread(
            target=use_limited_resource,
            args=(i, 1.0)
        )
        resource_threads.append(thread)

    for t in resource_threads:
        t.start()
    for t in resource_threads:
        t.join()

    # Producer-consumer: one producer, three consumers.
    producer_thread = threading.Thread(
        target=producer,
        args=(list(range(20)),)
    )
    consumer_threads = [
        threading.Thread(target=consumer, args=(i,))
        for i in range(3)
    ]

    producer_thread.start()
    for t in consumer_threads:
        t.start()

    producer_thread.join()
    for t in consumer_threads:
        t.join()

test_advanced_patterns()

Signal Handler Decorator

A decorator for setting up signal handlers to manage process lifecycle and handle system signals gracefully.

def sighandler(signals):
    """
    Decorator that sets the decorated function as a signal handler.
    
    Parameters:
    - signals: Single signal or list/tuple of signals to handle
    
    Returns:
    Decorated function; it will be invoked as handler(signum, frame)
    whenever one of the specified signals is received, so it must be a
    plain function (not an instance method expecting self)
    """

Signal Handling Examples

from pebble import sighandler
import signal
import time
import sys
import os

# Module-level flags mutated by the handlers below.
shutdown_requested = False
received_signals = []

@sighandler(signal.SIGINT)
def handle_interrupt(signum, frame):
    """SIGINT handler: flag a graceful shutdown instead of dying abruptly."""
    global shutdown_requested
    print("\nReceived SIGINT (Ctrl+C). Initiating graceful shutdown...")
    shutdown_requested = True

@sighandler([signal.SIGTERM, signal.SIGUSR1])
def handle_multiple_signals(signum, frame):
    """One handler registered for several signals; dispatch on signum."""
    global received_signals
    names = {
        signal.SIGTERM: "SIGTERM",
        signal.SIGUSR1: "SIGUSR1"
    }

    print(f"Received {names.get(signum, f'Signal {signum}')}")
    received_signals.append((signum, time.time()))

    if signum == signal.SIGTERM:
        print("Termination requested")
        sys.exit(0)
    elif signum == signal.SIGUSR1:
        print("User signal 1 - logging status")
        print(f"Received signals so far: {len(received_signals)}")

# Complex signal handler with state management
class SignalAwareWorker:
    """Worker whose lifecycle is controlled by OS signals.

    FIX: the original applied @sighandler directly to instance methods.
    Signal handlers are invoked as handler(signum, frame); the plain
    function stored in the class body would therefore have received the
    signal number bound to ``self``, breaking at signal-delivery time.
    Handlers are now registered in setup_signal_handlers() via closures
    bound to the instance.
    """

    def __init__(self):
        self.running = True         # cleared by handle_shutdown
        self.tasks_completed = 0    # incremented once per simulated task
        self.setup_signal_handlers()

    def handle_shutdown(self, signum, frame):
        """Stop the work loop after the current iteration."""
        print(f"\nShutdown signal received. Completed {self.tasks_completed} tasks.")
        self.running = False

    def handle_status_request(self, signum, frame):
        """Log current progress without interrupting the work loop."""
        print(f"Status: Running={self.running}, Tasks completed={self.tasks_completed}")

    def setup_signal_handlers(self):
        # Register plain-function closures so the handlers receive
        # (signum, frame) while still having access to this instance.
        @sighandler(signal.SIGINT)
        def _on_shutdown(signum, frame):
            self.handle_shutdown(signum, frame)

        @sighandler(signal.SIGUSR2)
        def _on_status(signum, frame):
            self.handle_status_request(signum, frame)

    def work(self):
        """Run simulated one-second tasks until a shutdown signal arrives."""
        print("Worker started. Send SIGINT to stop, SIGUSR2 for status.")

        while self.running:
            # Simulate work
            time.sleep(1)
            self.tasks_completed += 1

            if self.tasks_completed % 5 == 0:
                print(f"Completed {self.tasks_completed} tasks...")

        print("Worker shutting down gracefully.")

# Signal handling in multiprocessing context
def worker_process():
    """Worker process that installs its own SIGTERM/SIGUSR1 handlers."""

    @sighandler(signal.SIGTERM)
    def worker_shutdown(signum, frame):
        # Graceful termination requested by the parent.
        print(f"Worker process {os.getpid()} received SIGTERM")
        sys.exit(0)

    @sighandler(signal.SIGUSR1)
    def worker_status(signum, frame):
        # Liveness probe: just report that we are still here.
        print(f"Worker process {os.getpid()} is alive")

    print(f"Worker process {os.getpid()} started")

    # Simulated workload: 100 half-second steps with periodic progress logs.
    for step in range(100):
        time.sleep(0.5)
        if step % 10 == 0:
            print(f"Worker {os.getpid()} progress: {step}/100")

# Example usage
def signal_handling_demo():
    """Interactive demo: print kill-command hints, then run a worker."""
    pid = os.getpid()
    for parts in (
        ("Signal handling demo. Process PID:", pid),
        ("Try: kill -SIGUSR1", pid),
        ("Or: kill -SIGTERM", pid),
    ):
        print(*parts)

    try:
        SignalAwareWorker().work()
    except KeyboardInterrupt:
        # SIGINT can still surface here if delivered outside the handler.
        print("Caught KeyboardInterrupt in main")

# Uncomment to run demo (be careful in production environments)
# signal_handling_demo()

Thread Waiting Functions

Functions for waiting on multiple threads to complete, providing fine-grained control over thread synchronization.

def waitforthreads(threads, timeout=None):
    """
    Wait for one or more threads to complete.
    
    Parameters:
    - threads: List of threading.Thread objects to wait for
    - timeout: Maximum time to wait in seconds (None for no timeout)
    
    Returns:
    Filter object containing threads that have completed; note it is a
    single-use iterator, so materialize it with list() before reusing
    """

Thread Waiting Examples

from pebble import waitforthreads
import threading
import time
import random

def worker_task(task_id, duration):
    """Simulated unit of work: sleep for `duration` seconds, then return a tag."""
    print(f"Task {task_id} starting (duration: {duration}s)")
    time.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Result {task_id}"

def test_thread_waiting():
    """Start several randomly-timed tasks, then wait for them in two phases."""
    workers = []
    for task_id in range(5):
        workers.append(
            threading.Thread(
                target=worker_task,
                args=(task_id, random.uniform(1, 5)),
            )
        )

    for worker in workers:
        worker.start()

    print("All threads started. Waiting for completion...")

    # Phase one: bounded wait across all workers.
    done = list(waitforthreads(workers, timeout=3.0))
    print(f"After 3 seconds, {len(done)} threads completed")

    # Phase two: wait only on the stragglers.
    stragglers = [w for w in workers if w.is_alive()]
    if stragglers:
        print(f"Waiting for {len(stragglers)} remaining threads...")
        more_done = list(waitforthreads(stragglers, timeout=10.0))
        print(f"Finally, {len(more_done)} more threads completed")

    # Join everything so no thread outlives the test.
    for worker in workers:
        if worker.is_alive():
            print(f"Thread {worker.name} is still running")
        worker.join(timeout=1.0)

# Advanced thread waiting patterns
def advanced_thread_waiting():
    # Producer threads
    producer_threads = []
    shared_queue = []
    
    def producer(producer_id, item_count):
        for i in range(item_count):
            item = f"item-{producer_id}-{i}"
            shared_queue.append(item)
            time.sleep(0.1)
        print(f"Producer {producer_id} finished")
    
    # Consumer threads  
    consumer_threads = []
    
    def consumer(consumer_id):
        while True:
            if shared_queue:
                item = shared_queue.pop(0)
                print(f"Consumer {consumer_id} processed {item}")
                time.sleep(0.05)
            else:
                time.sleep(0.01)
    
    # Start producers
    for i in range(3):
        thread = threading.Thread(target=producer, args=(i, 10))
        producer_threads.append(thread)
        thread.start()
    
    # Start consumers
    for i in range(2):
        thread = threading.Thread(target=consumer, args=(i))
        thread.daemon = True  # Will exit when main program exits
        consumer_threads.append(thread)
        thread.start()
    
    # Wait for producers to finish
    print("Waiting for producers to finish...")
    completed_producers = waitforthreads(producer_threads, timeout=20.0)
    completed_count = len(list(completed_producers))
    
    print(f"{completed_count} producers completed")
    
    # Give consumers time to process remaining items
    time.sleep(2)
    
    print(f"Final queue size: {len(shared_queue)}")

test_thread_waiting()
advanced_thread_waiting()

Queue Waiting Functions

Functions for waiting on multiple queues to have data available, enabling efficient queue-based coordination.

def waitforqueues(queues, timeout=None):
    """
    Wait for one or more queues to have data available.
    
    Parameters:
    - queues: List of queue.Queue objects to monitor
    - timeout: Maximum time to wait in seconds (None for no timeout)
    
    Returns:
    Filter object containing queues that have data available; note it is a
    single-use iterator, so materialize it with list() before reusing
    """

Queue Waiting Examples

from pebble import waitforqueues
import queue
import threading
import time
import random

def test_queue_waiting():
    """Feed four queues at random delays, then wait on them in two phases."""
    queues = [queue.Queue() for _ in range(4)]
    queue_names = [f"Queue-{i}" for i in range(4)]

    def producer(q, name, delay):
        # Single delayed put so each queue becomes ready at a different time.
        time.sleep(delay)
        q.put(f"Data from {name}")
        print(f"{name} produced data after {delay}s")

    producers = []
    for q, name in zip(queues, queue_names):
        producer_thread = threading.Thread(
            target=producer,
            args=(q, name, random.uniform(1, 5)),
        )
        producers.append(producer_thread)
        producer_thread.start()

    print("Waiting for queues to have data...")

    # Phase one: bounded wait across all queues.
    ready_list = list(waitforqueues(queues, timeout=3.0))
    print(f"After 3 seconds, {len(ready_list)} queues have data")

    for q in ready_list:
        try:
            print(f"Got data: {q.get_nowait()}")
        except queue.Empty:
            print("Queue became empty")

    # Phase two: wait only on the queues that are still empty.
    still_empty = [q for q in queues if q.empty()]
    if still_empty:
        print(f"Waiting for {len(still_empty)} more queues...")
        for q in waitforqueues(still_empty, timeout=10.0):
            try:
                print(f"Got final data: {q.get_nowait()}")
            except queue.Empty:
                print("Final queue became empty")

    # Join the producers so no thread outlives the test.
    for thread in producers:
        thread.join()

# Advanced queue coordination
def queue_coordination_example():
    """Monitor three different queue types with waitforqueues until all
    expected items (three per queue) have been consumed.
    """
    # Different types of queues
    priority_queue = queue.PriorityQueue()
    lifo_queue = queue.LifoQueue()  # Stack
    fifo_queue = queue.Queue()     # Regular queue
    
    queues = [priority_queue, lifo_queue, fifo_queue]
    queue_types = ["Priority", "LIFO", "FIFO"]
    
    def priority_producer():
        # Items are put out of priority order to show PriorityQueue reordering.
        for priority in [3, 1, 2]:  # Lower numbers = higher priority
            time.sleep(random.uniform(0.5, 1.5))
            priority_queue.put((priority, f"Priority-{priority} item"))
            print(f"Added priority {priority} item")
    
    def lifo_producer():
        for i in range(3):
            time.sleep(random.uniform(0.5, 1.5))
            lifo_queue.put(f"LIFO-item-{i}")
            print(f"Added LIFO item {i}")
    
    def fifo_producer():
        for i in range(3):
            time.sleep(random.uniform(0.5, 1.5))
            fifo_queue.put(f"FIFO-item-{i}")
            print(f"Added FIFO item {i}")
    
    # Start producers
    producers = [
        threading.Thread(target=priority_producer),
        threading.Thread(target=lifo_producer),
        threading.Thread(target=fifo_producer)
    ]
    
    for producer in producers:
        producer.start()
    
    # Monitor queues as they get data
    processed_items = 0
    target_items = 9  # 3 items per queue
    
    # Poll until every produced item has been drained.  Termination is
    # guaranteed because exactly target_items puts occur in total and each
    # successful get_nowait() below increments processed_items.
    while processed_items < target_items:
        print("Checking for queue updates...")
        
        ready_queues = waitforqueues(queues, timeout=2.0)
        ready_list = list(ready_queues)
        
        if ready_list:
            print(f"Found {len(ready_list)} queues with data")
            
            for i, q in enumerate(queues):
                if q in ready_list:
                    queue_type = queue_types[i]
                    try:
                        if isinstance(q, queue.PriorityQueue):
                            # PriorityQueue items were stored as (priority, item) pairs.
                            priority, item = q.get_nowait()
                            print(f"  {queue_type}: {item} (priority {priority})")
                        else:
                            item = q.get_nowait()
                            print(f"  {queue_type}: {item}")
                        processed_items += 1
                    except queue.Empty:
                        # Queue was reported ready but drained meanwhile; skip.
                        pass
        else:
            print("No queues ready, continuing to wait...")
    
    # Wait for producers to finish
    for producer in producers:
        producer.join()
    
    print(f"Processed all {processed_items} items")

test_queue_waiting()
queue_coordination_example()

Utility Combinations

Combining synchronization utilities for complex coordination patterns:

from pebble import synchronized, waitforthreads, waitforqueues
import threading
import queue
import time

class CoordinatedWorkerPool:
    """Small worker pool coordinating tasks and results with pebble's
    waitforthreads/waitforqueues utilities.

    FIX: the original decorated start()/stop() with bare @synchronized,
    which serializes them through pebble's single process-wide lock
    (shared with every other bare-@synchronized function) while the
    instance lock self._lock sat unused.  start()/stop() now serialize
    on the instance's own lock instead.
    """

    def __init__(self, worker_count=3):
        self.worker_count = worker_count
        self.task_queue = queue.Queue()            # pending (func, args, kwargs) tuples
        self.result_queues = [queue.Queue() for _ in range(worker_count)]
        self.workers = []
        self.active = False
        self._lock = threading.Lock()              # guards start/stop transitions

    def start(self):
        """Spawn the worker threads (idempotent)."""
        with self._lock:
            if self.active:
                return

            self.active = True

            for i in range(self.worker_count):
                worker = threading.Thread(
                    target=self._worker_loop,
                    args=(i, self.result_queues[i])
                )
                worker.daemon = True
                self.workers.append(worker)
                worker.start()

    def stop(self):
        """Request shutdown: clear the flag and enqueue one sentinel per worker."""
        with self._lock:
            self.active = False

            # Sentinels wake workers blocked in task_queue.get().
            for _ in range(self.worker_count):
                self.task_queue.put(None)

    def submit_task(self, task_func, *args, **kwargs):
        """Queue (task_func, args, kwargs) for execution.

        Raises:
        RuntimeError: if the pool has not been started.
        """
        if not self.active:
            raise RuntimeError("Pool not started")

        self.task_queue.put((task_func, args, kwargs))

    def _worker_loop(self, worker_id, result_queue):
        """Worker body: pull tasks until stopped, pushing ('success'|'error', data)."""
        print(f"Worker {worker_id} started")

        while self.active:
            try:
                task = self.task_queue.get(timeout=1.0)

                if task is None:  # Stop signal
                    break

                task_func, args, kwargs = task

                try:
                    result = task_func(*args, **kwargs)
                    result_queue.put(('success', result))
                except Exception as e:
                    # Report failures as data instead of killing the worker.
                    result_queue.put(('error', str(e)))

                self.task_queue.task_done()

            except queue.Empty:
                # Periodic wake-up so a cleared `active` flag is noticed.
                continue

    def get_results(self, timeout=None):
        """Drain whichever result queues have data within `timeout` seconds.

        Returns a list of (result_type, result_data) tuples; may be empty.
        """
        ready_list = list(waitforqueues(self.result_queues, timeout=timeout))

        results = []
        for q in ready_list:
            try:
                while True:
                    results.append(q.get_nowait())
            except queue.Empty:
                pass

        return results

    def wait_for_completion(self, timeout=None):
        """Wait for the worker threads to exit; return those that finished."""
        return list(waitforthreads(self.workers, timeout=timeout))

# Test coordinated worker pool
def test_coordinated_pool():
    """Exercise CoordinatedWorkerPool end to end with 10 timed tasks."""
    # FIX: the original called random.uniform() without importing random,
    # raising NameError on the first submit_task call.
    import random

    def sample_task(task_id, duration):
        time.sleep(duration)
        return f"Task {task_id} completed in {duration}s"

    pool = CoordinatedWorkerPool(worker_count=3)
    pool.start()

    try:
        # Submit tasks
        for i in range(10):
            pool.submit_task(sample_task, i, random.uniform(0.1, 1.0))

        # Monitor results as they come in
        total_results = 0
        while total_results < 10:
            results = pool.get_results(timeout=2.0)

            for result_type, result_data in results:
                if result_type == 'success':
                    print(f"Success: {result_data}")
                else:
                    print(f"Error: {result_data}")
                total_results += 1

            if not results:
                print("No results yet, waiting...")

        print("All tasks completed")

    finally:
        # Always shut the pool down, even if monitoring failed.
        pool.stop()
        completed = pool.wait_for_completion(timeout=5.0)
        print(f"Pool stopped, {len(completed)} workers completed")

test_coordinated_pool()

Install with Tessl CLI

npx tessl i tessl/pypi-pebble

docs

asynchronous-decorators.md

concurrent-decorators.md

future-types-exceptions.md

index.md

process-pools.md

synchronization-utilities.md

thread-pools.md

tile.json