0
# Distributed Recipes
1
2
High-level distributed coordination primitives built on top of Zookeeper. These recipes provide common distributed systems patterns like locks, leader elections, queues, barriers, and counters with reliable semantics and fault tolerance.
3
4
## Capabilities
5
6
### Distributed Locking
7
8
Mutual exclusion primitives for coordinating access to shared resources across distributed processes with support for both exclusive and shared locking patterns.
9
10
```python { .api }
11
class Lock:
12
def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
13
"""
14
Create a distributed lock.
15
16
Parameters:
17
- client (KazooClient): Connected Kazoo client
18
- path (str): Lock path in Zookeeper
19
- identifier (str): Unique identifier for this lock holder
20
- extra_lock_patterns (tuple): Additional patterns for lock contender identification
21
"""
22
23
def acquire(self, blocking=True, timeout=None):
24
"""
25
Acquire the lock.
26
27
Parameters:
28
- blocking (bool): Block until lock is acquired
29
- timeout (float): Maximum time to wait for lock
30
31
Returns:
32
bool: True if lock acquired, False if a non-blocking attempt (blocking=False) did not get the lock
33
34
Raises:
35
- LockTimeout: If timeout exceeded
36
"""
37
38
def release(self):
39
"""Release the lock."""
40
41
@property
42
def is_acquired(self):
43
"""True if lock is currently held."""
44
45
class WriteLock(Lock):
46
"""Exclusive write lock implementation."""
47
48
class ReadLock(Lock):
49
def __init__(self, client, path, identifier=None):
50
"""
51
Create a shared read lock.
52
53
Parameters:
54
- client (KazooClient): Connected Kazoo client
55
- path (str): Lock path in Zookeeper
56
- identifier (str): Unique identifier for this lock holder
57
"""
58
59
def acquire(self, blocking=True, timeout=None):
60
"""Acquire read lock (shared with other readers)."""
61
62
def release(self):
63
"""Release read lock."""
64
65
class Semaphore:
66
def __init__(self, client, path, max_leases, identifier=None):
67
"""
68
Create a distributed semaphore.
69
70
Parameters:
71
- client (KazooClient): Connected Kazoo client
72
- path (str): Semaphore path in Zookeeper
73
- max_leases (int): Maximum number of concurrent holders
74
- identifier (str): Unique identifier for this holder
75
"""
76
77
def acquire(self, blocking=True, timeout=None):
78
"""Acquire a semaphore lease."""
79
80
def release(self):
81
"""Release the semaphore lease."""
82
83
@property
84
def lease_holders(self):
85
"""List of current lease holders."""
86
87
@property
88
def max_leases(self):
89
"""Maximum number of leases available."""
90
```
91
92
### Leader Election
93
94
Leader election algorithms for distributed systems requiring a single coordinator process with automatic failover and leadership transfer.
95
96
```python { .api }
97
class Election:
98
def __init__(self, client, path, identifier=None):
99
"""
100
Create a leader election.
101
102
Parameters:
103
- client (KazooClient): Connected Kazoo client
104
- path (str): Election path in Zookeeper
105
- identifier (str): Unique identifier for this candidate
106
"""
107
108
def run(self, func, *args, **kwargs):
109
"""
110
Run for leadership and execute function when elected.
111
112
Parameters:
113
- func (callable): Function to execute as leader
114
- args: Arguments for leader function
115
- kwargs: Keyword arguments for leader function
116
117
Returns:
118
Result of leader function
119
"""
120
121
def cancel(self):
122
"""Cancel leadership candidacy."""
123
124
@property
125
def contenders(self):
126
"""List of all election contenders."""
127
```
128
129
### Distributed Queues
130
131
Queue implementations for distributed task processing and message passing with priority support and blocking/non-blocking operations.
132
133
```python { .api }
134
class Queue:
135
def __init__(self, client, path):
136
"""
137
Create a distributed FIFO queue.
138
139
Parameters:
140
- client (KazooClient): Connected Kazoo client
141
- path (str): Queue path in Zookeeper
142
"""
143
144
def put(self, value, priority=100):
145
"""
146
Add item to queue.
147
148
Parameters:
149
- value (bytes): Item data
150
- priority (int): Item priority (lower = higher priority)
151
152
Returns:
153
str: Item path in queue
154
"""
155
156
def get(self, timeout=None):
157
"""
158
Get item from queue.
159
160
Parameters:
161
- timeout (float): Maximum time to wait for item
162
163
Returns:
164
bytes: Item data, or None if queue is empty (basic Queue) or timeout exceeded (LockingQueue)
165
"""
166
167
def put_all(self, items, priority=100):
168
"""Add multiple items to queue."""
169
170
@property
171
def length(self):
172
"""Number of items in queue."""
173
174
class LockingQueue(Queue):
175
def __init__(self, client, path):
176
"""
177
Queue with locking support: an item returned by get() stays locked to this client until consume() is called.
178
179
Parameters:
180
- client (KazooClient): Connected Kazoo client
181
- path (str): Queue path in Zookeeper
182
"""
183
184
def consume(self):
185
"""
186
Remove the entry currently being processed (the item returned by the last get()) from the queue.
186
187
Returns:
188
bool: True if the entry was removed, False otherwise
190
"""
191
```
192
193
### Barriers and Synchronization
194
195
Synchronization primitives for coordinating distributed processes at specific execution points with support for both simple and double barriers.
196
197
```python { .api }
198
class Barrier:
199
def __init__(self, client, path, num_clients):
200
"""
201
Create a distributed barrier.
202
203
Parameters:
204
- client (KazooClient): Connected Kazoo client
205
- path (str): Barrier path in Zookeeper
206
- num_clients (int): Number of clients required
207
"""
208
209
def create(self):
210
"""Create the barrier node."""
211
212
def wait(self, timeout=None):
213
"""
214
Wait for all clients to reach barrier.
215
216
Parameters:
217
- timeout (float): Maximum time to wait
218
219
Returns:
220
bool: True if barrier released, False if timeout
221
"""
222
223
def remove(self):
224
"""Remove the barrier."""
225
226
class DoubleBarrier:
227
def __init__(self, client, path, num_clients, identifier=None):
228
"""
229
Create a double barrier for entry/exit synchronization.
230
231
Parameters:
232
- client (KazooClient): Connected Kazoo client
233
- path (str): Barrier path in Zookeeper
234
- num_clients (int): Number of clients required
235
- identifier (str): Unique client identifier
236
"""
237
238
def enter(self, timeout=None):
239
"""
240
Enter the barrier (wait for all clients).
241
242
Parameters:
243
- timeout (float): Maximum time to wait
244
245
Returns:
246
bool: True if entered, False if timeout
247
"""
248
249
def leave(self, timeout=None):
250
"""
251
Leave the barrier (wait for all clients to leave).
252
253
Parameters:
254
- timeout (float): Maximum time to wait
255
256
Returns:
257
bool: True if left, False if timeout
258
"""
259
```
260
261
### Distributed Counters
262
263
Atomic counter implementation for distributed counting operations with increment, decrement, and value retrieval operations.
264
265
```python { .api }
266
class Counter:
267
def __init__(self, client, path, default=0):
268
"""
269
Create a distributed counter.
270
271
Parameters:
272
- client (KazooClient): Connected Kazoo client
273
- path (str): Counter path in Zookeeper
274
- default (int): Default counter value
275
"""
276
277
@property
278
def value(self):
279
"""Current counter value."""
280
281
def get(self):
282
"""
283
Get current counter value.
284
285
Returns:
286
int: Current counter value
287
"""
288
289
def increment(self, amount=1):
290
"""
291
Increment counter atomically.
292
293
Parameters:
294
- amount (int): Amount to increment
295
296
Returns:
297
int: New counter value
298
"""
299
300
def decrement(self, amount=1):
301
"""
302
Decrement counter atomically.
303
304
Parameters:
305
- amount (int): Amount to decrement
306
307
Returns:
308
int: New counter value
309
"""
310
311
def reset(self, value=0):
312
"""
313
Reset counter to specific value.
314
315
Parameters:
316
- value (int): New counter value
317
"""
318
```
319
320
### Group Membership
321
322
Party implementations for tracking group membership and coordinating distributed processes with support for both full and shallow membership tracking.
323
324
```python { .api }
325
class Party:
326
def __init__(self, client, path, identifier=None):
327
"""
328
Create a distributed party for membership tracking.
329
330
Parameters:
331
- client (KazooClient): Connected Kazoo client
332
- path (str): Party path in Zookeeper
333
- identifier (str): Unique member identifier
334
"""
335
336
def join(self):
337
"""Join the party."""
338
339
def leave(self):
340
"""Leave the party."""
341
342
@property
343
def is_member(self):
344
"""True if currently a party member."""
345
346
def get_members(self):
347
"""
348
Get all party members.
349
350
Returns:
351
list: List of member identifiers
352
"""
353
354
def wait_for_members(self, count, timeout=None):
355
"""
356
Wait for specific number of members.
357
358
Parameters:
359
- count (int): Required member count
360
- timeout (float): Maximum time to wait
361
362
Returns:
363
bool: True if count reached, False if timeout
364
"""
365
366
class ShallowParty(Party):
367
"""Party with reduced overhead for large groups."""
368
```
369
370
### Work Partitioning
371
372
Partitioning system for distributing work across multiple processes with automatic rebalancing and failure recovery.
373
374
```python { .api }
375
class SetPartitioner:
376
def __init__(self, client, path, set, partition_func=None, identifier=None,
377
time_boundary=30, state_change_event=None):
378
"""
379
Create a set partitioner for distributed work.
380
381
Parameters:
382
- client (KazooClient): Connected Kazoo client
383
- path (str): Partitioner path in Zookeeper
384
- set (iterable): Items to partition
385
- partition_func (callable): Custom partition function
386
- identifier (str): Unique partitioner identifier
387
- time_boundary (int): Seconds the party membership must remain stable before allocation can complete
388
- state_change_event: Event triggered on partition changes
389
"""
390
391
def __iter__(self):
392
"""Iterate over assigned partitions."""
393
394
def allocate_set(self):
395
"""Allocate partitions among participants."""
396
397
def finish(self):
398
"""Finish partitioning and cleanup."""
399
400
@property
401
def state(self):
402
"""Current partitioner state."""
403
404
@property
405
def failed(self):
406
"""True if partitioner has failed."""
407
408
@property
409
def release(self):
410
"""True if partitioner should release partitions."""
411
412
@property
413
def acquired(self):
414
"""True if partitions are acquired."""
415
416
@property
417
def allocating(self):
418
"""True if allocation is in progress."""
419
420
class PartitionState:
421
"""State constants for partitioner."""
422
ALLOCATING: str
423
ACQUIRED: str
424
RELEASE: str
425
FAILURE: str
426
```
427
428
### Resource Leasing
429
430
Lease implementations for temporary resource allocation with timeout-based automatic release and non-blocking acquisition patterns.
431
432
```python { .api }
433
class NonBlockingLease:
434
def __init__(self, client, path, duration, identifier=None):
435
"""
436
Create a non-blocking lease.
437
438
Parameters:
439
- client (KazooClient): Connected Kazoo client
440
- path (str): Lease path in Zookeeper
441
- duration (int): Lease duration in seconds
442
- identifier (str): Unique lease identifier
443
"""
444
445
def __enter__(self):
446
"""Context manager entry."""
447
448
def __exit__(self, exc_type, exc_val, exc_tb):
449
"""Context manager exit."""
450
451
def acquire(self):
452
"""
453
Acquire the lease.
454
455
Returns:
456
bool: True if lease acquired, False otherwise
457
"""
458
459
def release(self):
460
"""Release the lease."""
461
462
@property
463
def is_acquired(self):
464
"""True if lease is currently held."""
465
466
class MultiNonBlockingLease:
467
def __init__(self, client, path, count, duration, identifier=None):
468
"""
469
Create multiple non-blocking leases.
470
471
Parameters:
472
- client (KazooClient): Connected Kazoo client
473
- path (str): Base lease path in Zookeeper
474
- count (int): Number of leases to create
475
- duration (int): Lease duration in seconds
476
- identifier (str): Unique lease identifier
477
"""
478
479
def __iter__(self):
480
"""Iterate over leases."""
481
482
def acquire(self):
483
"""
484
Acquire all leases.
485
486
Returns:
487
bool: True if all leases acquired
488
"""
489
490
def release(self):
491
"""Release all leases."""
492
```
493
494
## Usage Examples
495
496
### Distributed Lock Example
497
498
```python
from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout
from kazoo.recipe.lock import Lock
import time

zk = KazooClient()
zk.start()

# Create lock
lock = Lock(zk, "/myapp/critical_section", "worker-1")

try:
    # Acquire lock with timeout; acquire() raises LockTimeout if the timeout expires
    lock.acquire(timeout=10)
    try:
        print("Lock acquired, performing critical work...")
        time.sleep(5)  # Simulate work
        print("Work completed")
    finally:
        # Release only after a successful acquire
        lock.release()
except LockTimeout:
    print("Could not acquire lock within timeout")
finally:
    zk.stop()
```
521
522
### Leader Election Example
523
524
```python
525
from kazoo.client import KazooClient
526
from kazoo.recipe.election import Election
527
import time
528
529
def leader_function():
530
print("I am the leader!")
531
# Leadership work here
532
time.sleep(30)
533
print("Leadership term completed")
534
return "success"
535
536
zk = KazooClient()
537
zk.start()
538
539
election = Election(zk, "/myapp/election", "candidate-1")
540
541
try:
542
# Run for leadership
543
result = election.run(leader_function)
544
print(f"Leadership result: {result}")
545
except KeyboardInterrupt:
546
election.cancel()
547
finally:
548
zk.stop()
549
```
550
551
### Distributed Queue Example
552
553
```python
from kazoo.client import KazooClient
from kazoo.recipe.queue import Queue
import json

zk = KazooClient()
zk.start()

queue = Queue(zk, "/myapp/tasks")

try:
    # Producer: Add tasks to queue
    task_data = json.dumps({"task": "process_data", "params": {"file": "data.csv"}})
    queue.put(task_data.encode('utf-8'), priority=1)

    # Consumer: Process tasks from queue
    while True:
        # The basic Queue's get() does not block; it returns None when the queue is empty
        task = queue.get()
        if task is None:
            print("No tasks available")
            break

        task_obj = json.loads(task.decode('utf-8'))
        print(f"Processing task: {task_obj}")
        # Process task here

finally:
    zk.stop()
```
583
584
### Barrier Synchronization Example
585
586
```python
587
from kazoo.client import KazooClient
588
from kazoo.recipe.barrier import DoubleBarrier
589
import threading
import time
590
591
def worker(worker_id):
592
zk = KazooClient()
593
zk.start()
594
595
barrier = DoubleBarrier(zk, "/myapp/sync", 3, f"worker-{worker_id}")
596
597
try:
598
print(f"Worker {worker_id} starting...")
599
600
# Wait for all workers to start
601
barrier.enter(timeout=30)
602
print(f"Worker {worker_id} entered barrier, starting work...")
603
604
# Do work
605
time.sleep(2)
606
607
# Wait for all workers to finish
608
barrier.leave(timeout=30)
609
print(f"Worker {worker_id} completed")
610
611
finally:
612
zk.stop()
613
614
# Start multiple workers
615
threads = []
616
for i in range(3):
617
t = threading.Thread(target=worker, args=(i,))
618
threads.append(t)
619
t.start()
620
621
for t in threads:
622
t.join()
623
```