0
# Synchronization Primitives
1
2
Synchronization objects, modeled on those in the standard `threading` module, for coordinating processes. These primitives provide mutual exclusion, signaling, and coordination mechanisms that work across process boundaries.
3
4
## Capabilities
5
6
### Lock Objects
7
8
Basic mutual exclusion locks for protecting shared resources.
9
10
```python { .api }
11
def Lock():
    """
    Create a non-recursive lock object.

    Once the lock has been acquired, further acquire() calls from any
    process or thread block until it is released.

    Returns:
        Lock: A lock object that can be acquired and released
    """
18
19
def RLock():
    """
    Create a recursive lock object (reentrant lock).

    Returns:
        RLock: A lock that can be acquired multiple times by the same
            process or thread; it must be released once per acquisition
    """
26
```
27
28
#### Lock Methods
29
30
```python { .api }
31
class Lock:
    def acquire(self, block=True, timeout=None):
        """
        Acquire the lock.

        Note: the first argument is named ``block``, not ``blocking`` as in
        ``threading.Lock.acquire()``.

        Args:
            block: if True, block until lock is available; if False,
                return immediately
            timeout: maximum time to wait (seconds) when block is True;
                None waits indefinitely. Ignored when block is False.

        Returns:
            bool: True if lock acquired, False if it was not (non-blocking
                call on a held lock, or timeout occurred)
        """

    def release(self):
        """
        Release the lock.

        Raises:
            ValueError: if the lock is not currently held
        """

    def __enter__(self):
        """Context manager entry - acquire lock."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - release lock."""
52
```
53
54
### Semaphore Objects
55
56
Counting semaphores for controlling access to resources with limited capacity.
57
58
```python { .api }
59
def Semaphore(value=1):
    """
    Create a semaphore object.

    Args:
        value: initial value of the semaphore counter

    Returns:
        Semaphore: A semaphore with the specified initial value
    """
69
70
def BoundedSemaphore(value=1):
    """
    Create a bounded semaphore object.

    Args:
        value: initial and maximum value of the semaphore counter

    Returns:
        BoundedSemaphore: A semaphore that cannot be released above initial value
    """
80
```
81
82
#### Semaphore Methods
83
84
```python { .api }
85
class Semaphore:
    def acquire(self, block=True, timeout=None):
        """
        Acquire the semaphore (decrement counter).

        Note: the first argument is named ``block``, not ``blocking`` as in
        ``threading.Semaphore.acquire()``.

        Args:
            block: if True, block until semaphore is available; if False,
                return immediately
            timeout: maximum time to wait (seconds) when block is True;
                None waits indefinitely

        Returns:
            bool: True if acquired, False if it was not (non-blocking call
                with the counter at zero, or timeout occurred)
        """

    def release(self):
        """Release the semaphore (increment counter)."""

    def __enter__(self):
        """Context manager entry - acquire semaphore."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - release semaphore."""
106
```
107
108
### Event Objects
109
110
Simple signaling mechanism for process coordination.
111
112
```python { .api }
113
def Event():
    """
    Create an event object.

    Returns:
        Event: An event that can be set and cleared
    """
120
```
121
122
#### Event Methods
123
124
```python { .api }
125
class Event:
    def is_set(self):
        """
        Return True if the event is set.

        Returns:
            bool: True if event is set, False otherwise
        """

    def set(self):
        """Set the event flag to True."""

    def clear(self):
        """Reset the event flag to False."""

    def wait(self, timeout=None):
        """
        Block until the event is set.

        Args:
            timeout: maximum time to wait (seconds); None waits indefinitely

        Returns:
            bool: True if event was set, False if timeout occurred
        """
150
```
151
152
### Condition Variables
153
154
Condition variables let processes wait, while holding a lock, until another process signals that some shared state has changed.
155
156
```python { .api }
157
def Condition(lock=None):
    """
    Create a condition variable.

    Args:
        lock: optional lock to use (creates RLock if None)

    Returns:
        Condition: A condition variable object
    """
167
```
168
169
#### Condition Methods
170
171
```python { .api }
172
class Condition:
    def acquire(self, block=True, timeout=None):
        """
        Acquire the underlying lock.

        The call is forwarded to the underlying lock, so the first argument
        is named ``block`` (not ``blocking``).

        Args:
            block: if True, block until the lock is available
            timeout: maximum time to wait (seconds)

        Returns:
            bool: True if the lock was acquired, False otherwise
        """

    def release(self):
        """Release the underlying lock."""

    def wait(self, timeout=None):
        """
        Wait until notified or timeout occurs.

        The underlying lock must be held by the caller; it is released
        while waiting and re-acquired before returning.

        Args:
            timeout: maximum time to wait (seconds)

        Returns:
            bool: True if notified, False if timeout occurred
        """

    def wait_for(self, predicate, timeout=None):
        """
        Wait until predicate becomes True.

        Args:
            predicate: callable that returns a boolean
            timeout: maximum time to wait (seconds)

        Returns:
            bool: The last value returned by predicate (False if the
                timeout expired first)
        """

    def notify(self, n=1):
        """
        Wake up one or more processes waiting on this condition.

        Args:
            n: number of processes to wake up
        """

    def notify_all(self):
        """Wake up all processes waiting on this condition."""

    def __enter__(self):
        """Context manager entry - acquire underlying lock."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - release underlying lock."""
218
```
219
220
### Barrier Objects
221
222
Synchronization barrier for coordinating multiple processes at specific points.
223
224
```python { .api }
225
def Barrier(parties, action=None, timeout=None):
    """
    Create a barrier for synchronizing processes.

    Args:
        parties: number of processes that must call wait() before all are released
        action: optional callable to execute when barrier is released
        timeout: default timeout for wait operations

    Returns:
        Barrier: A barrier object
    """
237
```
238
239
#### Barrier Methods
240
241
```python { .api }
242
class Barrier:
    def wait(self, timeout=None):
        """
        Wait at the barrier until all parties arrive.

        Args:
            timeout: maximum time to wait (seconds); overrides the default
                given when the barrier was created

        Returns:
            int: index of this process (0 to parties-1)

        Raises:
            BrokenBarrierError: if barrier is broken, including when this
                call times out
        """

    def reset(self):
        """Reset the barrier to its initial state."""

    def abort(self):
        """Put the barrier into a broken state."""

    # Properties
    parties: int  # Number of processes required
    n_waiting: int  # Number of processes currently waiting
    broken: bool  # True if barrier is broken
267
```
268
269
## Usage Examples
270
271
### Basic Lock Usage
272
273
```python
274
from multiprocess import Process, Lock
275
import time
276
277
def worker(lock, worker_id):
    """Hold the shared lock for one second so the workers run one at a time."""
    with lock:
        print(f"Worker {worker_id} acquired lock")
        time.sleep(1)  # Simulate work
        print(f"Worker {worker_id} releasing lock")


# The guard keeps child processes from re-running the launch code when the
# module is re-imported under the "spawn" or "forkserver" start methods.
if __name__ == "__main__":
    # Shared lock
    lock = Lock()

    # Create processes
    processes = []
    for i in range(3):
        p = Process(target=worker, args=(lock, i))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
295
```
296
297
### Semaphore for Resource Pool
298
299
```python
300
from multiprocess import Process, Semaphore
301
import time
302
303
def use_resource(semaphore, worker_id):
    """Take one slot from the semaphore, use it for two seconds, then free it."""
    print(f"Worker {worker_id} waiting for resource")
    with semaphore:
        print(f"Worker {worker_id} acquired resource")
        time.sleep(2)  # Use resource
    print(f"Worker {worker_id} released resource")


# The guard keeps child processes from re-running the launch code when the
# module is re-imported under the "spawn" or "forkserver" start methods.
if __name__ == "__main__":
    # Allow 2 concurrent resource users
    semaphore = Semaphore(2)

    # Create 5 processes competing for 2 resources
    processes = []
    for i in range(5):
        p = Process(target=use_resource, args=(semaphore, i))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
322
```
323
324
### Event Signaling
325
326
```python
327
from multiprocess import Process, Event
328
import time
329
330
def waiter(event, name):
    """Block until the shared event is set, then report it."""
    print(f"{name} waiting for event")
    event.wait()
    print(f"{name} received event")


def setter(event):
    """Set the shared event after a two-second delay, releasing all waiters."""
    time.sleep(2)
    print("Setting event")
    event.set()


# The guard keeps child processes from re-running the launch code when the
# module is re-imported under the "spawn" or "forkserver" start methods.
if __name__ == "__main__":
    # Shared event
    event = Event()

    # Create waiting processes
    waiters = []
    for i in range(3):
        p = Process(target=waiter, args=(event, f"Waiter-{i}"))
        p.start()
        waiters.append(p)

    # Create setter process
    setter_proc = Process(target=setter, args=(event,))
    setter_proc.start()

    # Wait for all
    for p in waiters:
        p.join()
    setter_proc.join()
358
```
359
360
### Condition Variable Coordination
361
362
```python
363
from multiprocess import Process, Condition, Manager
import time


def consumer(condition, items, consumer_id):
    """Wait on the condition until an item is available, then consume one."""
    with condition:
        # Re-check after every wake-up: another consumer may have taken the item.
        while len(items) == 0:
            print(f"Consumer {consumer_id} waiting")
            condition.wait()
        item = items.pop()
        print(f"Consumer {consumer_id} consumed {item}")


def producer(condition, items):
    """Append five items, one per second, notifying one waiter after each."""
    for i in range(5):
        time.sleep(1)
        with condition:
            item = f"item-{i}"
            items.append(item)
            print(f"Produced {item}")
            condition.notify()


# The guard keeps child processes from re-running the launch code when the
# module is re-imported under the "spawn" or "forkserver" start methods.
if __name__ == "__main__":
    condition = Condition()

    # A plain list would be copied into each process, so consumers would never
    # see what the producer appends. A Manager list lives in a server process
    # and is shared by every worker through a proxy.
    with Manager() as manager:
        items = manager.list()

        # Create consumer processes
        consumers = []
        for i in range(2):
            p = Process(target=consumer, args=(condition, items, i))
            p.start()
            consumers.append(p)

        # Create producer process
        prod = Process(target=producer, args=(condition, items))
        prod.start()

        prod.join()
        for p in consumers:
            p.join()
400
```
401
402
### Barrier Synchronization
403
404
```python
405
from multiprocess import Process, Barrier
from threading import BrokenBarrierError
import time
import random


def worker(barrier, worker_id):
    """Do phase 1 alone, meet the other workers at the barrier, then do phase 2."""
    # Phase 1: Individual work
    work_time = random.uniform(1, 3)
    print(f"Worker {worker_id} working for {work_time:.1f} seconds")
    time.sleep(work_time)

    print(f"Worker {worker_id} finished phase 1, waiting at barrier")
    try:
        index = barrier.wait(timeout=10)
    except BrokenBarrierError as e:
        # Raised when the wait times out or the barrier is aborted/reset.
        # The exception usually carries no message, so show its repr.
        print(f"Worker {worker_id} barrier error: {e!r}")
        return

    # Each worker receives a distinct index, so exactly one of them prints this.
    if index == 0:
        print("All workers completed phase 1!")

    # Phase 2: Synchronized work
    print(f"Worker {worker_id} starting phase 2")
    time.sleep(1)
    print(f"Worker {worker_id} completed phase 2")


# The guard keeps child processes from re-running the launch code when the
# module is re-imported under the "spawn" or "forkserver" start methods.
if __name__ == "__main__":
    # Create barrier for 3 workers
    barrier = Barrier(3)

    # Create worker processes
    processes = []
    for i in range(3):
        p = Process(target=worker, args=(barrier, i))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
441
```