Higher Level Zookeeper Client providing distributed coordination and configuration management primitives.
—
High-level distributed coordination primitives built on top of Zookeeper. These recipes provide common distributed systems patterns like locks, leader elections, queues, barriers, and counters with reliable semantics and fault tolerance.
Mutual exclusion primitives for coordinating access to shared resources across distributed processes with support for both exclusive and shared locking patterns.
class Lock:
    def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
        """
        Create a distributed lock.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Lock path in Zookeeper
        - identifier (str): Unique identifier for this lock holder
        - extra_lock_patterns (tuple): Additional patterns for lock contender identification
        """

    def acquire(self, blocking=True, timeout=None):
        """
        Acquire the lock.

        Parameters:
        - blocking (bool): Block until lock is acquired
        - timeout (float): Maximum time to wait for lock

        Returns:
        bool: True if lock acquired, False if it was not acquired
        (e.g. a non-blocking attempt while the lock is held elsewhere)

        Raises:
        - LockTimeout: If a blocking acquire exceeds ``timeout``; a timeout
          is reported by this exception, not by a False return value
        """

    def release(self):
        """Release the lock."""

    @property
    def is_acquired(self):
        """True if lock is currently held."""
class WriteLock(Lock):
    """Exclusive write lock implementation."""
class ReadLock:
    def __init__(self, client, path, identifier=None):
        """
        Create a shared read lock.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Lock path in Zookeeper
        - identifier (str): Unique identifier for this lock holder
        """

    def acquire(self, blocking=True, timeout=None):
        """Acquire read lock (shared with other readers)."""

    def release(self):
        """Release read lock."""
class Semaphore:
    def __init__(self, client, path, max_leases, identifier=None):
        """
        Create a distributed semaphore.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Semaphore path in Zookeeper
        - max_leases (int): Maximum number of concurrent holders
        - identifier (str): Unique identifier for this holder
        """

    def acquire(self, blocking=True, timeout=None):
        """Acquire a semaphore lease."""

    def release(self):
        """Release the semaphore lease."""

    @property
    def lease_holders(self):
        """List of current lease holders."""

    @property
    def max_leases(self):
        """Maximum number of leases available."""


# Leader election algorithms for distributed systems requiring a single
# coordinator process with automatic failover and leadership transfer.
class Election:
    def __init__(self, client, path, identifier=None):
        """
        Create a leader election.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Election path in Zookeeper
        - identifier (str): Unique identifier for this candidate
        """

    def run(self, func, *args, **kwargs):
        """
        Run for leadership and execute function when elected.

        Parameters:
        - func (callable): Function to execute as leader
        - args: Arguments for leader function
        - kwargs: Keyword arguments for leader function

        Returns:
        Result of leader function
        """

    def cancel(self):
        """Cancel leadership candidacy."""

    @property
    def contenders(self):
        """List of all election contenders."""


# Queue implementations for distributed task processing and message passing
# with priority support and blocking/non-blocking operations.
class Queue:
    def __init__(self, client, path):
        """
        Create a distributed FIFO queue.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Queue path in Zookeeper
        """

    def put(self, value, priority=100):
        """
        Add item to queue.

        Parameters:
        - value (bytes): Item data
        - priority (int): Item priority (lower = higher priority)

        Returns:
        str: Item path in queue
        """

    def get(self, timeout=None):
        """
        Get item from queue.

        Parameters:
        - timeout (float): Maximum time to wait for item

        Returns:
        bytes: Item data, or None if queue is empty (basic Queue) or
        timeout exceeded (LockingQueue). Callers must check for None
        before using the result.
        """

    def put_all(self, items, priority=100):
        """Add multiple items to queue."""

    @property
    def length(self):
        """Number of items in queue."""
class LockingQueue(Queue):
    def __init__(self, client, path):
        """
        Queue with built-in locking for thread safety.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Queue path in Zookeeper
        """

    def consume(self):
        """
        Consume items from queue with locking.

        Yields:
        bytes: Queue items
        """


# Synchronization primitives for coordinating distributed processes at
# specific execution points with support for both simple and double barriers.
class Barrier:
    def __init__(self, client, path, num_clients):
        """
        Create a distributed barrier.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Barrier path in Zookeeper
        - num_clients (int): Number of clients required
        """

    def create(self):
        """Create the barrier node."""

    def wait(self, timeout=None):
        """
        Wait for all clients to reach barrier.

        Parameters:
        - timeout (float): Maximum time to wait

        Returns:
        bool: True if barrier released, False if timeout
        """

    def remove(self):
        """Remove the barrier."""
class DoubleBarrier:
    def __init__(self, client, path, num_clients, identifier=None):
        """
        Create a double barrier for entry/exit synchronization.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Barrier path in Zookeeper
        - num_clients (int): Number of clients required
        - identifier (str): Unique client identifier
        """

    def enter(self, timeout=None):
        """
        Enter the barrier (wait for all clients).

        Parameters:
        - timeout (float): Maximum time to wait

        Returns:
        bool: True if entered, False if timeout
        """

    def leave(self, timeout=None):
        """
        Leave the barrier (wait for all clients to leave).

        Parameters:
        - timeout (float): Maximum time to wait

        Returns:
        bool: True if left, False if timeout
        """


# Atomic counter implementation for distributed counting operations with
# increment, decrement, and value retrieval operations.
class Counter:
    def __init__(self, client, path, default=0):
        """
        Create a distributed counter.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Counter path in Zookeeper
        - default (int): Default counter value
        """

    @property
    def value(self):
        """Current counter value."""

    def get(self):
        """
        Get current counter value.

        Returns:
        int: Current counter value
        """

    def increment(self, amount=1):
        """
        Increment counter atomically.

        Parameters:
        - amount (int): Amount to increment

        Returns:
        int: New counter value
        """

    def decrement(self, amount=1):
        """
        Decrement counter atomically.

        Parameters:
        - amount (int): Amount to decrement

        Returns:
        int: New counter value
        """

    def reset(self, value=0):
        """
        Reset counter to specific value.

        Parameters:
        - value (int): New counter value
        """


# Party implementations for tracking group membership and coordinating
# distributed processes with support for both full and shallow membership
# tracking.
class Party:
    def __init__(self, client, path, identifier=None):
        """
        Create a distributed party for membership tracking.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Party path in Zookeeper
        - identifier (str): Unique member identifier
        """

    def join(self):
        """Join the party."""

    def leave(self):
        """Leave the party."""

    @property
    def is_member(self):
        """True if currently a party member."""

    def get_members(self):
        """
        Get all party members.

        Returns:
        list: List of member identifiers
        """

    def wait_for_members(self, count, timeout=None):
        """
        Wait for specific number of members.

        Parameters:
        - count (int): Required member count
        - timeout (float): Maximum time to wait

        Returns:
        bool: True if count reached, False if timeout
        """
class ShallowParty(Party):
    """Party with reduced overhead for large groups."""


# Partitioning system for distributing work across multiple processes with
# automatic rebalancing and failure recovery.
class SetPartitioner:
    # NOTE: the ``set`` parameter shadows the builtin; the name is kept
    # because it is part of the documented call signature.
    def __init__(self, client, path, set, partition_func=None, identifier=None,
                 time_boundary=30, state_change_event=None):
        """
        Create a set partitioner for distributed work.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Partitioner path in Zookeeper
        - set (iterable): Items to partition
        - partition_func (callable): Custom partition function
        - identifier (str): Unique partitioner identifier
        - time_boundary (int): Time boundary for rebalancing
        - state_change_event: Event triggered on partition changes
        """

    def __iter__(self):
        """Iterate over assigned partitions."""

    def allocate_set(self):
        """Allocate partitions among participants."""

    def finish(self):
        """Finish partitioning and cleanup."""

    @property
    def state(self):
        """Current partitioner state."""

    @property
    def failed(self):
        """True if partitioner has failed."""

    @property
    def release(self):
        """True if partitioner should release partitions."""

    @property
    def acquired(self):
        """True if partitions are acquired."""

    @property
    def allocating(self):
        """True if allocation is in progress."""
class PartitionState:
    """State constants for partitioner."""

    ALLOCATING: str
    ACQUIRED: str
    RELEASE: str
    FAILURE: str


# Lease implementations for temporary resource allocation with timeout-based
# automatic release and non-blocking acquisition patterns.
class NonBlockingLease:
    def __init__(self, client, path, duration, identifier=None):
        """
        Create a non-blocking lease.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Lease path in Zookeeper
        - duration (int): Lease duration in seconds
        - identifier (str): Unique lease identifier
        """

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""

    def acquire(self):
        """
        Acquire the lease.

        Returns:
        bool: True if lease acquired, False otherwise
        """

    def release(self):
        """Release the lease."""

    @property
    def is_acquired(self):
        """True if lease is currently held."""
class MultiNonBlockingLease:
    def __init__(self, client, path, count, duration, identifier=None):
        """
        Create multiple non-blocking leases.

        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Base lease path in Zookeeper
        - count (int): Number of leases to create
        - duration (int): Lease duration in seconds
        - identifier (str): Unique lease identifier
        """

    def __iter__(self):
        """Iterate over leases."""

    def acquire(self):
        """
        Acquire all leases.

        Returns:
        bool: True if all leases acquired
        """

    def release(self):
        """Release all leases."""


from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout
from kazoo.recipe.lock import Lock
import time

zk = KazooClient()
zk.start()

# Create lock
lock = Lock(zk, "/myapp/critical_section", "worker-1")
try:
    # Acquire lock with timeout. A blocking acquire that exceeds its timeout
    # raises LockTimeout, so map that case onto the "not acquired" branch
    # instead of letting the exception escape the example.
    try:
        acquired = lock.acquire(timeout=10)
    except LockTimeout:
        acquired = False

    if acquired:
        print("Lock acquired, performing critical work...")
        time.sleep(5)  # Simulate work
        print("Work completed")
    else:
        print("Could not acquire lock within timeout")
finally:
    # Runs whether or not the lock was obtained, as in the original example.
    lock.release()
    zk.stop()


from kazoo.client import KazooClient
from kazoo.recipe.election import Election
import threading
import time  # required by leader_function; missing from the original example


def leader_function():
    """Work performed while this process holds leadership."""
    print("I am the leader!")
    # Leadership work here
    time.sleep(30)
    print("Leadership term completed")
    return "success"


zk = KazooClient()
zk.start()
election = Election(zk, "/myapp/election", "candidate-1")
try:
    # Run for leadership
    result = election.run(leader_function)
    print(f"Leadership result: {result}")
except KeyboardInterrupt:
    # Withdraw candidacy when the user interrupts the process.
    election.cancel()
finally:
    zk.stop()


from kazoo.client import KazooClient
from kazoo.recipe.queue import Queue
import json

zk = KazooClient()
zk.start()
queue = Queue(zk, "/myapp/tasks")
try:
    # Producer: Add tasks to queue
    task_data = json.dumps({"task": "process_data", "params": {"file": "data.csv"}})
    queue.put(task_data.encode('utf-8'), priority=1)

    # Consumer: Process tasks from queue
    # NOTE(review): confirm that Queue.get accepts ``timeout`` in the
    # installed kazoo version.
    while True:
        task = queue.get(timeout=5.0)
        # get() returns None when nothing is available; check before
        # decoding, otherwise ``None.decode`` raises AttributeError.
        if task is None:
            print("No tasks available")
            break
        task_obj = json.loads(task.decode('utf-8'))
        print(f"Processing task: {task_obj}")
        # Process task here
finally:
    zk.stop()


from kazoo.client import KazooClient
from kazoo.recipe.barrier import DoubleBarrier
import threading
import time  # required by worker(); missing from the original example


def worker(worker_id):
    """Run one participant: enter the barrier, do work, then leave it."""
    zk = KazooClient()
    zk.start()
    barrier = DoubleBarrier(zk, "/myapp/sync", 3, f"worker-{worker_id}")
    try:
        print(f"Worker {worker_id} starting...")
        # Wait for all workers to start
        barrier.enter(timeout=30)
        print(f"Worker {worker_id} entered barrier, starting work...")
        # Do work
        time.sleep(2)
        # Wait for all workers to finish
        barrier.leave(timeout=30)
        print(f"Worker {worker_id} completed")
    finally:
        zk.stop()


# Start multiple workers
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()


# Install with Tessl CLI
npx tessl i tessl/pypi-kazoo