or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-client.mdexceptions.mdhandlers.mdindex.mdrecipes.mdsecurity.mdtesting.md

recipes.mddocs/

# Distributed Recipes

High-level distributed coordination primitives built on top of ZooKeeper. These recipes implement common distributed-systems patterns — locks, leader election, queues, barriers, counters, group membership, work partitioning, and leases — with reliable semantics and fault tolerance.

## Capabilities

### Distributed Locking

Mutual exclusion primitives for coordinating access to shared resources across distributed processes, supporting exclusive locks, shared (read/write) locks, and counting semaphores.

```python { .api }

class Lock:
    """Exclusive distributed lock; also usable as a context manager."""

    def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
        """
        Create a distributed lock.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Lock path in ZooKeeper
        - identifier (str): Name for this contender, reported by contenders()
        - extra_lock_patterns (tuple): Additional znode name patterns under
          path that also count as contenders for this lock (for
          interoperability with other lock implementations)
        """

    def acquire(self, blocking=True, timeout=None):
        """
        Acquire the lock.

        Parameters:
        - blocking (bool): Wait until the lock is available; if False,
          return immediately
        - timeout (float): Maximum seconds to wait when blocking;
          None waits forever

        Returns:
        bool: True if the lock was acquired; False only when blocking=False
        and the lock is held by another contender

        Raises:
        - LockTimeout: If blocking and the lock was not acquired within
          timeout seconds (a timeout raises; it does not return False)
        """

    def release(self):
        """Release the lock if this instance holds it."""

    def cancel(self):
        """Cancel a pending acquire() call."""

    def contenders(self):
        """
        List the current lock contenders.

        Returns:
        list: Ordered contender identifiers ("" for contenders created
        without an identifier)
        """

    def __enter__(self):
        """Acquire the lock (blocking, no timeout) on entering a with-block."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock on leaving a with-block."""

    @property
    def is_acquired(self):
        """True if lock is currently held."""


class WriteLock(Lock):
    """
    Exclusive write lock.

    Excludes both readers (ReadLock) and other writers on the same path.
    Takes the same arguments and methods as Lock.
    """


class ReadLock(Lock):
    """
    Shared read lock.

    Any number of readers may hold it at once; readers are excluded only by
    writers (WriteLock) on the same path. Takes the same arguments and
    methods as Lock.
    """


class Semaphore:
    """Distributed counting semaphore allowing up to max_leases holders."""

    def __init__(self, client, path, identifier=None, max_leases=1):
        """
        Create a distributed semaphore.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Semaphore path in ZooKeeper
        - identifier (str): Name for this holder, reported by lease_holders()
        - max_leases (int): Maximum number of concurrent holders; pass it by
          keyword, since it comes after identifier
        """

    def acquire(self, blocking=True, timeout=None):
        """
        Acquire a semaphore lease.

        Parameters:
        - blocking (bool): Wait for a free lease; if False, return immediately
        - timeout (float): Maximum seconds to wait when blocking;
          None waits forever

        Returns:
        bool: True if a lease was acquired; False only when blocking=False
        and no lease is free

        Raises:
        - LockTimeout: If blocking and no lease was acquired within timeout
        - ValueError: If max_leases differs from the value already stored
          for this semaphore path
        """

    def release(self):
        """Release the semaphore lease if this instance holds one."""

    def lease_holders(self):
        """
        List the current lease holders.

        Returns:
        list: Identifiers of the clients currently holding a lease
        """

    def __enter__(self):
        """Acquire a lease (blocking, no timeout) on entering a with-block."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lease on leaving a with-block."""

    @property
    def max_leases(self):
        """Maximum number of leases available."""

```

### Leader Election

Leader election for distributed systems that need a single coordinator process, with automatic failover and leadership transfer when the current leader exits.

```python { .api }

class Election:
    """Leader election: at most one candidate runs its leader function at a time."""

    def __init__(self, client, path, identifier=None):
        """
        Create a leader election.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Election path in ZooKeeper
        - identifier (str): Name for this candidate, reported by contenders()
        """

    def run(self, func, *args, **kwargs):
        """
        Block until elected, then call func(*args, **kwargs) as leader.

        Leadership is held only while func runs; it is given up when func
        returns.

        Parameters:
        - func (callable): Function to execute as leader
        - args: Positional arguments for func
        - kwargs: Keyword arguments for func

        Returns:
        The return value of func, or None if the candidacy was cancelled
        before this candidate was elected

        Raises:
        - ValueError: If func is not callable
        """

    def cancel(self):
        """
        Cancel participation in the election.

        If this candidate is already the leader, the running leader function
        is not interrupted.
        """

    def contenders(self):
        """
        List all election contenders.

        Returns:
        list: Ordered candidate identifiers, current leader first
        """

```

### Distributed Queues

Queue implementations for distributed task processing and message passing, with priority support and both blocking and non-blocking retrieval.

```python { .api }

class Queue:
    """
    Distributed priority queue (FIFO among items of equal priority).

    Items are removed as soon as they are fetched, so an item is lost if the
    consumer fails before processing it; use LockingQueue when that matters.
    """

    def __init__(self, client, path):
        """
        Create a distributed queue.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Queue path in ZooKeeper
        """

    def put(self, value, priority=100):
        """
        Add an item to the queue.

        Parameters:
        - value (bytes): Item data
        - priority (int): 0-999; lower values are dequeued first
        """

    def get(self):
        """
        Remove and return the next item without blocking.

        Returns:
        bytes: Item data, or None if the queue is empty
        """

    def __len__(self):
        """Number of items currently in the queue (use len(queue))."""


class LockingQueue:
    """
    Distributed priority queue with reliable consumption.

    get() locks an item instead of deleting it; the item is removed only
    when consume() is called. If the consumer's session ends first, the lock
    is released and another consumer can take the item.
    """

    def __init__(self, client, path):
        """
        Create a locking queue.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Queue path in ZooKeeper
        """

    def put(self, value, priority=100):
        """
        Add an item to the queue.

        Parameters:
        - value (bytes): Item data
        - priority (int): 0-999; lower values are dequeued first
        """

    def put_all(self, values, priority=100):
        """
        Add several items with the same priority; either all are added or none.

        Parameters:
        - values (list of bytes): Items to add
        - priority (int): 0-999; lower values are dequeued first
        """

    def get(self, timeout=None):
        """
        Lock and return the next item, waiting for one if necessary.

        If a previously fetched item has not been consumed or released yet,
        that same item is returned again.

        Parameters:
        - timeout (float): Maximum seconds to wait; None waits forever

        Returns:
        bytes: Item data, or None if the timeout expired
        """

    def holds_lock(self):
        """
        Returns:
        bool: True if this consumer currently holds a lock on an item
        """

    def consume(self):
        """
        Remove the currently locked item from the queue.

        Returns:
        bool: True if the item was removed, False otherwise
        """

    def release(self):
        """
        Unlock the current item without removing it, so others can take it.

        Returns:
        bool: True if the lock was released, False otherwise
        """

```

### Barriers and Synchronization

Synchronization primitives for coordinating distributed processes at specific execution points, with support for both simple and double barriers.

```python { .api }

class Barrier:
    """
    Simple barrier ("gate"): waiters block while the barrier node exists and
    are released when it is removed.
    """

    def __init__(self, client, path):
        """
        Create a barrier handle.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Barrier path in ZooKeeper
        """

    def create(self):
        """Raise the barrier by creating its node, if it does not exist yet."""

    def wait(self, timeout=None):
        """
        Block until the barrier is removed.

        Parameters:
        - timeout (float): Maximum seconds to wait; None waits forever

        Returns:
        bool: True if the barrier was cleared, False if the timeout expired
        """

    def remove(self):
        """
        Lower the barrier, releasing all waiters.

        Returns:
        bool: Whether the barrier node actually had to be removed
        """


class DoubleBarrier:
    """
    Double barrier: synchronizes both the start and the end of a task
    across num_clients participants.
    """

    def __init__(self, client, path, num_clients, identifier=None):
        """
        Create a double barrier.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Barrier path in ZooKeeper
        - num_clients (int): Number of participants that must enter
          before any may proceed
        - identifier (str): Name for this participant (defaults to the host
          name and process id)
        """

    def enter(self):
        """Enter the barrier; blocks until num_clients participants have entered."""

    def leave(self):
        """Leave the barrier; blocks until all participants have left."""

```

### Distributed Counters

Atomic distributed counter supporting increment, decrement, and value retrieval.

```python { .api }

class Counter:
    """
    Shared int or float counter whose changes are applied atomically.

    Change it with the augmented operators:

        counter = Counter(zk, "/counters/jobs")
        counter += 2
        counter -= 1
        counter.value  # 1, if no other client changed it
    """

    def __init__(self, client, path, default=0):
        """
        Create a distributed counter.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Counter path in ZooKeeper
        - default (int or float): Initial value; its type is also used to
          convert the stored value back (use 0.0 for a float counter)
        """

    @property
    def value(self):
        """Current counter value."""

    def __add__(self, value):
        """
        Atomically add value to the counter (use as ``counter += value``).

        Returns:
        Counter: this counter, so ``+=`` keeps the same object

        Raises:
        - BadVersionError: If the retry policy could not apply the change
          under concurrent updates
        """

    def __sub__(self, value):
        """
        Atomically subtract value from the counter (use as ``counter -= value``).

        Returns:
        Counter: this counter, so ``-=`` keeps the same object

        Raises:
        - BadVersionError: If the retry policy could not apply the change
          under concurrent updates
        """

    @property
    def pre_value(self):
        """Counter value before the last change made through this object."""

    @property
    def post_value(self):
        """Counter value after the last change made through this object."""

```

### Group Membership

Party implementations for tracking group membership and coordinating distributed processes, with both full and shallow membership tracking.

```python { .api }

class Party:
    """Group membership; iterate it to get the identifiers of current members."""

    def __init__(self, client, path, identifier=None):
        """
        Create a distributed party for membership tracking.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Party path in ZooKeeper
        - identifier (str): Value stored for this member and yielded when
          iterating the party
        """

    def join(self):
        """Join the party."""

    def leave(self):
        """Leave the party."""

    @property
    def participating(self):
        """True after join() until leave()."""

    def __len__(self):
        """Number of current party members (use len(party))."""

    def __iter__(self):
        """Iterate over the identifiers of the current party members."""


class ShallowParty(Party):
    """
    Party with reduced overhead for large groups.

    Stores each member's identifier in its node name instead of node data,
    so listing members takes a single ZooKeeper call; the identifier must
    therefore be a valid ZooKeeper node name.
    """

```

### Work Partitioning

Partitioning system for distributing work across multiple processes, with automatic rebalancing and failure recovery.

```python { .api }

class SetPartitioner:
    """
    Divide a set of items among all processes sharing the same path.

    Drive it with a loop over its state:

        while True:
            if partitioner.failed:
                raise RuntimeError("partitioner failed")
            elif partitioner.release:
                partitioner.release_set()
            elif partitioner.acquired:
                for item in partitioner:
                    process(item)
            elif partitioner.allocating:
                partitioner.wait_for_acquire()
    """

    def __init__(self, client, path, set, partition_func=None, identifier=None,
                 time_boundary=30, max_reaction_time=1, state_change_event=None):
        """
        Create a set partitioner for distributed work.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Partitioner path in ZooKeeper
        - set (iterable): Items to partition
        - partition_func (callable): Custom function deciding which items
          each member receives
        - identifier (str): Name for this member of the partitioning party
        - time_boundary (int): Seconds party membership must be stable
          before allocation completes
        - max_reaction_time (int): Maximum seconds to react to a party
          membership change
        - state_change_event: Optional event object that is set on every
          state change
        """

    def __iter__(self):
        """Iterate over the items allocated to this member."""

    def wait_for_acquire(self, timeout=30):
        """
        Wait for the set to be partitioned and acquired.

        Parameters:
        - timeout (float): Maximum seconds to wait
        """

    def release_set(self):
        """Release this member's items; allocation then starts again."""

    def finish(self):
        """Release this member's items and leave the partitioner."""

    @property
    def state(self):
        """Current state: one of the PartitionState values."""

    @property
    def failed(self):
        """True if partitioner has failed and must be recreated."""

    @property
    def release(self):
        """True if partitioner should release partitions (call release_set())."""

    @property
    def acquired(self):
        """True if partitions are acquired and may be iterated."""

    @property
    def allocating(self):
        """True if allocation is in progress (call wait_for_acquire())."""


class PartitionState:
    """State constants for SetPartitioner.state."""

    ALLOCATING = "ALLOCATING"  # set is being (re)partitioned
    ACQUIRED = "ACQUIRED"  # this member's items are ready to iterate
    RELEASE = "RELEASE"  # membership changed; items must be released
    FAILURE = "FAILURE"  # partitioner is unusable and must be recreated

```

### Resource Leasing

Lease implementations for temporary resource allocation: leases expire automatically after a fixed duration, and acquisition never blocks.

```python { .api }

class NonBlockingLease:
    """
    Exclusive, time-limited lease that never blocks.

    The lease is attempted once, in the constructor; the instance is truthy
    if it was obtained:

        lease = NonBlockingLease(zk, "/leases/cleanup", datetime.timedelta(minutes=70))
        if lease:
            do_cleanup()

    The lease expires on its own after `duration`.
    """

    def __init__(self, client, path, duration, identifier=None):
        """
        Try to obtain the lease.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Lease path in ZooKeeper
        - duration (datetime.timedelta): How long the lease is reserved
        - identifier (str): Holder name; reuse the same identifier to renew a
          lease you already hold (defaults to the host name)
        """

    def __bool__(self):
        """True if the lease was obtained."""


class MultiNonBlockingLease:
    """
    Lease that up to `count` clients can hold at once, e.g. to run a task on
    a limited set of hosts. As with NonBlockingLease, the attempt is made in
    the constructor and the instance is truthy if a lease was obtained.
    """

    def __init__(self, client, count, path, duration, identifier=None):
        """
        Try to obtain one of `count` leases.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - count (int): Number of leases available
        - path (str): Base ZooKeeper path under which the leases are stored
        - duration (datetime.timedelta): How long a lease is reserved
        - identifier (str): Holder name; reuse it to renew a lease you
          already hold (defaults to the host name)
        """

    def __bool__(self):
        """True if one of the leases was obtained."""

```

## Usage Examples

### Distributed Lock Example

```python

import time

from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout
from kazoo.recipe.lock import Lock

zk = KazooClient()
zk.start()

# Create the lock; "worker-1" identifies this holder to other contenders
lock = Lock(zk, "/myapp/critical_section", "worker-1")

try:
    # With blocking=True (the default) and a timeout, acquire() raises
    # LockTimeout when the timeout expires instead of returning False.
    lock.acquire(timeout=10)
except LockTimeout:
    print("Could not acquire lock within timeout")
else:
    # Only release a lock we actually acquired.
    try:
        print("Lock acquired, performing critical work...")
        time.sleep(5)  # Simulate work
        print("Work completed")
    finally:
        lock.release()
finally:
    zk.stop()

```

### Leader Election Example

```python

import time

from kazoo.client import KazooClient
from kazoo.recipe.election import Election


def leader_function():
    """Leader-only work; runs only while this process holds leadership."""
    print("I am the leader!")
    # Leadership work here
    time.sleep(30)
    print("Leadership term completed")
    return "success"


zk = KazooClient()
zk.start()

election = Election(zk, "/myapp/election", "candidate-1")

try:
    # Blocks until elected, then runs leader_function and returns its result
    result = election.run(leader_function)
    print(f"Leadership result: {result}")
except KeyboardInterrupt:
    election.cancel()
finally:
    zk.stop()

```

### Distributed Queue Example

```python

import json

from kazoo.client import KazooClient
from kazoo.recipe.queue import Queue

zk = KazooClient()
zk.start()

queue = Queue(zk, "/myapp/tasks")

try:
    # Producer: add a task to the queue
    task_data = json.dumps({"task": "process_data", "params": {"file": "data.csv"}})
    queue.put(task_data.encode('utf-8'), priority=1)

    # Consumer: drain the queue. Queue.get() does not block and returns
    # None once the queue is empty, so check for None before decoding.
    while True:
        task = queue.get()
        if task is None:
            print("No tasks available")
            break

        task_obj = json.loads(task.decode('utf-8'))
        print(f"Processing task: {task_obj}")
        # Process task here
finally:
    zk.stop()

```

### Barrier Synchronization Example

```python

import threading
import time

from kazoo.client import KazooClient
from kazoo.recipe.barrier import DoubleBarrier


def worker(worker_id):
    """Run one participant: enter the barrier, do work, then leave it."""
    zk = KazooClient()
    zk.start()

    barrier = DoubleBarrier(zk, "/myapp/sync", 3, f"worker-{worker_id}")

    try:
        print(f"Worker {worker_id} starting...")

        # Block until all 3 workers have entered
        barrier.enter()
        print(f"Worker {worker_id} entered barrier, starting work...")

        # Do work
        time.sleep(2)

        # Block until all workers have left
        barrier.leave()
        print(f"Worker {worker_id} completed")

    finally:
        zk.stop()


# Start multiple workers
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

```