or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

abstract.md index.md promises.md synchronization.md utilities.md

synchronization.mddocs/

0

# Synchronization

1

2

Synchronization primitives for coordinating multiple promises and waiting for all of them to complete. The barrier class provides a way to execute a callback only after every promise in a collection has been fulfilled.

3

4

## Capabilities

5

6

### Barrier Creation

7

8

Create synchronization barriers for coordinating multiple promises.

9

10

```python { .api }

11

class barrier:

12

def __init__(self, promises=None, args=None, kwargs=None, callback=None, size=None):

13

"""

14

Create a synchronization barrier.

15

16

Parameters:

17

- promises: iterable, collection of promises to wait for

18

- args: tuple, arguments to pass to callback when all promises complete

19

- kwargs: dict, keyword arguments to pass to callback

20

- callback: callable or promise, function to execute when barrier completes

21

- size: int, expected number of promises (auto-calculated if promises provided)

22

"""

23

```

24

25

**Usage Examples:**

26

27

```python

28

from vine import barrier, promise

29

30

# Create barrier with existing promises

31

p1 = promise(lambda: "task1")

32

p2 = promise(lambda: "task2")

33

p3 = promise(lambda: "task3")

34

35

b = barrier([p1, p2, p3], callback=lambda: print("All tasks complete!"))

36

37

# Fulfill promises in any order

38

p2()

39

p1()

40

p3() # Prints: All tasks complete!

41

```

42

43

### Adding Promises

44

45

Add promises to the barrier before finalization.

46

47

```python { .api }

48

def add(self, promise):

49

"""

50

Add a promise to the barrier and increment expected size.

51

52

Parameters:

53

- promise: promise object to add to the barrier

54

55

Raises:

56

- ValueError: if barrier is already fulfilled

57

"""

58

59

def add_noincr(self, promise):

60

"""

61

Add promise to barrier without incrementing expected size.

62

63

Parameters:

64

- promise: promise object to add to the barrier

65

66

Raises:

67

- ValueError: if barrier is already fulfilled

68

"""

69

```

70

71

**Usage Examples:**

72

73

```python

74

# Start with empty barrier

75

b = barrier()

76

77

# Add promises dynamically

78

p1 = promise(fetch_data, args=("url1",))

79

p2 = promise(fetch_data, args=("url2",))

80

81

b.add(p1)

82

b.add(p2)

83

84

# Set completion callback

85

b.then(lambda: print("All data fetched!"))

86

87

# Add more promises before finalization

88

p3 = promise(fetch_data, args=("url3",))

89

b.add(p3)

90

91

b.finalize() # No more promises can be added

92

```

93

94

### Barrier Completion

95

96

Control when the barrier considers itself complete.

97

98

```python { .api }

99

def __call__(self, *args, **kwargs):

100

"""

101

Mark one promise as complete. Called automatically by fulfilled promises.

102

103

Internal method - promises call this when they complete.

104

"""

105

106

def finalize(self):

107

"""

108

Finalize the barrier, preventing addition of more promises.

109

110

If the current promise count already meets the expected size, completion is triggered immediately.

111

"""

112

```

113

114

**Usage Examples:**

115

116

```python

117

# Barrier with predetermined size

118

b = barrier(size=3)

119

b.then(lambda: print("3 promises completed!"))

120

121

# Add promises up to the size

122

b.add_noincr(promise1)

123

b.add_noincr(promise2)

124

b.add_noincr(promise3)

125

126

# Must finalize before promises can trigger completion

127

b.finalize()

128

129

# Now fulfill promises

130

promise1()

131

promise2()

132

promise3() # Triggers barrier completion

133

```

134

135

### Barrier Chaining

136

137

Chain callbacks to execute when the barrier completes.

138

139

```python { .api }

140

def then(self, callback, errback=None):

141

"""

142

Chain callback to execute when barrier completes.

143

144

Parameters:

145

- callback: callable or promise, function to execute on completion

146

- errback: callable or promise, error handler for exceptions

147

148

Returns:

149

None (delegates to internal promise)

150

"""

151

```

152

153

**Usage Examples:**

154

155

```python

156

# Chain multiple callbacks

157

b = barrier([p1, p2, p3])

158

159

b.then(lambda: print("First callback"))

160

b.then(lambda: print("Second callback"))

161

b.then(lambda: send_notification("All complete"))

162

163

# Chain with error handling

164

b.then(

165

success_callback,

166

errback=lambda exc: print(f"Barrier failed: {exc}")

167

)

168

```

169

170

### Error Handling

171

172

Handle exceptions that occur in barrier promises.

173

174

```python { .api }

175

def throw(self, *args, **kwargs):

176

"""

177

Throw exception through the barrier.

178

179

Parameters:

180

- *args: arguments to pass to underlying promise throw()

181

- **kwargs: keyword arguments to pass to underlying promise throw()

182

"""

183

184

# throw1 is an alias for throw

185

throw1 = throw

186

```

187

188

**Usage Examples:**

189

190

```python

191

# Handle errors in barrier promises

192

def error_handler(exc):

193

print(f"Barrier failed with: {exc}")

194

195

b = barrier([p1, p2, p3])

196

b.then(success_callback, errback=error_handler)

197

198

# If any promise fails, error handler is called

199

p1.throw(RuntimeError("Network error"))

200

```

201

202

### Barrier Cancellation

203

204

Cancel the barrier and stop waiting for promises.

205

206

```python { .api }

207

def cancel(self):

208

"""

209

Cancel the barrier and underlying promise.

210

211

Stops the barrier from completing even if all promises are fulfilled.

212

"""

213

```

214

215

**Usage Examples:**

216

217

```python

218

# Cancel barrier if taking too long

219

import threading

220

221

b = barrier([long_running_promise1, long_running_promise2])

222

b.then(lambda: print("Tasks completed"))

223

224

# Cancel after timeout

225

def timeout_cancel():

226

print("Barrier timed out, cancelling...")

227

b.cancel()

228

229

timer = threading.Timer(30.0, timeout_cancel)

230

timer.start()

231

```

232

233

### Barrier Properties

234

235

Access barrier state and information.

236

237

```python { .api }

238

@property

239

def ready(self) -> bool:

240

"""True when barrier has completed (all promises fulfilled)."""

241

242

@property

243

def finalized(self) -> bool:

244

"""True when barrier is finalized (no more promises can be added)."""

245

246

@property

247

def cancelled(self) -> bool:

248

"""True when barrier has been cancelled."""

249

250

@property

251

def size(self) -> int:

252

"""Expected number of promises for barrier completion."""

253

254

@property

255

def failed(self) -> bool:

256

"""True when barrier failed due to promise exception."""

257

258

@property

259

def reason(self) -> Exception:

260

"""Exception that caused barrier failure."""

261

```

262

263

**Usage Examples:**

264

265

```python

266

b = barrier([p1, p2, p3])

267

268

print(f"Waiting for {b.size} promises")

269

print(f"Finalized: {b.finalized}") # True

270

print(f"Ready: {b.ready}") # False

271

272

# Fulfill promises

273

p1()

274

p2()

275

276

print(f"Ready: {b.ready}") # False (still waiting for p3)

277

278

p3()

279

280

print(f"Ready: {b.ready}") # True

281

```

282

283

## Advanced Usage

284

285

### Dynamic Barrier Management

286

287

```python

288

# Barrier that grows dynamically

289

b = barrier()

290

291

def add_more_work():

292

if more_work_needed():

293

new_promise = promise(do_additional_work)

294

b.add(new_promise)

295

new_promise.then(add_more_work) # Check for even more work

296

297

# Only finalize when we know no more work is needed

298

if no_more_work():

299

b.finalize()

300

301

# Start the process

302

initial_promise = promise(initial_work)

303

b.add(initial_promise)

304

initial_promise.then(add_more_work)

305

initial_promise()

306

```

307

308

### Barrier with Results Collection

309

310

```python

311

# Collect results from all promises

312

results = []

313

314

def collect_result(result):

315

results.append(result)

316

317

def all_complete():

318

print(f"All results: {results}")

319

320

p1 = promise(lambda: "result1")

321

p2 = promise(lambda: "result2")

322

p3 = promise(lambda: "result3")

323

324

# Chain each promise to collect its result

325

p1.then(collect_result)

326

p2.then(collect_result)

327

p3.then(collect_result)

328

329

# Use barrier to know when all are complete

330

b = barrier([p1, p2, p3], callback=all_complete)

331

332

# Execute promises

333

p1()

334

p2()

335

p3() # Prints: All results: ['result1', 'result2', 'result3']

336

```