CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-kazoo

Higher Level Zookeeper Client providing distributed coordination and configuration management primitives.

Pending
Overview
Eval results
Files

docs/recipes.md

Distributed Recipes

High-level distributed coordination primitives built on top of Zookeeper. These recipes provide common distributed systems patterns like locks, leader elections, queues, barriers, and counters with reliable semantics and fault tolerance.

Capabilities

Distributed Locking

Mutual exclusion primitives for coordinating access to shared resources across distributed processes with support for both exclusive and shared locking patterns.

class Lock:
    """Distributed mutual-exclusion lock rooted at a Zookeeper path."""

    def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
        """
        Create a distributed lock.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Lock path in Zookeeper
        - identifier (str): Unique identifier for this lock holder
        - extra_lock_patterns (tuple): Additional patterns for lock contender identification
        """
    
    def acquire(self, blocking=True, timeout=None):
        """
        Acquire the lock.
        
        Parameters:
        - blocking (bool): Block until lock is acquired
        - timeout (float): Maximum time to wait for lock
        
        Returns:
        bool: True if the lock was acquired, False if it was not
        
        Raises:
        - LockTimeout: If the timeout elapses before the lock is acquired

        NOTE(review): a timeout cannot both return False and raise. Presumably
        an expired timeout raises LockTimeout and False is only returned for a
        failed non-blocking attempt (blocking=False) -- confirm against the
        kazoo Lock API reference.
        """
    
    def release(self):
        """Release the lock."""
    
    @property
    def is_acquired(self):
        """True if lock is currently held."""

class WriteLock(Lock):
    """Exclusive write lock implementation.

    Inherits its constructor, acquire(), release() and is_acquired from Lock.
    """

# NOTE(review): WriteLock is declared as a Lock subclass but ReadLock is
# declared standalone here; upstream kazoo presumably derives ReadLock from
# Lock as well -- confirm against the kazoo lock recipe reference.
class ReadLock:
    """Shared read lock: may be held by several readers at the same time."""

    def __init__(self, client, path, identifier=None):
        """
        Create a shared read lock.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Lock path in Zookeeper
        - identifier (str): Unique identifier for this lock holder
        """
    
    def acquire(self, blocking=True, timeout=None):
        """
        Acquire read lock (shared with other readers).

        Parameters:
        - blocking (bool): Presumably the same meaning as Lock.acquire -- TODO confirm
        - timeout (float): Presumably the same meaning as Lock.acquire -- TODO confirm
        """
    
    def release(self):
        """Release read lock."""

class Semaphore:
    """Distributed semaphore allowing up to ``max_leases`` concurrent holders."""

    # NOTE(review): upstream kazoo appears to declare this constructor as
    # (client, path, identifier=None, max_leases=1); passing max_leases
    # positionally as the third argument may therefore bind to identifier.
    # Confirm against the kazoo Semaphore reference.
    def __init__(self, client, path, max_leases, identifier=None):
        """
        Create a distributed semaphore.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Semaphore path in Zookeeper
        - max_leases (int): Maximum number of concurrent holders
        - identifier (str): Unique identifier for this holder
        """
    
    def acquire(self, blocking=True, timeout=None):
        """
        Acquire a semaphore lease.

        Parameters:
        - blocking (bool): Presumably block until a lease is free -- TODO confirm
        - timeout (float): Presumably the maximum time to wait -- TODO confirm
        """
    
    def release(self):
        """Release the semaphore lease."""
    
    # NOTE(review): upstream kazoo appears to expose lease_holders as a
    # regular method (lease_holders()), not a property -- confirm.
    @property
    def lease_holders(self):
        """List of current lease holders."""
    
    @property
    def max_leases(self):
        """Maximum number of leases available."""

Leader Election

Leader election algorithms for distributed systems requiring a single coordinator process with automatic failover and leadership transfer.

class Election:
    """Leader election among candidates sharing one Zookeeper path."""

    def __init__(self, client, path, identifier=None):
        """
        Create a leader election.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Election path in Zookeeper
        - identifier (str): Unique identifier for this candidate
        """
    
    def run(self, func, *args, **kwargs):
        """
        Run for leadership and execute function when elected.
        
        Parameters:
        - func (callable): Function to execute as leader
        - args: Arguments for leader function
        - kwargs: Keyword arguments for leader function
        
        Returns:
        Result of leader function
        """
    
    def cancel(self):
        """Cancel leadership candidacy."""
    
    # NOTE(review): upstream kazoo appears to expose contenders as a regular
    # method (contenders()), not a property -- confirm.
    @property
    def contenders(self):
        """List of all election contenders."""

Distributed Queues

Queue implementations for distributed task processing and message passing with priority support and blocking/non-blocking operations.

class Queue:
    """Distributed FIFO queue with per-item priority."""

    def __init__(self, client, path):
        """
        Create a distributed FIFO queue.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Queue path in Zookeeper
        """
    
    def put(self, value, priority=100):
        """
        Add item to queue.
        
        Parameters:
        - value (bytes): Item data
        - priority (int): Item priority (lower = higher priority)
        
        Returns:
        str: Item path in queue
        """
    
    # NOTE(review): upstream kazoo's basic Queue.get() appears to take no
    # timeout argument and to return immediately; only LockingQueue.get()
    # accepts a timeout -- confirm against the kazoo queue reference.
    def get(self, timeout=None):
        """
        Get item from queue.
        
        Parameters:
        - timeout (float): Maximum time to wait for item
        
        Returns:
        bytes: Item data, or None if queue is empty (basic Queue) or timeout exceeded (LockingQueue)
        """
    
    # NOTE(review): put_all appears to exist only on LockingQueue upstream
    # -- confirm.
    def put_all(self, items, priority=100):
        """
        Add multiple items to queue.

        Parameters:
        - items: Items to add; presumably an iterable of bytes -- TODO confirm
        - priority (int): Priority applied to the added items
        """
    
    # NOTE(review): upstream appears to report the size through len(queue)
    # rather than a `length` property -- confirm.
    @property
    def length(self):
        """Number of items in queue."""

class LockingQueue(Queue):
    """Queue variant that adds locking around item consumption."""

    def __init__(self, client, path):
        """
        Queue with built-in locking for thread safety.

        NOTE(review): the locking presumably protects an in-flight item from
        being taken by other distributed consumers rather than providing
        in-process thread safety -- confirm against the kazoo queue reference.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Queue path in Zookeeper
        """
    
    # NOTE(review): upstream kazoo's consume() appears to remove the item
    # most recently returned by get() and return a bool, not to yield items
    # -- confirm.
    def consume(self):
        """
        Consume items from queue with locking.
        
        Yields:
        bytes: Queue items
        """

Barriers and Synchronization

Synchronization primitives for coordinating distributed processes at specific execution points with support for both simple and double barriers.

class Barrier:
    """Simple distributed barrier that is created, waited on, and removed."""

    # NOTE(review): upstream kazoo's Barrier appears to take only
    # (client, path); num_clients belongs to DoubleBarrier -- confirm.
    def __init__(self, client, path, num_clients):
        """
        Create a distributed barrier.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Barrier path in Zookeeper
        - num_clients (int): Number of clients required
        """
    
    def create(self):
        """Create the barrier node."""
    
    def wait(self, timeout=None):
        """
        Wait for all clients to reach barrier.
        
        Parameters:
        - timeout (float): Maximum time to wait
        
        Returns:
        bool: True if barrier released, False if timeout
        """
    
    def remove(self):
        """Remove the barrier."""

class DoubleBarrier:
    """Two-phase barrier: clients synchronize on entry and again on exit."""

    def __init__(self, client, path, num_clients, identifier=None):
        """
        Create a double barrier for entry/exit synchronization.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Barrier path in Zookeeper
        - num_clients (int): Number of clients required
        - identifier (str): Unique client identifier
        """
    
    # NOTE(review): upstream kazoo's DoubleBarrier.enter() and leave() appear
    # to take no timeout argument and to block until every client has
    # entered/left -- confirm against the kazoo barrier reference.
    def enter(self, timeout=None):
        """
        Enter the barrier (wait for all clients).
        
        Parameters:
        - timeout (float): Maximum time to wait
        
        Returns:
        bool: True if entered, False if timeout
        """
    
    def leave(self, timeout=None):
        """
        Leave the barrier (wait for all clients to leave).
        
        Parameters:
        - timeout (float): Maximum time to wait
        
        Returns:
        bool: True if left, False if timeout
        """

Distributed Counters

Atomic counter implementation for distributed counting operations with increment, decrement, and value retrieval operations.

# NOTE(review): upstream kazoo's Counter appears to expose arithmetic through
# `counter += n` / `counter -= n` together with the `value` property, rather
# than get()/increment()/decrement()/reset() methods -- confirm against the
# kazoo counter reference before relying on the method names below.
class Counter:
    """Distributed counter stored at a Zookeeper path."""

    def __init__(self, client, path, default=0):
        """
        Create a distributed counter.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Counter path in Zookeeper
        - default (int): Default counter value
        """
    
    @property
    def value(self):
        """Current counter value."""
    
    def get(self):
        """
        Get current counter value.
        
        Returns:
        int: Current counter value
        """
    
    def increment(self, amount=1):
        """
        Increment counter atomically.
        
        Parameters:
        - amount (int): Amount to increment
        
        Returns:
        int: New counter value
        """
    
    def decrement(self, amount=1):
        """
        Decrement counter atomically.
        
        Parameters:
        - amount (int): Amount to decrement
        
        Returns:
        int: New counter value
        """
    
    def reset(self, value=0):
        """
        Reset counter to specific value.
        
        Parameters:
        - value (int): New counter value
        """

Group Membership

Party implementations for tracking group membership and coordinating distributed processes with support for both full and shallow membership tracking.

# NOTE(review): upstream kazoo's Party appears to expose membership through
# iteration, len() and the `in` operator rather than is_member /
# get_members() / wait_for_members() -- confirm against the kazoo party
# reference.
class Party:
    """Group-membership tracker for processes sharing a Zookeeper path."""

    def __init__(self, client, path, identifier=None):
        """
        Create a distributed party for membership tracking.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Party path in Zookeeper
        - identifier (str): Unique member identifier
        """
    
    def join(self):
        """Join the party."""
    
    def leave(self):
        """Leave the party."""
    
    @property
    def is_member(self):
        """True if currently a party member."""
    
    def get_members(self):
        """
        Get all party members.
        
        Returns:
        list: List of member identifiers
        """
    
    def wait_for_members(self, count, timeout=None):
        """
        Wait for specific number of members.
        
        Parameters:
        - count (int): Required member count
        - timeout (float): Maximum time to wait
        
        Returns:
        bool: True if count reached, False if timeout
        """

class ShallowParty(Party):
    """Party with reduced overhead for large groups.

    Declared here as a Party subclass, so it shares Party's constructor and
    membership methods.
    """

Work Partitioning

Partitioning system for distributing work across multiple processes with automatic rebalancing and failure recovery.

class SetPartitioner:
    """Divide a set of items among cooperating processes.

    The state properties below correspond to the PartitionState constants
    (ALLOCATING, ACQUIRED, RELEASE, FAILURE).
    """

    # The `set` parameter shadows the builtin; it is left as-is because
    # renaming it would break callers that pass it by keyword.
    def __init__(self, client, path, set, partition_func=None, identifier=None,
                 time_boundary=30, state_change_event=None):
        """
        Create a set partitioner for distributed work.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Partitioner path in Zookeeper
        - set (iterable): Items to partition
        - partition_func (callable): Custom partition function
        - identifier (str): Unique partitioner identifier
        - time_boundary (int): Time boundary for rebalancing
        - state_change_event: Event triggered on partition changes
        """
    
    def __iter__(self):
        """Iterate over assigned partitions."""
    
    # NOTE(review): upstream kazoo appears to offer wait_for_acquire() and
    # release_set() rather than a public allocate_set() -- confirm against
    # the kazoo partitioner reference.
    def allocate_set(self):
        """Allocate partitions among participants."""
    
    def finish(self):
        """Finish partitioning and cleanup."""
    
    @property
    def state(self):
        """Current partitioner state."""
    
    @property
    def failed(self):
        """True if partitioner has failed."""
    
    @property
    def release(self):
        """True if partitioner should release partitions."""
    
    @property
    def acquired(self):
        """True if partitions are acquired."""
    
    @property
    def allocating(self):
        """True if allocation is in progress."""

class PartitionState:
    """State constants for partitioner."""
    # Allocation of partitions is in progress.
    ALLOCATING: str
    # Partitions have been acquired.
    ACQUIRED: str
    # The partitioner should release its partitions.
    RELEASE: str
    # The partitioner has failed.
    FAILURE: str

Resource Leasing

Lease implementations for temporary resource allocation with timeout-based automatic release and non-blocking acquisition patterns.

# NOTE(review): upstream kazoo's NonBlockingLease appears to attempt the
# lease in its constructor, take `duration` as a datetime.timedelta, and
# report success through truthiness (`if lease:`) rather than through
# acquire()/release()/is_acquired -- confirm against the kazoo lease
# reference.
class NonBlockingLease:
    """Time-limited lease on a Zookeeper path, usable as a context manager."""

    def __init__(self, client, path, duration, identifier=None):
        """
        Create a non-blocking lease.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Lease path in Zookeeper
        - duration (int): Lease duration in seconds
        - identifier (str): Unique lease identifier
        """
    
    def __enter__(self):
        """Context manager entry."""
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
    
    def acquire(self):
        """
        Acquire the lease.
        
        Returns:
        bool: True if lease acquired, False otherwise
        """
    
    def release(self):
        """Release the lease."""
    
    @property
    def is_acquired(self):
        """True if lease is currently held."""

class MultiNonBlockingLease:
    """Group of ``count`` non-blocking leases under one base path."""

    # NOTE(review): upstream kazoo appears to order the positional arguments
    # as (client, count, path, duration, ...), i.e. count before path --
    # confirm against the kazoo lease reference.
    def __init__(self, client, path, count, duration, identifier=None):
        """
        Create multiple non-blocking leases.
        
        Parameters:
        - client (KazooClient): Connected Kazoo client
        - path (str): Base lease path in Zookeeper
        - count (int): Number of leases to create
        - duration (int): Lease duration in seconds
        - identifier (str): Unique lease identifier
        """
    
    def __iter__(self):
        """Iterate over leases."""
    
    def acquire(self):
        """
        Acquire all leases.
        
        Returns:
        bool: True if all leases acquired
        """
    
    def release(self):
        """Release all leases."""

Usage Examples

Distributed Lock Example

import time

from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout
from kazoo.recipe.lock import Lock

zk = KazooClient()
zk.start()

# Create lock
lock = Lock(zk, "/myapp/critical_section", "worker-1")

try:
    # Acquire lock with timeout. An expired timeout is reported by raising
    # LockTimeout, so treat it the same as an unsuccessful (False) acquire.
    try:
        acquired = lock.acquire(timeout=10)
    except LockTimeout:
        acquired = False

    if acquired:
        try:
            print("Lock acquired, performing critical work...")
            time.sleep(5)  # Simulate work
            print("Work completed")
        finally:
            # Release only a lock that this process actually holds.
            lock.release()
    else:
        print("Could not acquire lock within timeout")
finally:
    # Always close the Zookeeper session, whether or not the lock was taken.
    zk.stop()

Leader Election Example

import time

from kazoo.client import KazooClient
from kazoo.recipe.election import Election


def leader_function():
    """Work performed only while this process holds leadership."""
    print("I am the leader!")
    # Leadership work here
    time.sleep(30)
    print("Leadership term completed")
    return "success"


zk = KazooClient()
zk.start()

election = Election(zk, "/myapp/election", "candidate-1")

try:
    # Run for leadership; blocks until elected, then calls leader_function
    result = election.run(leader_function)
    print(f"Leadership result: {result}")
except KeyboardInterrupt:
    # Withdraw from the election if the user interrupts the wait
    election.cancel()
finally:
    zk.stop()

Distributed Queue Example

import json

from kazoo.client import KazooClient
from kazoo.recipe.queue import Queue

zk = KazooClient()
zk.start()

queue = Queue(zk, "/myapp/tasks")

try:
    # Producer: Add tasks to queue
    task_data = json.dumps({"task": "process_data", "params": {"file": "data.csv"}})
    queue.put(task_data.encode('utf-8'), priority=1)

    # Consumer: Process tasks from queue until it is drained
    while True:
        # The basic Queue returns None immediately when it is empty, so no
        # timeout is passed here.
        task = queue.get()
        if task is None:
            print("No tasks available")
            break

        # Decode only after the None check; None has no .decode().
        task_obj = json.loads(task.decode('utf-8'))
        print(f"Processing task: {task_obj}")
        # Process task here

finally:
    zk.stop()

Barrier Synchronization Example

import threading
import time

from kazoo.client import KazooClient
from kazoo.recipe.barrier import DoubleBarrier


def worker(worker_id):
    """Run one participant: sync on entry, do work, sync on exit."""
    # Each worker uses its own client/session so it counts as a separate
    # barrier participant.
    zk = KazooClient()
    zk.start()

    barrier = DoubleBarrier(zk, "/myapp/sync", 3, f"worker-{worker_id}")

    try:
        print(f"Worker {worker_id} starting...")

        # Wait for all workers to start. enter() is called without a timeout:
        # it blocks until all 3 participants have entered.
        barrier.enter()
        print(f"Worker {worker_id} entered barrier, starting work...")

        # Do work
        time.sleep(2)

        # Wait for all workers to finish
        barrier.leave()
        print(f"Worker {worker_id} completed")

    finally:
        zk.stop()


# Start multiple workers
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Install with Tessl CLI

npx tessl i tessl/pypi-kazoo

docs

core-client.md

exceptions.md

handlers.md

index.md

recipes.md

security.md

testing.md

tile.json