or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

communication.mdcontext-management.mdindex.mdmanagers.mdprocess-management.mdprocess-pools.mdqueues.mdshared-memory.mdsynchronization.md

synchronization.mddocs/

0

# Synchronization

1

2

Synchronization primitives including locks, semaphores, events, conditions, and barriers for coordinating processes and ensuring thread-safe access to shared resources.

3

4

## Capabilities

5

6

### Locks

7

8

Mutual exclusion primitives for protecting critical sections and shared resources.

9

10

```python { .api }

11

class Lock:

12

"""

13

A non-recursive lock (mutual exclusion lock).

14

"""

15

def acquire(self, block=True, timeout=None) -> bool:

16

"""

17

Acquire the lock.

18

19

Parameters:

20

- block: whether to block if lock is unavailable

21

- timeout: timeout in seconds (None for no timeout)

22

23

Returns:

24

True if lock acquired, False otherwise

25

"""

26

27

def release(self):

28

"""

29

Release the lock.

30

31

Raises:

32

- ValueError: if lock is not currently held

33

"""

34

35

def __enter__(self):

36

"""Context manager entry."""

37

return self.acquire()

38

39

def __exit__(self, exc_type, exc_val, exc_tb):

40

"""Context manager exit."""

41

self.release()

42

43

class RLock:

44

"""

45

A reentrant lock (recursive lock).

46

Can be acquired multiple times by the same process.

47

"""

48

def acquire(self, block=True, timeout=None) -> bool:

49

"""

50

Acquire the lock, incrementing recursion level.

51

52

Parameters:

53

- block: whether to block if lock is unavailable

54

- timeout: timeout in seconds (None for no timeout)

55

56

Returns:

57

True if lock acquired, False otherwise

58

"""

59

60

def release(self):

61

"""

62

Release the lock, decrementing recursion level.

63

64

Raises:

65

- ValueError: if lock is not currently held by calling process

66

"""

67

68

def __enter__(self):

69

"""Context manager entry."""

70

return self.acquire()

71

72

def __exit__(self, exc_type, exc_val, exc_tb):

73

"""Context manager exit."""

74

self.release()

75

```

76

77

Usage example:

78

79

```python

80

from billiard import Process, Lock, RLock

81

import time

82

83

# Shared resource counter

84

counter = 0

85

lock = Lock()

86

87

def worker_with_lock(name, iterations, shared_lock):

88

"""Worker that safely increments counter"""

89

global counter

90

91

for i in range(iterations):

92

# Critical section

93

with shared_lock:

94

old_value = counter

95

time.sleep(0.001) # Simulate some work

96

counter = old_value + 1

97

print(f"{name}: incremented counter to {counter}")

98

99

def recursive_function(rlock, level):

100

"""Function that acquires lock recursively"""

101

with rlock:

102

print(f"Level {level}: acquired lock")

103

if level > 0:

104

recursive_function(rlock, level - 1)

105

print(f"Level {level}: releasing lock")

106

107

if __name__ == '__main__':

108

# Test regular lock

109

processes = []

110

for i in range(3):

111

p = Process(target=worker_with_lock, args=(f"Worker-{i}", 5, lock))

112

processes.append(p)

113

p.start()

114

115

for p in processes:

116

p.join()

117

118

print(f"Final counter value: {counter}")

119

120

# Test recursive lock

121

rlock = RLock()

122

process = Process(target=recursive_function, args=(rlock, 3))

123

process.start()

124

process.join()

125

```

126

127

### Semaphores

128

129

Counting semaphores for controlling access to resources with limited capacity.

130

131

```python { .api }

132

class Semaphore:

133

"""

134

A counting semaphore.

135

"""

136

def __init__(self, value=1, ctx=None):

137

"""

138

Create a semaphore.

139

140

Parameters:

141

- value: initial count (default: 1)

142

- ctx: multiprocessing context

143

"""

144

145

def acquire(self, block=True, timeout=None) -> bool:

146

"""

147

Acquire the semaphore, decrementing internal counter.

148

149

Parameters:

150

- block: whether to block if semaphore unavailable

151

- timeout: timeout in seconds (None for no timeout)

152

153

Returns:

154

True if semaphore acquired, False otherwise

155

"""

156

157

def release(self):

158

"""

159

Release the semaphore, incrementing internal counter.

160

"""

161

162

def __enter__(self):

163

"""Context manager entry."""

164

return self.acquire()

165

166

def __exit__(self, exc_type, exc_val, exc_tb):

167

"""Context manager exit."""

168

self.release()

169

170

class BoundedSemaphore(Semaphore):

171

"""

172

A bounded semaphore that prevents release() from raising count above initial value.

173

"""

174

def release(self):

175

"""

176

Release the semaphore, but prevent count from exceeding initial value.

177

178

Raises:

179

- ValueError: if release() would increase count above initial value

180

"""

181

```

182

183

Usage example:

184

185

```python

186

from billiard import Process, Semaphore, BoundedSemaphore

187

import time

188

import random

189

190

# Semaphore limiting concurrent access to 3 resources

191

resource_semaphore = Semaphore(3)

192

193

def worker_with_semaphore(worker_id, semaphore):

194

"""Worker that uses limited resource"""

195

print(f"Worker {worker_id}: requesting resource...")

196

197

with semaphore:

198

print(f"Worker {worker_id}: acquired resource")

199

# Simulate work with resource

200

work_time = random.uniform(0.5, 2.0)

201

time.sleep(work_time)

202

print(f"Worker {worker_id}: releasing resource after {work_time:.1f}s")

203

204

print(f"Worker {worker_id}: done")

205

206

def bounded_semaphore_example():

207

"""Demonstrate bounded semaphore behavior"""

208

bounded_sem = BoundedSemaphore(2)

209

210

# Acquire twice (should work)

211

bounded_sem.acquire()

212

bounded_sem.acquire()

213

print("Acquired semaphore twice")

214

215

# Release twice

216

bounded_sem.release()

217

bounded_sem.release()

218

print("Released semaphore twice")

219

220

# Try to release again (should raise ValueError)

221

try:

222

bounded_sem.release()

223

except ValueError as e:

224

print(f"Bounded semaphore prevented over-release: {e}")

225

226

if __name__ == '__main__':

227

# Test resource limiting

228

workers = []

229

for i in range(8):

230

p = Process(target=worker_with_semaphore, args=(i, resource_semaphore))

231

workers.append(p)

232

p.start()

233

234

for p in workers:

235

p.join()

236

237

# Test bounded semaphore

238

bounded_semaphore_example()

239

```

240

241

### Events

242

243

Simple signaling mechanism for coordinating processes.

244

245

```python { .api }

246

class Event:

247

"""

248

A simple event object for process synchronization.

249

"""

250

def set(self):

251

"""

252

Set the internal flag to True.

253

All processes waiting for it become unblocked.

254

"""

255

256

def clear(self):

257

"""

258

Set the internal flag to False.

259

"""

260

261

def is_set(self) -> bool:

262

"""

263

Return True if internal flag is True.

264

265

Returns:

266

True if event is set, False otherwise

267

"""

268

269

def wait(self, timeout=None) -> bool:

270

"""

271

Block until internal flag is True.

272

273

Parameters:

274

- timeout: timeout in seconds (None for no timeout)

275

276

Returns:

277

True if event was set, False if timeout occurred

278

"""

279

```

280

281

Usage example:

282

283

```python

284

from billiard import Process, Event

285

import time

286

import random

287

288

def waiter(event, worker_id):

289

"""Process that waits for event"""

290

print(f"Waiter {worker_id}: waiting for event...")

291

292

if event.wait(timeout=5):

293

print(f"Waiter {worker_id}: event received!")

294

else:

295

print(f"Waiter {worker_id}: timeout waiting for event")

296

297

def setter(event, delay):

298

"""Process that sets event after delay"""

299

print(f"Setter: waiting {delay} seconds before setting event")

300

time.sleep(delay)

301

302

print("Setter: setting event")

303

event.set()

304

305

def event_coordination_example():

306

"""Demonstrate event coordination"""

307

event = Event()

308

309

# Start multiple waiters

310

waiters = []

311

for i in range(3):

312

p = Process(target=waiter, args=(event, i))

313

waiters.append(p)

314

p.start()

315

316

# Start setter with random delay

317

delay = random.uniform(1, 3)

318

setter_process = Process(target=setter, args=(event, delay))

319

setter_process.start()

320

321

# Wait for all processes

322

setter_process.join()

323

for p in waiters:

324

p.join()

325

326

print(f"Event is set: {event.is_set()}")

327

328

# Clear and test again

329

event.clear()

330

print(f"Event after clear: {event.is_set()}")

331

332

if __name__ == '__main__':

333

event_coordination_example()

334

```

335

336

### Conditions

337

338

Advanced synchronization allowing processes to wait for specific conditions.

339

340

```python { .api }

341

class Condition:

342

"""

343

A condition variable for process synchronization.

344

"""

345

def __init__(self, lock=None, ctx=None):

346

"""

347

Create a condition variable.

348

349

Parameters:

350

- lock: underlying lock (Lock() if None)

351

- ctx: multiprocessing context

352

"""

353

354

def acquire(self, block=True, timeout=None) -> bool:

355

"""

356

Acquire the underlying lock.

357

358

Parameters:

359

- block: whether to block if lock unavailable

360

- timeout: timeout in seconds (None for no timeout)

361

362

Returns:

363

True if lock acquired, False otherwise

364

"""

365

366

def release(self):

367

"""

368

Release the underlying lock.

369

"""

370

371

def wait(self, timeout=None) -> bool:

372

"""

373

Wait until notified or timeout.

374

Must be called with lock held.

375

376

Parameters:

377

- timeout: timeout in seconds (None for no timeout)

378

379

Returns:

380

True if notified, False if timeout

381

"""

382

383

def notify(self, n=1):

384

"""

385

Wake up at most n processes waiting on condition.

386

Must be called with lock held.

387

388

Parameters:

389

- n: number of processes to wake up

390

"""

391

392

def notify_all(self):

393

"""

394

Wake up all processes waiting on condition.

395

Must be called with lock held.

396

"""

397

398

def __enter__(self):

399

"""Context manager entry."""

400

return self.acquire()

401

402

def __exit__(self, exc_type, exc_val, exc_tb):

403

"""Context manager exit."""

404

self.release()

405

```

406

407

Usage example:

408

409

```python

410

from billiard import Process, Condition

411

import time

412

import random

413

414

# Shared state

415

items = []

416

condition = Condition()

417

418

def consumer(consumer_id, condition, items):

419

"""Consumer that waits for items"""

420

with condition:

421

while len(items) == 0:

422

print(f"Consumer {consumer_id}: waiting for items...")

423

condition.wait()

424

425

item = items.pop(0)

426

print(f"Consumer {consumer_id}: consumed {item}")

427

428

def producer(producer_id, condition, items):

429

"""Producer that creates items"""

430

for i in range(3):

431

item = f"item-{producer_id}-{i}"

432

433

time.sleep(random.uniform(0.5, 1.5))

434

435

with condition:

436

items.append(item)

437

print(f"Producer {producer_id}: produced {item}")

438

condition.notify() # Wake up one consumer

439

440

if __name__ == '__main__':

441

# Start consumers

442

consumers = []

443

for i in range(2):

444

p = Process(target=consumer, args=(i, condition, items))

445

consumers.append(p)

446

p.start()

447

448

# Start producers

449

producers = []

450

for i in range(2):

451

p = Process(target=producer, args=(i, condition, items))

452

producers.append(p)

453

p.start()

454

455

# Wait for completion

456

for p in producers:

457

p.join()

458

459

# Notify remaining consumers to check final state

460

with condition:

461

condition.notify_all()

462

463

for p in consumers:

464

p.join()

465

```

466

467

### Barriers

468

469

Synchronization point where processes wait for all participants to arrive.

470

471

```python { .api }

472

class Barrier:

473

"""

474

A barrier object for synchronizing processes.

475

"""

476

def __init__(self, parties, action=None, timeout=None, ctx=None):

477

"""

478

Create a barrier.

479

480

Parameters:

481

- parties: number of processes that must call wait()

482

- action: callable to invoke when barrier is released

483

- timeout: default timeout for wait()

484

- ctx: multiprocessing context

485

"""

486

487

def wait(self, timeout=None) -> int:

488

"""

489

Wait for all processes to reach barrier.

490

491

Parameters:

492

- timeout: timeout in seconds (None uses barrier default)

493

494

Returns:

495

Index in range(parties) identifying this process

496

497

Raises:

498

- BrokenBarrierError: if barrier is broken or reset while waiting

499

"""

500

501

def reset(self):

502

"""

503

Reset barrier to initial state.

504

Any processes waiting will receive BrokenBarrierError.

505

"""

506

507

def abort(self):

508

"""

509

Put barrier in broken state.

510

Any current or future calls to wait() will raise BrokenBarrierError.

511

"""

512

513

@property

514

def parties(self) -> int:

515

"""Number of processes required to trip barrier."""

516

517

@property

518

def n_waiting(self) -> int:

519

"""Number of processes currently waiting."""

520

521

@property

522

def broken(self) -> bool:

523

"""True if barrier is broken."""

524

```

525

526

Usage example:

527

528

```python

529

from billiard import Process, Barrier

530

import time

531

import random

532

533

def barrier_action():

534

"""Action to perform when all processes reach barrier"""

535

print("*** All processes synchronized - continuing! ***")

536

537

def worker_with_barrier(worker_id, barrier, phase_count):

538

"""Worker that synchronizes at barrier between phases"""

539

for phase in range(phase_count):

540

# Do some work

541

work_time = random.uniform(0.5, 2.0)

542

print(f"Worker {worker_id}: working on phase {phase} for {work_time:.1f}s")

543

time.sleep(work_time)

544

545

print(f"Worker {worker_id}: finished phase {phase}, waiting at barrier")

546

547

# Wait for all workers to complete this phase

548

try:

549

index = barrier.wait(timeout=5)

550

print(f"Worker {worker_id}: barrier passed (index {index})")

551

except Exception as e:

552

print(f"Worker {worker_id}: barrier error: {e}")

553

break

554

555

print(f"Worker {worker_id}: all phases complete")

556

557

if __name__ == '__main__':

558

num_workers = 4

559

num_phases = 3

560

561

# Create barrier for all workers

562

barrier = Barrier(num_workers, action=barrier_action)

563

564

print(f"Barrier created for {barrier.parties} processes")

565

566

# Start workers

567

workers = []

568

for i in range(num_workers):

569

p = Process(target=worker_with_barrier, args=(i, barrier, num_phases))

570

workers.append(p)

571

p.start()

572

573

# Monitor barrier state

574

for phase in range(num_phases):

575

time.sleep(1)

576

print(f"Phase {phase}: {barrier.n_waiting} processes waiting, "

577

f"broken: {barrier.broken}")

578

579

# Wait for all workers to complete

580

for p in workers:

581

p.join()

582

583

print("All workers completed")

584

```

585

586

## Synchronization Best Practices

587

588

1. **Always use context managers** (`with` statements) when possible to ensure locks are properly released

589

2. **Avoid deadlocks** by acquiring locks in consistent order across processes

590

3. **Use timeouts** for acquire() and wait() operations to prevent indefinite blocking

591

4. **Choose appropriate primitives**:

592

- **Lock/RLock**: Mutual exclusion of critical sections

593

- **Semaphore**: Rate limiting and resource counting

594

- **Event**: Simple signaling between processes

595

- **Condition**: Complex coordination with state changes

596

- **Barrier**: Synchronized phases in parallel algorithms