0
# Synchronization
1
2
Synchronization primitives for coordinating multiple promises and waiting for all to complete. The barrier class provides a way to execute a callback only after a collection of promises have been fulfilled.
3
4
## Capabilities
5
6
### Barrier Creation
7
8
Create synchronization barriers for coordinating multiple promises.
9
10
```python { .api }
11
class barrier:
12
def __init__(self, promises=None, args=None, kwargs=None, callback=None, size=None):
13
"""
14
Create a synchronization barrier.
15
16
Parameters:
17
- promises: iterable, collection of promises to wait for
18
- args: tuple, arguments to pass to callback when all promises complete
19
- kwargs: dict, keyword arguments to pass to callback
20
- callback: callable or promise, function to execute when barrier completes
21
- size: int, expected number of promises (auto-calculated if promises provided)
22
"""
23
```
24
25
**Usage Examples:**
26
27
```python
28
from vine import barrier, promise
29
30
# Create barrier with existing promises
31
p1 = promise(lambda: "task1")
32
p2 = promise(lambda: "task2")
33
p3 = promise(lambda: "task3")
34
35
b = barrier([p1, p2, p3], callback=lambda: print("All tasks complete!"))
36
37
# Fulfill promises in any order
38
p2()
39
p1()
40
p3() # Prints: All tasks complete!
41
```
42
43
### Adding Promises
44
45
Add promises to the barrier before finalization.
46
47
```python { .api }
48
def add(self, promise):
49
"""
50
Add a promise to the barrier and increment expected size.
51
52
Parameters:
53
- promise: promise object to add to the barrier
54
55
Raises:
56
- ValueError: if barrier is already fulfilled
57
"""
58
59
def add_noincr(self, promise):
60
"""
61
Add promise to barrier without incrementing expected size.
62
63
Parameters:
64
- promise: promise object to add to the barrier
65
66
Raises:
67
- ValueError: if barrier is already fulfilled
68
"""
69
```
70
71
**Usage Examples:**
72
73
```python
74
# Start with empty barrier
75
b = barrier()
76
77
# Add promises dynamically
78
p1 = promise(fetch_data, args=("url1",))
79
p2 = promise(fetch_data, args=("url2",))
80
81
b.add(p1)
82
b.add(p2)
83
84
# Set completion callback
85
b.then(lambda: print("All data fetched!"))
86
87
# Add more promises before finalization
88
p3 = promise(fetch_data, args=("url3",))
89
b.add(p3)
90
91
b.finalize() # No more promises can be added
92
```
93
94
### Barrier Completion
95
96
Control when the barrier considers itself complete.
97
98
```python { .api }
99
def __call__(self, *args, **kwargs):
100
"""
101
Mark one promise as complete. Called automatically by fulfilled promises.
102
103
Internal method - promises call this when they complete.
104
"""
105
106
def finalize(self):
107
"""
108
Finalize the barrier, preventing addition of more promises.
109
110
If current promise count meets expected size, triggers completion immediately.
111
"""
112
```
113
114
**Usage Examples:**
115
116
```python
117
# Barrier with predetermined size
118
b = barrier(size=3)
119
b.then(lambda: print("3 promises completed!"))
120
121
# Add promises up to the size
122
b.add(promise1)
123
b.add(promise2)
124
b.add(promise3)
125
126
# Must finalize before promises can trigger completion
127
b.finalize()
128
129
# Now fulfill promises
130
promise1()
131
promise2()
132
promise3() # Triggers barrier completion
133
```
134
135
### Barrier Chaining
136
137
Chain callbacks to execute when the barrier completes.
138
139
```python { .api }
140
def then(self, callback, errback=None):
141
"""
142
Chain callback to execute when barrier completes.
143
144
Parameters:
145
- callback: callable or promise, function to execute on completion
146
- errback: callable or promise, error handler for exceptions
147
148
Returns:
149
None (delegates to internal promise)
150
"""
151
```
152
153
**Usage Examples:**
154
155
```python
156
# Chain multiple callbacks
157
b = barrier([p1, p2, p3])
158
159
b.then(lambda: print("First callback"))
160
b.then(lambda: print("Second callback"))
161
b.then(lambda: send_notification("All complete"))
162
163
# Chain with error handling
164
b.then(
165
success_callback,
166
errback=lambda exc: print(f"Barrier failed: {exc}")
167
)
168
```
169
170
### Error Handling
171
172
Handle exceptions that occur in barrier promises.
173
174
```python { .api }
175
def throw(self, *args, **kwargs):
176
"""
177
Throw exception through the barrier.
178
179
Parameters:
180
- *args: arguments to pass to underlying promise throw()
181
- **kwargs: keyword arguments to pass to underlying promise throw()
182
"""
183
184
# throw1 is an alias for throw
185
throw1 = throw
186
```
187
188
**Usage Examples:**
189
190
```python
191
# Handle errors in barrier promises
192
def error_handler(exc):
193
print(f"Barrier failed with: {exc}")
194
195
b = barrier([p1, p2, p3])
196
b.then(success_callback, errback=error_handler)
197
198
# If any promise fails, error handler is called
199
p1.throw(RuntimeError("Network error"))
200
```
201
202
### Barrier Cancellation
203
204
Cancel the barrier and stop waiting for promises.
205
206
```python { .api }
207
def cancel(self):
208
"""
209
Cancel the barrier and underlying promise.
210
211
Stops the barrier from completing even if all promises fulfill.
212
"""
213
```
214
215
**Usage Examples:**
216
217
```python
218
# Cancel barrier if taking too long
219
import threading
220
221
b = barrier([long_running_promise1, long_running_promise2])
222
b.then(lambda: print("Tasks completed"))
223
224
# Cancel after timeout
225
def timeout_cancel():
226
print("Barrier timed out, cancelling...")
227
b.cancel()
228
229
timer = threading.Timer(30.0, timeout_cancel)
230
timer.start()
231
```
232
233
### Barrier Properties
234
235
Access barrier state and information.
236
237
```python { .api }
238
@property
239
def ready(self) -> bool:
240
"""True when barrier has completed (all promises fulfilled)."""
241
242
@property
243
def finalized(self) -> bool:
244
"""True when barrier is finalized (no more promises can be added)."""
245
246
@property
247
def cancelled(self) -> bool:
248
"""True when barrier has been cancelled."""
249
250
@property
251
def size(self) -> int:
252
"""Expected number of promises for barrier completion."""
253
254
@property
255
def failed(self) -> bool:
256
"""True when barrier failed due to promise exception."""
257
258
@property
259
def reason(self) -> Exception:
260
"""Exception that caused barrier failure."""
261
```
262
263
**Usage Examples:**
264
265
```python
266
b = barrier([p1, p2, p3])
267
268
print(f"Waiting for {b.size} promises")
269
print(f"Finalized: {b.finalized}") # True
270
print(f"Ready: {b.ready}") # False
271
272
# Fulfill promises
273
p1()
274
p2()
275
276
print(f"Ready: {b.ready}") # False (still waiting for p3)
277
278
p3()
279
280
print(f"Ready: {b.ready}") # True
281
```
282
283
## Advanced Usage
284
285
### Dynamic Barrier Management
286
287
```python
288
# Barrier that grows dynamically
289
b = barrier()
290
291
def add_more_work():
292
if more_work_needed():
293
new_promise = promise(do_additional_work)
294
b.add(new_promise)
295
new_promise.then(add_more_work) # Check for even more work
296
297
# Only finalize when we know no more work is needed
298
if no_more_work():
299
b.finalize()
300
301
# Start the process
302
initial_promise = promise(initial_work)
303
b.add(initial_promise)
304
initial_promise.then(add_more_work)
305
initial_promise()
306
```
307
308
### Barrier with Results Collection
309
310
```python
311
# Collect results from all promises
312
results = []
313
314
def collect_result(result):
315
results.append(result)
316
317
def all_complete():
318
print(f"All results: {results}")
319
320
p1 = promise(lambda: "result1")
321
p2 = promise(lambda: "result2")
322
p3 = promise(lambda: "result3")
323
324
# Chain each promise to collect its result
325
p1.then(collect_result)
326
p2.then(collect_result)
327
p3.then(collect_result)
328
329
# Use barrier to know when all are complete
330
b = barrier([p1, p2, p3], callback=all_complete)
331
332
# Execute promises
333
p1()
334
p2()
335
p3() # Prints: All results: ['result1', 'result2', 'result3']
336
```