0
# Synchronization
1
2
Synchronization primitives including locks, semaphores, events, conditions, and barriers for coordinating processes and ensuring thread-safe access to shared resources.
3
4
## Capabilities
5
6
### Locks
7
8
Mutual exclusion primitives for protecting critical sections and shared resources.
9
10
```python { .api }
11
class Lock:
12
"""
13
A non-recursive lock (mutual exclusion lock).
14
"""
15
def acquire(self, block=True, timeout=None) -> bool:
16
"""
17
Acquire the lock.
18
19
Parameters:
20
- block: whether to block if lock is unavailable
21
- timeout: timeout in seconds (None for no timeout)
22
23
Returns:
24
True if lock acquired, False otherwise
25
"""
26
27
def release(self):
28
"""
29
Release the lock.
30
31
Raises:
32
- ValueError: if lock is not currently held
33
"""
34
35
def __enter__(self):
36
"""Context manager entry."""
37
return self.acquire()
38
39
def __exit__(self, exc_type, exc_val, exc_tb):
40
"""Context manager exit."""
41
self.release()
42
43
class RLock:
44
"""
45
A reentrant lock (recursive lock).
46
Can be acquired multiple times by the same process.
47
"""
48
def acquire(self, block=True, timeout=None) -> bool:
49
"""
50
Acquire the lock, incrementing recursion level.
51
52
Parameters:
53
- block: whether to block if lock is unavailable
54
- timeout: timeout in seconds (None for no timeout)
55
56
Returns:
57
True if lock acquired, False otherwise
58
"""
59
60
def release(self):
61
"""
62
Release the lock, decrementing recursion level.
63
64
Raises:
65
- ValueError: if lock is not currently held by calling process
66
"""
67
68
def __enter__(self):
69
"""Context manager entry."""
70
return self.acquire()
71
72
def __exit__(self, exc_type, exc_val, exc_tb):
73
"""Context manager exit."""
74
self.release()
75
```
76
77
Usage example:
78
79
```python
80
from billiard import Process, Lock, RLock
81
import time
82
83
# Shared resource counter
84
counter = 0
85
lock = Lock()
86
87
def worker_with_lock(name, iterations, shared_lock):
88
"""Worker that safely increments counter"""
89
global counter
90
91
for i in range(iterations):
92
# Critical section
93
with shared_lock:
94
old_value = counter
95
time.sleep(0.001) # Simulate some work
96
counter = old_value + 1
97
print(f"{name}: incremented counter to {counter}")
98
99
def recursive_function(rlock, level):
100
"""Function that acquires lock recursively"""
101
with rlock:
102
print(f"Level {level}: acquired lock")
103
if level > 0:
104
recursive_function(rlock, level - 1)
105
print(f"Level {level}: releasing lock")
106
107
if __name__ == '__main__':
108
# Test regular lock
109
processes = []
110
for i in range(3):
111
p = Process(target=worker_with_lock, args=(f"Worker-{i}", 5, lock))
112
processes.append(p)
113
p.start()
114
115
for p in processes:
116
p.join()
117
118
print(f"Final counter value: {counter}")
119
120
# Test recursive lock
121
rlock = RLock()
122
process = Process(target=recursive_function, args=(rlock, 3))
123
process.start()
124
process.join()
125
```
126
127
### Semaphores
128
129
Counting semaphores for controlling access to resources with limited capacity.
130
131
```python { .api }
132
class Semaphore:
133
"""
134
A counting semaphore.
135
"""
136
def __init__(self, value=1, ctx=None):
137
"""
138
Create a semaphore.
139
140
Parameters:
141
- value: initial count (default: 1)
142
- ctx: multiprocessing context
143
"""
144
145
def acquire(self, block=True, timeout=None) -> bool:
146
"""
147
Acquire the semaphore, decrementing internal counter.
148
149
Parameters:
150
- block: whether to block if semaphore unavailable
151
- timeout: timeout in seconds (None for no timeout)
152
153
Returns:
154
True if semaphore acquired, False otherwise
155
"""
156
157
def release(self):
158
"""
159
Release the semaphore, incrementing internal counter.
160
"""
161
162
def __enter__(self):
163
"""Context manager entry."""
164
return self.acquire()
165
166
def __exit__(self, exc_type, exc_val, exc_tb):
167
"""Context manager exit."""
168
self.release()
169
170
class BoundedSemaphore(Semaphore):
171
"""
172
A bounded semaphore that prevents release() from raising count above initial value.
173
"""
174
def release(self):
175
"""
176
Release the semaphore, but prevent count from exceeding initial value.
177
178
Raises:
179
- ValueError: if release() would increase count above initial value
180
"""
181
```
182
183
Usage example:
184
185
```python
186
from billiard import Process, Semaphore, BoundedSemaphore
187
import time
188
import random
189
190
# Semaphore limiting concurrent access to 3 resources
191
resource_semaphore = Semaphore(3)
192
193
def worker_with_semaphore(worker_id, semaphore):
194
"""Worker that uses limited resource"""
195
print(f"Worker {worker_id}: requesting resource...")
196
197
with semaphore:
198
print(f"Worker {worker_id}: acquired resource")
199
# Simulate work with resource
200
work_time = random.uniform(0.5, 2.0)
201
time.sleep(work_time)
202
print(f"Worker {worker_id}: releasing resource after {work_time:.1f}s")
203
204
print(f"Worker {worker_id}: done")
205
206
def bounded_semaphore_example():
207
"""Demonstrate bounded semaphore behavior"""
208
bounded_sem = BoundedSemaphore(2)
209
210
# Acquire twice (should work)
211
bounded_sem.acquire()
212
bounded_sem.acquire()
213
print("Acquired semaphore twice")
214
215
# Release twice
216
bounded_sem.release()
217
bounded_sem.release()
218
print("Released semaphore twice")
219
220
# Try to release again (should raise ValueError)
221
try:
222
bounded_sem.release()
223
except ValueError as e:
224
print(f"Bounded semaphore prevented over-release: {e}")
225
226
if __name__ == '__main__':
227
# Test resource limiting
228
workers = []
229
for i in range(8):
230
p = Process(target=worker_with_semaphore, args=(i, resource_semaphore))
231
workers.append(p)
232
p.start()
233
234
for p in workers:
235
p.join()
236
237
# Test bounded semaphore
238
bounded_semaphore_example()
239
```
240
241
### Events
242
243
Simple signaling mechanism for coordinating processes.
244
245
```python { .api }
246
class Event:
247
"""
248
A simple event object for process synchronization.
249
"""
250
def set(self):
251
"""
252
Set the internal flag to True.
253
All processes waiting for it become unblocked.
254
"""
255
256
def clear(self):
257
"""
258
Set the internal flag to False.
259
"""
260
261
def is_set(self) -> bool:
262
"""
263
Return True if internal flag is True.
264
265
Returns:
266
True if event is set, False otherwise
267
"""
268
269
def wait(self, timeout=None) -> bool:
270
"""
271
Block until internal flag is True.
272
273
Parameters:
274
- timeout: timeout in seconds (None for no timeout)
275
276
Returns:
277
True if event was set, False if timeout occurred
278
"""
279
```
280
281
Usage example:
282
283
```python
284
from billiard import Process, Event
285
import time
286
import random
287
288
def waiter(event, worker_id):
289
"""Process that waits for event"""
290
print(f"Waiter {worker_id}: waiting for event...")
291
292
if event.wait(timeout=5):
293
print(f"Waiter {worker_id}: event received!")
294
else:
295
print(f"Waiter {worker_id}: timeout waiting for event")
296
297
def setter(event, delay):
298
"""Process that sets event after delay"""
299
print(f"Setter: waiting {delay} seconds before setting event")
300
time.sleep(delay)
301
302
print("Setter: setting event")
303
event.set()
304
305
def event_coordination_example():
306
"""Demonstrate event coordination"""
307
event = Event()
308
309
# Start multiple waiters
310
waiters = []
311
for i in range(3):
312
p = Process(target=waiter, args=(event, i))
313
waiters.append(p)
314
p.start()
315
316
# Start setter with random delay
317
delay = random.uniform(1, 3)
318
setter_process = Process(target=setter, args=(event, delay))
319
setter_process.start()
320
321
# Wait for all processes
322
setter_process.join()
323
for p in waiters:
324
p.join()
325
326
print(f"Event is set: {event.is_set()}")
327
328
# Clear and test again
329
event.clear()
330
print(f"Event after clear: {event.is_set()}")
331
332
if __name__ == '__main__':
333
event_coordination_example()
334
```
335
336
### Conditions
337
338
Advanced synchronization allowing processes to wait for specific conditions.
339
340
```python { .api }
341
class Condition:
342
"""
343
A condition variable for process synchronization.
344
"""
345
def __init__(self, lock=None, ctx=None):
346
"""
347
Create a condition variable.
348
349
Parameters:
350
- lock: underlying lock (Lock() if None)
351
- ctx: multiprocessing context
352
"""
353
354
def acquire(self, block=True, timeout=None) -> bool:
355
"""
356
Acquire the underlying lock.
357
358
Parameters:
359
- block: whether to block if lock unavailable
360
- timeout: timeout in seconds (None for no timeout)
361
362
Returns:
363
True if lock acquired, False otherwise
364
"""
365
366
def release(self):
367
"""
368
Release the underlying lock.
369
"""
370
371
def wait(self, timeout=None) -> bool:
372
"""
373
Wait until notified or timeout.
374
Must be called with lock held.
375
376
Parameters:
377
- timeout: timeout in seconds (None for no timeout)
378
379
Returns:
380
True if notified, False if timeout
381
"""
382
383
def notify(self, n=1):
384
"""
385
Wake up at most n processes waiting on condition.
386
Must be called with lock held.
387
388
Parameters:
389
- n: number of processes to wake up
390
"""
391
392
def notify_all(self):
393
"""
394
Wake up all processes waiting on condition.
395
Must be called with lock held.
396
"""
397
398
def __enter__(self):
399
"""Context manager entry."""
400
return self.acquire()
401
402
def __exit__(self, exc_type, exc_val, exc_tb):
403
"""Context manager exit."""
404
self.release()
405
```
406
407
Usage example:
408
409
```python
410
from billiard import Process, Condition
411
import time
412
import random
413
414
# Shared state
415
items = []
416
condition = Condition()
417
418
def consumer(consumer_id, condition, items):
419
"""Consumer that waits for items"""
420
with condition:
421
while len(items) == 0:
422
print(f"Consumer {consumer_id}: waiting for items...")
423
condition.wait()
424
425
item = items.pop(0)
426
print(f"Consumer {consumer_id}: consumed {item}")
427
428
def producer(producer_id, condition, items):
429
"""Producer that creates items"""
430
for i in range(3):
431
item = f"item-{producer_id}-{i}"
432
433
time.sleep(random.uniform(0.5, 1.5))
434
435
with condition:
436
items.append(item)
437
print(f"Producer {producer_id}: produced {item}")
438
condition.notify() # Wake up one consumer
439
440
if __name__ == '__main__':
441
# Start consumers
442
consumers = []
443
for i in range(2):
444
p = Process(target=consumer, args=(i, condition, items))
445
consumers.append(p)
446
p.start()
447
448
# Start producers
449
producers = []
450
for i in range(2):
451
p = Process(target=producer, args=(i, condition, items))
452
producers.append(p)
453
p.start()
454
455
# Wait for completion
456
for p in producers:
457
p.join()
458
459
# Notify remaining consumers to check final state
460
with condition:
461
condition.notify_all()
462
463
for p in consumers:
464
p.join()
465
```
466
467
### Barriers
468
469
Synchronization point where processes wait for all participants to arrive.
470
471
```python { .api }
472
class Barrier:
473
"""
474
A barrier object for synchronizing processes.
475
"""
476
def __init__(self, parties, action=None, timeout=None, ctx=None):
477
"""
478
Create a barrier.
479
480
Parameters:
481
- parties: number of processes that must call wait()
482
- action: callable to invoke when barrier is released
483
- timeout: default timeout for wait()
484
- ctx: multiprocessing context
485
"""
486
487
def wait(self, timeout=None) -> int:
488
"""
489
Wait for all processes to reach barrier.
490
491
Parameters:
492
- timeout: timeout in seconds (None uses barrier default)
493
494
Returns:
495
Index in range(parties) identifying this process
496
497
Raises:
498
- BrokenBarrierError: if barrier is broken or reset while waiting
499
"""
500
501
def reset(self):
502
"""
503
Reset barrier to initial state.
504
Any processes waiting will receive BrokenBarrierError.
505
"""
506
507
def abort(self):
508
"""
509
Put barrier in broken state.
510
Any current or future calls to wait() will raise BrokenBarrierError.
511
"""
512
513
@property
514
def parties(self) -> int:
515
"""Number of processes required to trip barrier."""
516
517
@property
518
def n_waiting(self) -> int:
519
"""Number of processes currently waiting."""
520
521
@property
522
def broken(self) -> bool:
523
"""True if barrier is broken."""
524
```
525
526
Usage example:
527
528
```python
529
from billiard import Process, Barrier
530
import time
531
import random
532
533
def barrier_action():
534
"""Action to perform when all processes reach barrier"""
535
print("*** All processes synchronized - continuing! ***")
536
537
def worker_with_barrier(worker_id, barrier, phase_count):
538
"""Worker that synchronizes at barrier between phases"""
539
for phase in range(phase_count):
540
# Do some work
541
work_time = random.uniform(0.5, 2.0)
542
print(f"Worker {worker_id}: working on phase {phase} for {work_time:.1f}s")
543
time.sleep(work_time)
544
545
print(f"Worker {worker_id}: finished phase {phase}, waiting at barrier")
546
547
# Wait for all workers to complete this phase
548
try:
549
index = barrier.wait(timeout=5)
550
print(f"Worker {worker_id}: barrier passed (index {index})")
551
except Exception as e:
552
print(f"Worker {worker_id}: barrier error: {e}")
553
break
554
555
print(f"Worker {worker_id}: all phases complete")
556
557
if __name__ == '__main__':
558
num_workers = 4
559
num_phases = 3
560
561
# Create barrier for all workers
562
barrier = Barrier(num_workers, action=barrier_action)
563
564
print(f"Barrier created for {barrier.parties} processes")
565
566
# Start workers
567
workers = []
568
for i in range(num_workers):
569
p = Process(target=worker_with_barrier, args=(i, barrier, num_phases))
570
workers.append(p)
571
p.start()
572
573
# Monitor barrier state
574
for phase in range(num_phases):
575
time.sleep(1)
576
print(f"Phase {phase}: {barrier.n_waiting} processes waiting, "
577
f"broken: {barrier.broken}")
578
579
# Wait for all workers to complete
580
for p in workers:
581
p.join()
582
583
print("All workers completed")
584
```
585
586
## Synchronization Best Practices
587
588
1. **Always use context managers** (`with` statements) when possible to ensure locks are properly released
589
2. **Avoid deadlocks** by acquiring locks in consistent order across processes
590
3. **Use timeouts** for acquire() and wait() operations to prevent indefinite blocking
591
4. **Choose appropriate primitives**:
592
- **Lock/RLock**: Mutual exclusion of critical sections
593
- **Semaphore**: Rate limiting and resource counting
594
- **Event**: Simple signaling between processes
595
- **Condition**: Complex coordination with state changes
596
- **Barrier**: Synchronized phases in parallel algorithms