or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

communication.mdcontext-config.mdindex.mdpools.mdprocess-management.mdshared-objects.mdsynchronization.md

synchronization.mddocs/

0

# Synchronization Primitives

1

2

Thread-like synchronization objects for coordinating processes. These primitives provide mutual exclusion, signaling, and coordination mechanisms that work across process boundaries.

3

4

## Capabilities

5

6

### Lock Objects

7

8

Basic mutual exclusion locks for protecting shared resources.

9

10

```python { .api }

11

def Lock():

12

"""

13

Create a non-recursive lock object.

14

15

Returns:

16

Lock: A lock object that can be acquired and released

17

"""

18

19

def RLock():

20

"""

21

Create a recursive lock object (reentrant lock).

22

23

Returns:

24

RLock: A lock that can be acquired multiple times by the same process

25

"""

26

```

27

28

#### Lock Methods

29

30

```python { .api }

31

class Lock:

32

def acquire(self, blocking=True, timeout=None):

33

"""

34

Acquire the lock.

35

36

Args:

37

blocking: if True, block until lock is available

38

timeout: maximum time to wait (seconds)

39

40

Returns:

41

bool: True if lock acquired, False if timeout occurred

42

"""

43

44

def release(self):

45

"""Release the lock."""

46

47

def __enter__(self):

48

"""Context manager entry - acquire lock."""

49

50

def __exit__(self, exc_type, exc_val, exc_tb):

51

"""Context manager exit - release lock."""

52

```

53

54

### Semaphore Objects

55

56

Counting semaphores for controlling access to resources with limited capacity.

57

58

```python { .api }

59

def Semaphore(value=1):

60

"""

61

Create a semaphore object.

62

63

Args:

64

value: initial value of the semaphore counter

65

66

Returns:

67

Semaphore: A semaphore with the specified initial value

68

"""

69

70

def BoundedSemaphore(value=1):

71

"""

72

Create a bounded semaphore object.

73

74

Args:

75

value: initial and maximum value of the semaphore counter

76

77

Returns:

78

BoundedSemaphore: A semaphore that cannot be released above initial value

79

"""

80

```

81

82

#### Semaphore Methods

83

84

```python { .api }

85

class Semaphore:

86

def acquire(self, blocking=True, timeout=None):

87

"""

88

Acquire the semaphore (decrement counter).

89

90

Args:

91

blocking: if True, block until semaphore is available

92

timeout: maximum time to wait (seconds)

93

94

Returns:

95

bool: True if acquired, False if timeout occurred

96

"""

97

98

def release(self):

99

"""Release the semaphore (increment counter)."""

100

101

def __enter__(self):

102

"""Context manager entry - acquire semaphore."""

103

104

def __exit__(self, exc_type, exc_val, exc_tb):

105

"""Context manager exit - release semaphore."""

106

```

107

108

### Event Objects

109

110

Simple signaling mechanism for process coordination.

111

112

```python { .api }

113

def Event():

114

"""

115

Create an event object.

116

117

Returns:

118

Event: An event that can be set and cleared

119

"""

120

```

121

122

#### Event Methods

123

124

```python { .api }

125

class Event:

126

def is_set(self):

127

"""

128

Return True if the event is set.

129

130

Returns:

131

bool: True if event is set, False otherwise

132

"""

133

134

def set(self):

135

"""Set the event flag to True."""

136

137

def clear(self):

138

"""Reset the event flag to False."""

139

140

def wait(self, timeout=None):

141

"""

142

Block until the event is set.

143

144

Args:

145

timeout: maximum time to wait (seconds)

146

147

Returns:

148

bool: True if event was set, False if timeout occurred

149

"""

150

```

151

152

### Condition Variables

153

154

Advanced synchronization for complex coordination scenarios.

155

156

```python { .api }

157

def Condition(lock=None):

158

"""

159

Create a condition variable.

160

161

Args:

162

lock: optional lock to use (creates RLock if None)

163

164

Returns:

165

Condition: A condition variable object

166

"""

167

```

168

169

#### Condition Methods

170

171

```python { .api }

172

class Condition:

173

def acquire(self, blocking=True, timeout=None):

174

"""Acquire the underlying lock."""

175

176

def release(self):

177

"""Release the underlying lock."""

178

179

def wait(self, timeout=None):

180

"""

181

Wait until notified or timeout occurs.

182

183

Args:

184

timeout: maximum time to wait (seconds)

185

186

Returns:

187

bool: True if notified, False if timeout occurred

188

"""

189

190

def wait_for(self, predicate, timeout=None):

191

"""

192

Wait until predicate becomes True.

193

194

Args:

195

predicate: callable that returns a boolean

196

timeout: maximum time to wait (seconds)

197

198

Returns:

199

bool: The value of predicate

200

"""

201

202

def notify(self, n=1):

203

"""

204

Wake up one or more processes waiting on this condition.

205

206

Args:

207

n: number of processes to wake up

208

"""

209

210

def notify_all(self):

211

"""Wake up all processes waiting on this condition."""

212

213

def __enter__(self):

214

"""Context manager entry - acquire underlying lock."""

215

216

def __exit__(self, exc_type, exc_val, exc_tb):

217

"""Context manager exit - release underlying lock."""

218

```

219

220

### Barrier Objects

221

222

Synchronization barrier for coordinating multiple processes at specific points.

223

224

```python { .api }

225

def Barrier(parties, action=None, timeout=None):

226

"""

227

Create a barrier for synchronizing processes.

228

229

Args:

230

parties: number of processes that must call wait() before all are released

231

action: optional callable to execute when barrier is released

232

timeout: default timeout for wait operations

233

234

Returns:

235

Barrier: A barrier object

236

"""

237

```

238

239

#### Barrier Methods

240

241

```python { .api }

242

class Barrier:

243

def wait(self, timeout=None):

244

"""

245

Wait at the barrier until all parties arrive.

246

247

Args:

248

timeout: maximum time to wait (seconds)

249

250

Returns:

251

int: index of this process (0 to parties-1)

252

253

Raises:

254

BrokenBarrierError: if barrier is broken

255

"""

256

257

def reset(self):

258

"""Reset the barrier to its initial state."""

259

260

def abort(self):

261

"""Put the barrier into a broken state."""

262

263

# Properties

264

parties: int # Number of processes required

265

n_waiting: int # Number of processes currently waiting

266

broken: bool # True if barrier is broken

267

```

268

269

## Usage Examples

270

271

### Basic Lock Usage

272

273

```python

274

from multiprocess import Process, Lock

275

import time

276

277

def worker(lock, worker_id):

278

with lock:

279

print(f"Worker {worker_id} acquired lock")

280

time.sleep(1) # Simulate work

281

print(f"Worker {worker_id} releasing lock")

282

283

# Shared lock

284

lock = Lock()

285

286

# Create processes

287

processes = []

288

for i in range(3):

289

p = Process(target=worker, args=(lock, i))

290

p.start()

291

processes.append(p)

292

293

for p in processes:

294

p.join()

295

```

296

297

### Semaphore for Resource Pool

298

299

```python

300

from multiprocess import Process, Semaphore

301

import time

302

303

def use_resource(semaphore, worker_id):

304

print(f"Worker {worker_id} waiting for resource")

305

with semaphore:

306

print(f"Worker {worker_id} acquired resource")

307

time.sleep(2) # Use resource

308

print(f"Worker {worker_id} released resource")

309

310

# Allow 2 concurrent resource users

311

semaphore = Semaphore(2)

312

313

# Create 5 processes competing for 2 resources

314

processes = []

315

for i in range(5):

316

p = Process(target=use_resource, args=(semaphore, i))

317

p.start()

318

processes.append(p)

319

320

for p in processes:

321

p.join()

322

```

323

324

### Event Signaling

325

326

```python

327

from multiprocess import Process, Event

328

import time

329

330

def waiter(event, name):

331

print(f"{name} waiting for event")

332

event.wait()

333

print(f"{name} received event")

334

335

def setter(event):

336

time.sleep(2)

337

print("Setting event")

338

event.set()

339

340

# Shared event

341

event = Event()

342

343

# Create waiting processes

344

waiters = []

345

for i in range(3):

346

p = Process(target=waiter, args=(event, f"Waiter-{i}"))

347

p.start()

348

waiters.append(p)

349

350

# Create setter process

351

setter_proc = Process(target=setter, args=(event,))

352

setter_proc.start()

353

354

# Wait for all

355

for p in waiters:

356

p.join()

357

setter_proc.join()

358

```

359

360

### Condition Variable Coordination

361

362

```python

363

from multiprocess import Process, Condition

364

import time

365

366

items = []

367

condition = Condition()

368

369

def consumer(condition, consumer_id):

370

with condition:

371

while len(items) == 0:

372

print(f"Consumer {consumer_id} waiting")

373

condition.wait()

374

item = items.pop()

375

print(f"Consumer {consumer_id} consumed {item}")

376

377

def producer(condition):

378

for i in range(5):

379

time.sleep(1)

380

with condition:

381

item = f"item-{i}"

382

items.append(item)

383

print(f"Produced {item}")

384

condition.notify()

385

386

# Create consumer processes

387

consumers = []

388

for i in range(2):

389

p = Process(target=consumer, args=(condition, i))

390

p.start()

391

consumers.append(p)

392

393

# Create producer process

394

prod = Process(target=producer, args=(condition,))

395

prod.start()

396

397

prod.join()

398

for p in consumers:

399

p.join()

400

```

401

402

### Barrier Synchronization

403

404

```python

405

from multiprocess import Process, Barrier

406

import time

407

import random

408

409

def worker(barrier, worker_id):

410

# Phase 1: Individual work

411

work_time = random.uniform(1, 3)

412

print(f"Worker {worker_id} working for {work_time:.1f} seconds")

413

time.sleep(work_time)

414

415

print(f"Worker {worker_id} finished phase 1, waiting at barrier")

416

try:

417

index = barrier.wait(timeout=10)

418

if index == 0: # First process to cross barrier

419

print("All workers completed phase 1!")

420

except Exception as e:

421

print(f"Worker {worker_id} barrier error: {e}")

422

return

423

424

# Phase 2: Synchronized work

425

print(f"Worker {worker_id} starting phase 2")

426

time.sleep(1)

427

print(f"Worker {worker_id} completed phase 2")

428

429

# Create barrier for 3 workers

430

barrier = Barrier(3)

431

432

# Create worker processes

433

processes = []

434

for i in range(3):

435

p = Process(target=worker, args=(barrier, i))

436

p.start()

437

processes.append(p)

438

439

for p in processes:

440

p.join()

441

```