# Synchronization Utilities

Utility functions and decorators for thread synchronization, signal handling, and waiting operations. These tools help coordinate concurrent execution, handle edge cases, and provide fine-grained control over synchronization in multi-threaded and multi-process environments.

## Capabilities

### Synchronized Decorator

A decorator that serializes execution of the decorated function using a lock, preventing race conditions and ensuring thread-safe access to shared resources.

```python { .api }
def synchronized(lock=None):
    """
    Decorator that synchronizes function execution using a lock.

    Parameters:
    - lock: Optional Lock, RLock, or Semaphore object for synchronization.
      If None, uses a Lock shared by all bare @synchronized functions.

    Returns:
    Decorated function that executes atomically
    """
```
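
Both decoration forms are supported: bare `@synchronized`, where every such function contends on one shared lock, and `@synchronized(lock)` with an explicit lock object. A minimal sketch (the function and lock names here are illustrative, not part of pebble):

```python
from pebble import synchronized
import threading

@synchronized  # shares the default lock with every other bare @synchronized use
def update_shared_state():
    ...

database_lock = threading.Lock()

@synchronized(database_lock)  # serializes only against holders of this lock
def update_database():
    ...
```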

#### Basic Synchronization

```python
from pebble import synchronized
import threading
import time

# Shared resources
counter = 0
shared_data = []

# Using the default shared lock
@synchronized
def increment_counter():
    global counter
    current = counter
    time.sleep(0.001)  # Simulate some work
    counter = current + 1

@synchronized
def append_data(value):
    shared_data.append(value)
    time.sleep(0.001)

# Using a custom lock
custom_lock = threading.RLock()

@synchronized(custom_lock)
def complex_operation(value):
    shared_data.append(f"start-{value}")
    time.sleep(0.01)
    shared_data.append(f"end-{value}")

# Test synchronization
def test_synchronization():
    threads = []

    # Test counter increment
    for i in range(100):
        thread = threading.Thread(target=increment_counter)
        threads.append(thread)

    # Test data appending
    for i in range(50):
        thread = threading.Thread(target=append_data, args=(i,))
        threads.append(thread)

    # Test complex operation
    for i in range(10):
        thread = threading.Thread(target=complex_operation, args=(i,))
        threads.append(thread)

    # Start all threads
    for thread in threads:
        thread.start()

    # Wait for completion
    for thread in threads:
        thread.join()

    print(f"Final counter value: {counter}")
    print(f"Shared data length: {len(shared_data)}")
    print(f"Complex operations: {[item for item in shared_data if 'start' in item]}")

test_synchronization()
```
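
For contrast, a sketch of the same increment with no lock at all; this `unsynchronized_increment` is hypothetical and typically loses updates, because other threads read the stale counter between the read and the write:

```python
import threading
import time

counter = 0

def unsynchronized_increment():  # hypothetical: nothing protects the counter
    global counter
    current = counter
    time.sleep(0.001)  # another thread can read the stale value here
    counter = current + 1

threads = [threading.Thread(target=unsynchronized_increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter without synchronization: {counter}  (typically well below 100)")
```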

#### Advanced Synchronization Patterns

```python
from pebble import synchronized
import threading
import time
import queue

# Different types of locks for different use cases
read_write_lock = threading.RLock()          # Allows recursive locking
resource_semaphore = threading.Semaphore(3)  # Limits concurrent access to 3
counter_lock = threading.Lock()              # Protects ThreadSafeCounter state

class ThreadSafeCounter:
    def __init__(self):
        self._value = 0

    @synchronized  # Uses the lock shared by all bare @synchronized functions
    def get_global_count(self):
        # This method contends with every other bare @synchronized function
        return self._value

    # synchronized binds its lock at decoration time, so a per-instance lock
    # cannot be passed here; a module-level lock shared by all instances is used.
    @synchronized(counter_lock)
    def increment(self):
        self._value += 1

    @synchronized(counter_lock)
    def decrement(self):
        self._value -= 1

    @synchronized(counter_lock)
    def get_value(self):
        return self._value

# Resource pool with a semaphore: at most 3 threads run this at once
@synchronized(resource_semaphore)
def use_limited_resource(resource_id, duration):
    print(f"Using resource {resource_id}")
    time.sleep(duration)
    print(f"Released resource {resource_id}")
    return f"Result from resource {resource_id}"

# Producer-consumer coordination: queue.Queue performs its own locking, so the
# producer and consumers must not share a single @synchronized lock (that would
# serialize them and block the producer behind a consumer waiting on get()).
shared_queue = queue.Queue()

def producer(items):
    for item in items:
        shared_queue.put(item)
        print(f"Produced: {item}")
        time.sleep(0.1)

def consumer(consumer_id, timeout=5):
    while True:
        try:
            item = shared_queue.get(timeout=timeout)
            print(f"Consumer {consumer_id} consumed: {item}")
            shared_queue.task_done()
        except queue.Empty:
            print(f"Consumer {consumer_id} timed out")
            break

# Test advanced patterns
def test_advanced_patterns():
    # Test thread-safe counter
    counter = ThreadSafeCounter()

    def worker():
        for _ in range(100):
            counter.increment()
            if counter.get_value() > 50:
                counter.decrement()

    threads = [threading.Thread(target=worker) for _ in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(f"Final counter value: {counter.get_value()}")

    # Test resource pool
    resource_threads = []
    for i in range(10):
        thread = threading.Thread(
            target=use_limited_resource,
            args=(i, 1.0)
        )
        resource_threads.append(thread)

    for t in resource_threads:
        t.start()
    for t in resource_threads:
        t.join()

    # Test producer-consumer
    producer_thread = threading.Thread(
        target=producer,
        args=(list(range(20)),)
    )
    consumer_threads = [
        threading.Thread(target=consumer, args=(i,))
        for i in range(3)
    ]

    producer_thread.start()
    for t in consumer_threads:
        t.start()

    producer_thread.join()
    for t in consumer_threads:
        t.join()

test_advanced_patterns()
```
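
Because `synchronized` binds its lock once, at decoration time, truly per-instance locking is easier to express with the lock's context manager directly. A minimal sketch using plain `threading` (this class is illustrative, not a pebble API):

```python
import threading

class PerInstanceCounter:
    """Each instance gets its own lock, acquired explicitly."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()  # one lock per instance

    def increment(self):
        with self._lock:  # what @synchronized does, but per instance
            self._value += 1

    def get_value(self):
        with self._lock:
            return self._value
```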

### Signal Handler Decorator

A decorator for setting up signal handlers to manage process lifecycle and handle system signals gracefully.

```python { .api }
def sighandler(signals):
    """
    Decorator that sets the decorated function as a signal handler.

    Parameters:
    - signals: Single signal or list/tuple of signals to handle

    Returns:
    Decorated function that will be called when the specified signals are received
    """
```
#### Signal Handling Examples

```python
from pebble import sighandler
import signal
import time
import sys
import os

# Global state for signal handling
shutdown_requested = False
received_signals = []

# Handle a single signal
@sighandler(signal.SIGINT)
def handle_interrupt(signum, frame):
    global shutdown_requested
    print("\nReceived SIGINT (Ctrl+C). Initiating graceful shutdown...")
    shutdown_requested = True

# Handle multiple signals
@sighandler([signal.SIGTERM, signal.SIGUSR1])
def handle_multiple_signals(signum, frame):
    signal_names = {
        signal.SIGTERM: "SIGTERM",
        signal.SIGUSR1: "SIGUSR1"
    }

    signal_name = signal_names.get(signum, f"Signal {signum}")
    print(f"Received {signal_name}")
    received_signals.append((signum, time.time()))

    if signum == signal.SIGTERM:
        print("Termination requested")
        sys.exit(0)
    elif signum == signal.SIGUSR1:
        print("User signal 1 - logging status")
        print(f"Received signals so far: {len(received_signals)}")

# Complex signal handler with state management
class SignalAwareWorker:
    def __init__(self):
        self.running = True
        self.tasks_completed = 0
        self.setup_signal_handlers()

    def handle_shutdown(self, signum, frame):
        print(f"\nShutdown signal received. Completed {self.tasks_completed} tasks.")
        self.running = False

    def handle_status_request(self, signum, frame):
        print(f"Status: Running={self.running}, Tasks completed={self.tasks_completed}")

    def setup_signal_handlers(self):
        # sighandler cannot decorate methods at class-definition time (there is
        # no instance yet to bind self), so register the bound methods here.
        sighandler(signal.SIGINT)(self.handle_shutdown)
        sighandler(signal.SIGUSR2)(self.handle_status_request)

    def work(self):
        print("Worker started. Send SIGINT to stop, SIGUSR2 for status.")

        while self.running:
            # Simulate work
            time.sleep(1)
            self.tasks_completed += 1

            if self.tasks_completed % 5 == 0:
                print(f"Completed {self.tasks_completed} tasks...")

        print("Worker shutting down gracefully.")

# Signal handling in a multiprocessing context
def worker_process():
    """Worker process with signal handling"""

    @sighandler(signal.SIGTERM)
    def worker_shutdown(signum, frame):
        print(f"Worker process {os.getpid()} received SIGTERM")
        sys.exit(0)

    @sighandler(signal.SIGUSR1)
    def worker_status(signum, frame):
        print(f"Worker process {os.getpid()} is alive")

    print(f"Worker process {os.getpid()} started")

    # Simulate work
    for i in range(100):
        time.sleep(0.5)
        if i % 10 == 0:
            print(f"Worker {os.getpid()} progress: {i}/100")

# Example usage (note: signal handlers can only be registered from the main
# thread, and SIGUSR1/SIGUSR2 are not available on Windows)
def signal_handling_demo():
    print("Signal handling demo. Process PID:", os.getpid())
    print("Try: kill -SIGUSR1", os.getpid())
    print("Or: kill -SIGTERM", os.getpid())

    worker = SignalAwareWorker()

    try:
        worker.work()
    except KeyboardInterrupt:
        print("Caught KeyboardInterrupt in main")

# Uncomment to run demo (be careful in production environments)
# signal_handling_demo()
```

### Thread Waiting Functions

Functions for waiting on threads to complete, providing fine-grained control over thread synchronization. Note that `waitforthreads` returns as soon as at least one of the given threads has terminated (or the timeout expires), not when all of them have.

```python { .api }
def waitforthreads(threads, timeout=None):
    """
    Wait until at least one of the given threads terminates.

    Parameters:
    - threads: List of threading.Thread objects to wait for
    - timeout: Maximum time to wait in seconds (None for no timeout)

    Returns:
    Filter object containing the threads that have completed
    """
```
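
Because the call returns once any thread finishes, waiting for a whole set means looping. A minimal sketch of that pattern (the `wait_for_all` helper is illustrative, not part of pebble):

```python
from pebble import waitforthreads

def wait_for_all(threads, poll_timeout=1.0):
    """Illustrative helper: block until every given thread has terminated."""
    pending = list(threads)
    while pending:
        # Returns once at least one pending thread finishes or the timeout expires
        waitforthreads(pending, timeout=poll_timeout)
        pending = [t for t in pending if t.is_alive()]
```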

#### Thread Waiting Examples

```python
from pebble import waitforthreads
import threading
import time
import random

def worker_task(task_id, duration):
    print(f"Task {task_id} starting (duration: {duration:.1f}s)")
    time.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Result {task_id}"

def test_thread_waiting():
    # Create multiple threads with different durations
    threads = []
    for i in range(5):
        duration = random.uniform(1, 5)
        thread = threading.Thread(
            target=worker_task,
            args=(i, duration)
        )
        threads.append(thread)

    # Start all threads
    for thread in threads:
        thread.start()

    print("All threads started. Waiting for completion...")

    # Wait up to 3 seconds for at least one thread to complete
    completed = waitforthreads(threads, timeout=3.0)
    completed_list = list(completed)

    print(f"After the first wait, {len(completed_list)} threads had completed")

    # Wait for the remaining threads
    remaining = [t for t in threads if t.is_alive()]
    if remaining:
        print(f"Waiting for {len(remaining)} remaining threads...")
        final_completed = waitforthreads(remaining, timeout=10.0)
        final_completed_list = list(final_completed)
        print(f"Then {len(final_completed_list)} more threads completed")

    # Clean up any remaining threads
    for thread in threads:
        if thread.is_alive():
            print(f"Thread {thread.name} is still running")
            thread.join(timeout=1.0)

# Advanced thread waiting patterns
def advanced_thread_waiting():
    # Producer threads
    producer_threads = []
    # A bare list is used for simplicity; CPython's GIL keeps append/pop
    # atomic, though the check-then-pop below can still race between consumers
    shared_queue = []

    def producer(producer_id, item_count):
        for i in range(item_count):
            item = f"item-{producer_id}-{i}"
            shared_queue.append(item)
            time.sleep(0.1)
        print(f"Producer {producer_id} finished")

    # Consumer threads
    consumer_threads = []

    def consumer(consumer_id):
        while True:
            if shared_queue:
                item = shared_queue.pop(0)
                print(f"Consumer {consumer_id} processed {item}")
                time.sleep(0.05)
            else:
                time.sleep(0.01)

    # Start producers
    for i in range(3):
        thread = threading.Thread(target=producer, args=(i, 10))
        producer_threads.append(thread)
        thread.start()

    # Start consumers (daemon threads exit when the main program exits)
    for i in range(2):
        thread = threading.Thread(target=consumer, args=(i,))
        thread.daemon = True
        consumer_threads.append(thread)
        thread.start()

    # Wait for producers to finish; loop because waitforthreads returns
    # as soon as at least one thread completes
    print("Waiting for producers to finish...")
    pending = list(producer_threads)
    while pending:
        waitforthreads(pending, timeout=20.0)
        pending = [t for t in pending if t.is_alive()]
    print("All producers completed")

    # Give consumers time to process remaining items
    time.sleep(2)

    print(f"Final queue size: {len(shared_queue)}")

test_thread_waiting()
advanced_thread_waiting()
```

### Queue Waiting Functions

Functions for waiting on multiple queues to have data available, enabling efficient queue-based coordination. Like `waitforthreads`, `waitforqueues` returns as soon as at least one of the given queues is ready.

```python { .api }
def waitforqueues(queues, timeout=None):
    """
    Wait until at least one of the given queues has data available.

    Parameters:
    - queues: List of queue.Queue objects to monitor
    - timeout: Maximum time to wait in seconds (None for no timeout)

    Returns:
    Filter object containing the queues that have data available
    """
```
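
A common select-style pattern drains whatever is available and loops. A minimal sketch (the `drain_ready` helper is illustrative, not part of pebble), assuming `queues` is a list of queue.Queue objects being filled elsewhere:

```python
import queue
from pebble import waitforqueues

def drain_ready(queues, timeout=1.0):
    """Illustrative helper: collect every item currently available."""
    items = []
    for q in waitforqueues(queues, timeout=timeout):
        try:
            while True:  # empty each ready queue without blocking
                items.append(q.get_nowait())
        except queue.Empty:
            pass
    return items
```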

#### Queue Waiting Examples

```python
from pebble import waitforqueues
import queue
import threading
import time
import random

def test_queue_waiting():
    # Create multiple queues
    queues = [queue.Queue() for _ in range(4)]
    queue_names = [f"Queue-{i}" for i in range(4)]

    def producer(q, name, delay):
        time.sleep(delay)
        q.put(f"Data from {name}")
        print(f"{name} produced data after {delay:.1f}s")

    # Start producers with different delays
    producers = []
    for q, name in zip(queues, queue_names):
        delay = random.uniform(1, 5)
        producer_thread = threading.Thread(
            target=producer,
            args=(q, name, delay)
        )
        producers.append(producer_thread)
        producer_thread.start()

    print("Waiting for queues to have data...")

    # Wait up to 3 seconds for at least one queue to have data
    ready_queues = waitforqueues(queues, timeout=3.0)
    ready_list = list(ready_queues)

    print(f"After the first wait, {len(ready_list)} queues have data")

    # Process ready queues
    for q in ready_list:
        try:
            data = q.get_nowait()
            print(f"Got data: {data}")
        except queue.Empty:
            print("Queue became empty")

    # Wait for the remaining queues
    remaining = [q for q in queues if q.empty()]
    if remaining:
        print(f"Waiting for {len(remaining)} more queues...")
        final_ready = waitforqueues(remaining, timeout=10.0)
        final_ready_list = list(final_ready)

        for q in final_ready_list:
            try:
                data = q.get_nowait()
                print(f"Got final data: {data}")
            except queue.Empty:
                print("Final queue became empty")

    # Clean up
    for thread in producers:
        thread.join()

# Advanced queue coordination
def queue_coordination_example():
    # Different types of queues
    priority_queue = queue.PriorityQueue()
    lifo_queue = queue.LifoQueue()  # Stack
    fifo_queue = queue.Queue()      # Regular queue

    queues = [priority_queue, lifo_queue, fifo_queue]
    queue_types = ["Priority", "LIFO", "FIFO"]

    def priority_producer():
        for priority in [3, 1, 2]:  # Lower numbers = higher priority
            time.sleep(random.uniform(0.5, 1.5))
            priority_queue.put((priority, f"Priority-{priority} item"))
            print(f"Added priority {priority} item")

    def lifo_producer():
        for i in range(3):
            time.sleep(random.uniform(0.5, 1.5))
            lifo_queue.put(f"LIFO-item-{i}")
            print(f"Added LIFO item {i}")

    def fifo_producer():
        for i in range(3):
            time.sleep(random.uniform(0.5, 1.5))
            fifo_queue.put(f"FIFO-item-{i}")
            print(f"Added FIFO item {i}")

    # Start producers
    producers = [
        threading.Thread(target=priority_producer),
        threading.Thread(target=lifo_producer),
        threading.Thread(target=fifo_producer)
    ]

    for producer in producers:
        producer.start()

    # Monitor queues as they get data
    processed_items = 0
    target_items = 9  # 3 items per queue

    while processed_items < target_items:
        print("Checking for queue updates...")

        ready_queues = waitforqueues(queues, timeout=2.0)
        ready_list = list(ready_queues)

        if ready_list:
            print(f"Found {len(ready_list)} queues with data")

            for i, q in enumerate(queues):
                if q in ready_list:
                    queue_type = queue_types[i]
                    try:
                        if isinstance(q, queue.PriorityQueue):
                            priority, item = q.get_nowait()
                            print(f"  {queue_type}: {item} (priority {priority})")
                        else:
                            item = q.get_nowait()
                            print(f"  {queue_type}: {item}")
                        processed_items += 1
                    except queue.Empty:
                        pass
        else:
            print("No queues ready, continuing to wait...")

    # Wait for producers to finish
    for producer in producers:
        producer.join()

    print(f"Processed all {processed_items} items")

test_queue_waiting()
queue_coordination_example()
```

### Utility Combinations

Combining synchronization utilities for complex coordination patterns:

```python
from pebble import synchronized, waitforthreads, waitforqueues
import threading
import queue
import random
import time

class CoordinatedWorkerPool:
    def __init__(self, worker_count=3):
        self.worker_count = worker_count
        self.task_queue = queue.Queue()
        self.result_queues = [queue.Queue() for _ in range(worker_count)]
        self.workers = []
        self.active = False

    @synchronized  # start/stop share the default lock, so they cannot interleave
    def start(self):
        if self.active:
            return

        self.active = True

        for i in range(self.worker_count):
            worker = threading.Thread(
                target=self._worker_loop,
                args=(i, self.result_queues[i])
            )
            worker.daemon = True
            self.workers.append(worker)
            worker.start()

    @synchronized
    def stop(self):
        self.active = False

        # Add stop signals to the task queue
        for _ in range(self.worker_count):
            self.task_queue.put(None)

    def submit_task(self, task_func, *args, **kwargs):
        if not self.active:
            raise RuntimeError("Pool not started")

        task = (task_func, args, kwargs)
        self.task_queue.put(task)

    def _worker_loop(self, worker_id, result_queue):
        print(f"Worker {worker_id} started")

        while self.active:
            try:
                task = self.task_queue.get(timeout=1.0)

                if task is None:  # Stop signal
                    break

                task_func, args, kwargs = task

                try:
                    result = task_func(*args, **kwargs)
                    result_queue.put(('success', result))
                except Exception as e:
                    result_queue.put(('error', str(e)))

                self.task_queue.task_done()

            except queue.Empty:
                continue

        print(f"Worker {worker_id} stopped")

    def get_results(self, timeout=None):
        # Wait until at least one result queue has data
        ready_queues = waitforqueues(self.result_queues, timeout=timeout)

        results = []
        for q in ready_queues:
            try:
                while True:
                    result_type, result_data = q.get_nowait()
                    results.append((result_type, result_data))
            except queue.Empty:
                pass

        return results

    def wait_for_completion(self, timeout=None):
        # Wait until at least one worker thread has finished
        completed_workers = waitforthreads(self.workers, timeout=timeout)
        return list(completed_workers)

# Test coordinated worker pool
def test_coordinated_pool():
    def sample_task(task_id, duration):
        time.sleep(duration)
        return f"Task {task_id} completed in {duration:.2f}s"

    pool = CoordinatedWorkerPool(worker_count=3)
    pool.start()

    try:
        # Submit tasks
        for i in range(10):
            pool.submit_task(sample_task, i, random.uniform(0.1, 1.0))

        # Monitor results as they come in
        total_results = 0
        while total_results < 10:
            results = pool.get_results(timeout=2.0)

            for result_type, result_data in results:
                if result_type == 'success':
                    print(f"Success: {result_data}")
                else:
                    print(f"Error: {result_data}")
                total_results += 1

            if not results:
                print("No results yet, waiting...")

        print("All tasks completed")

    finally:
        pool.stop()
        completed = pool.wait_for_completion(timeout=5.0)
        print(f"Pool stopped, {len(completed)} workers completed")

test_coordinated_pool()
```