CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-kazoo

Higher Level Zookeeper Client providing distributed coordination and configuration management primitives.

Pending
Overview
Eval results
Files

docs/handlers.md

Async Handlers

Pluggable concurrency models for different async frameworks. Handlers manage callback execution, timeouts, and async result objects, with threading, gevent, and eventlet implementations available for scalable, concurrent ZooKeeper operations.

Capabilities

Handler Interface

Base interface and common functionality for all async handler implementations with standardized callback management and async result handling.

class IHandler:
    """Contract implemented by every callback handler."""

    def start(self):
        """Bring the handler up."""

    def stop(self):
        """Shut the handler down."""

    def select(self, sockets, timeout):
        """Wait for sockets to become ready for I/O.

        Args:
            sockets (list): Socket file descriptors to watch.
            timeout (float): How long to wait, in seconds.

        Returns:
            tuple: Three socket lists, in the order
            ``(read_ready, write_ready, error_ready)``.
        """

    def callback_object(self):
        """Build the object that will carry a callback's result.

        Returns:
            IAsyncResult: The async result object.
        """

    def dispatch_callback(self, callback):
        """Hand a callback over for execution.

        Args:
            callback (Callback): The callback object to run.
        """

class IAsyncResult:
    """Contract implemented by async result objects."""

    def ready(self):
        """Report whether a result is available.

        Returns:
            bool: True once the result is available.
        """

    def successful(self):
        """Report whether the operation succeeded.

        Returns:
            bool: True on success, False if an exception occurred.
        """

    def get(self, block=True, timeout=None):
        """Fetch the result.

        Args:
            block (bool): Whether to block until the result is available.
            timeout (float): Upper bound on the time to wait.

        Returns:
            The result value; raises the exception instead if there is one.
        """

    def get_nowait(self):
        """Fetch the result without blocking.

        Returns:
            The result value; raises an exception if it is not ready.
        """

    def set(self, value):
        """Store the result value.

        Args:
            value: The result value to store.
        """

    def set_exception(self, exception):
        """Store an exception as the result.

        Args:
            exception (Exception): The exception to store.
        """

Threading Handler

Threading-based async handler using Python's threading module for traditional multi-threaded concurrency with thread-safe callback execution.

class SequentialThreadingHandler:
    def __init__(self):
        """Set up the threading-based handler for sequential callbacks.

        Concurrent operations run on ``threading.Thread``; synchronization
        and callback management rely on threading primitives.
        """

    def start(self):
        """Bring the threading handler up."""

    def stop(self):
        """Shut the threading handler down and clean up its threads."""

    def select(self, sockets, timeout):
        """Select on sockets via ``select.select()``.

        Args:
            sockets (list): Socket file descriptors.
            timeout (float): Timeout for the selection.

        Returns:
            tuple: The ready socket lists (read, write, error).
        """

    def callback_object(self):
        """Build a threading-based async result.

        Returns:
            AsyncResult: The threading async result object.
        """

    def dispatch_callback(self, callback):
        """Hand a callback over for thread-safe dispatch.

        Args:
            callback (Callback): The callback to run.
        """

    def create_connection(self, *args, **kwargs):
        """Open a socket connection.

        Returns:
            socket: The connected socket object.
        """

class AsyncResult:
    """Async result implementation built on threading."""

    def __init__(self):
        """Set up the result, synchronized through a threading.Event."""

    def ready(self):
        """Report readiness, as tracked by the threading.Event."""

    def successful(self):
        """Report whether the operation succeeded."""

    def get(self, block=True, timeout=None):
        """Fetch the result, optionally blocking and with a timeout.

        Blocking is implemented with ``threading.Event.wait()``.
        """

    def get_nowait(self):
        """Fetch the result right away, or raise an exception."""

    def set(self, value):
        """Store the result value and notify waiters."""

    def set_exception(self, exception):
        """Store an exception and notify waiters."""

class KazooTimeoutError(Exception):
    """Raised when a threading handler operation times out."""

Gevent Handler

Gevent-based async handler using greenlets for cooperative concurrency with gevent's async I/O capabilities and green threading model.

class SequentialGeventHandler:
    def __init__(self):
        """Set up the gevent-based handler for greenlet concurrency.

        Proper async I/O support requires gevent >= 1.2. Socket operations
        go through gevent.select().
        """

    def start(self):
        """Bring the gevent handler up."""

    def stop(self):
        """Shut the gevent handler down."""

    def select(self, sockets, timeout):
        """Select on sockets via ``gevent.select.select()``.

        Args:
            sockets (list): Socket file descriptors.
            timeout (float): Timeout for the selection.

        Returns:
            tuple: The ready socket lists, optimized for gevent.
        """

    def callback_object(self):
        """Build a gevent async result.

        Returns:
            AsyncResult: A gevent-compatible async result.
        """

    def dispatch_callback(self, callback):
        """Hand a callback over for dispatch through ``gevent.spawn()``.

        Args:
            callback (Callback): The callback to run in a greenlet.
        """

    def create_connection(self, *args, **kwargs):
        """Open a gevent-compatible socket connection.

        Returns:
            gevent.socket: The gevent socket object.
        """

    def create_socket_pair(self):
        """Build a gevent socket pair used for communication.

        Returns:
            tuple: The gevent socket pair, ``(socket1, socket2)``.
        """

Eventlet Handler

Eventlet-based async handler using green threads for cooperative concurrency with eventlet's async I/O and green threading capabilities.

class SequentialEventletHandler:
    def __init__(self):
        """Set up the eventlet-based handler for green thread concurrency.

        Proper async support requires eventlet >= 0.17.1. Socket operations
        go through eventlet.select().
        """

    def start(self):
        """Bring the eventlet handler up."""

    def stop(self):
        """Shut the eventlet handler down."""

    def select(self, sockets, timeout):
        """Select on sockets via ``eventlet.select.select()``.

        Args:
            sockets (list): Socket file descriptors.
            timeout (float): Timeout for the selection.

        Returns:
            tuple: The ready socket lists for eventlet.
        """

    def callback_object(self):
        """Build an eventlet async result.

        Returns:
            AsyncResult: An eventlet-compatible async result.
        """

    def dispatch_callback(self, callback):
        """Hand a callback over for dispatch through ``eventlet.spawn()``.

        Args:
            callback (Callback): The callback to run in a green thread.
        """

    def create_connection(self, *args, **kwargs):
        """Open an eventlet-compatible socket connection.

        Returns:
            eventlet.green.socket: The eventlet socket object.
        """

class AsyncResult:
    """Async result implementation for eventlet."""

    def __init__(self):
        """Set up the result, coordinated through an eventlet.Event."""

    def ready(self):
        """Report readiness, as tracked by eventlet primitives."""

    def successful(self):
        """Report the operation's success status."""

    def get(self, block=True, timeout=None):
        """Fetch the result, with eventlet timeout support.

        Timeouts are handled with ``eventlet.timeout.Timeout``.
        """

    def get_nowait(self):
        """Fetch the result from a green thread without blocking."""

    def set(self, value):
        """Store the result and wake the waiting green threads."""

    def set_exception(self, exception):
        """Store an exception and notify the green threads."""

class TimeoutError(Exception):
    """Raised when an eventlet handler operation times out."""

Handler Utilities

Utility functions and base classes supporting all handler implementations with common socket operations and async result patterns.

class AsyncResult:
    """Base implementation of an async result."""

    def __init__(self):
        """Set up the base async result."""

    def ready(self):
        """Report whether a result is available."""

    def successful(self):
        """Report whether the operation succeeded."""

    def get(self, block=True, timeout=None):
        """Fetch the result value, optionally bounded by a timeout."""

    def get_nowait(self):
        """Fetch the result without blocking."""

    def set(self, value):
        """Store the result value."""

    def set_exception(self, exception):
        """Store an exception as the result."""

    # NOTE(review): upstream kazoo appears to expose this registration hook
    # as ``rawlink`` -- confirm that ``link`` exists before relying on it.
    def link(self, callback):
        """Register a callback to be invoked once the result is ready.

        Args:
            callback (callable): The function to call when ready.
        """

    def unlink(self, callback):
        """Remove a callback that was linked earlier.

        Args:
            callback (callable): The callback to remove.
        """

def create_socket_pair():
    """Build a pair of connected sockets used for communication.

    Returns:
        tuple: The connected pair, ``(socket1, socket2)``.
    """

def create_tcp_socket(module):
    """Build a TCP socket with the given socket module.

    Args:
        module: The socket module (socket, gevent.socket, etc.).

    Returns:
        socket: The TCP socket object.
    """

def create_tcp_connection(module, address, timeout=None):
    """Open a TCP connection to an address.

    Args:
        module: The socket module to use.
        address (tuple): The target as a ``(host, port)`` tuple.
        timeout (float): Timeout for the connection.

    Returns:
        socket: The connected socket.
    """

def capture_exceptions(async_object):
    """Decorator that captures exceptions raised in async operations.

    Args:
        async_object: The async result object.

    Returns:
        The decorator function.
    """

def wrap(async_object):
    """Decorator that wraps a function with an async result.

    Args:
        async_object: The async result object.

    Returns:
        The decorator function.
    """

def fileobj_to_fd(fileobj):
    """Turn a file object into its file descriptor.

    Args:
        fileobj: A file-like object.

    Returns:
        int: The file descriptor.
    """

def selector_select(selector, timeout):
    """Run a select through the selectors module.

    Args:
        selector: The selector object.
        timeout (float): Timeout for the selection.

    Returns:
        list: The ready selectors.
    """

Usage Examples

Threading Handler Example

import threading
import time

from kazoo.client import KazooClient
from kazoo.handlers.threading import SequentialThreadingHandler

# Znode used throughout this example.
TEST_PATH = "/threading-test"


def connection_listener(state):
    """Print every connection state the client reports."""
    print(f"Connection state changed: {state}")


# The threading handler is the default; it is passed explicitly here.
handler = SequentialThreadingHandler()
zk = KazooClient(hosts='localhost:2181', handler=handler)
zk.add_listener(connection_listener)

try:
    zk.start()

    # Kick off the create asynchronously, then wait up to five seconds.
    pending_create = zk.create_async(TEST_PATH, b"test data", makepath=True)
    try:
        created_path = pending_create.get(timeout=5.0)
        print(f"Created path: {created_path}")
    except Exception as exc:
        print(f"Creation failed: {exc}")

    # Issue both requests before waiting on either one.
    pending_get = zk.get_async(TEST_PATH)
    pending_exists = zk.exists_async(TEST_PATH)

    node_data, node_stat = pending_get.get()
    exists_stat = pending_exists.get()

    print(f"Data: {node_data}, Exists: {exists_stat is not None}")

finally:
    # Always stop the client, whether or not the operations succeeded.
    zk.stop()

Gevent Handler Example

import gevent
from gevent import spawn

from kazoo.client import KazooClient
from kazoo.handlers.gevent import SequentialGeventHandler

# Parent znode under which every worker creates its own child.
BASE_PATH = "/gevent-test"

# Client backed by the gevent handler.
handler = SequentialGeventHandler()
zk = KazooClient(hosts='localhost:2181', handler=handler)


def worker(worker_id, path_base):
    """Create, then update, this worker's own znode under ``path_base``.

    Any failure is printed rather than propagated to the caller.
    """
    try:
        node = f"{path_base}/worker-{worker_id}"
        zk.create(node, f"worker {worker_id} data".encode(), makepath=True)

        # Stand-in for real work.
        gevent.sleep(1)

        zk.set(node, f"worker {worker_id} updated".encode())

        print(f"Worker {worker_id} completed")

    except Exception as exc:
        print(f"Worker {worker_id} failed: {exc}")


try:
    zk.start()

    zk.create(BASE_PATH, b"base", makepath=True)

    # One greenlet per worker; wait until all of them have finished.
    greenlets = [spawn(worker, index, BASE_PATH) for index in range(5)]
    gevent.joinall(greenlets)

    # Show the paths the workers created.
    children = zk.get_children(BASE_PATH)
    print(f"Created paths: {children}")

finally:
    zk.stop()

Eventlet Handler Example

import eventlet
from eventlet import spawn

from kazoo.client import KazooClient
from kazoo.handlers.eventlet import SequentialEventletHandler

# Client backed by the eventlet handler.
handler = SequentialEventletHandler()
zk = KazooClient(hosts='localhost:2181', handler=handler)


def async_worker(path, data):
    """Create a sequence node under ``path`` and read its data back.

    Returns the path that was actually created, or None when any step
    fails (the failure is printed).
    """
    try:
        actual_path = zk.create(
            path, data.encode(), sequence=True, makepath=True
        )

        # Stand-in for real async work.
        eventlet.sleep(0.5)

        stored, _stat = zk.get(actual_path)
        print(f"Created {actual_path}: {stored.decode()}")

        return actual_path

    except Exception as exc:
        print(f"Worker failed for {path}: {exc}")
        return None


try:
    zk.start()

    # Fan the workers out over a pool of ten green threads.
    pool = eventlet.GreenPool(10)
    results = [
        pool.spawn(async_worker, "/eventlet-test/item-", f"data-{index}")
        for index in range(10)
    ]

    # Collect every outcome, then keep only the workers that succeeded.
    paths = [green_thread.wait() for green_thread in results]
    successful_paths = [created for created in paths if created is not None]

    print(f"Successfully created {len(successful_paths)} paths")

finally:
    zk.stop()

Custom Handler Configuration

from kazoo.client import KazooClient
from kazoo.handlers.threading import SequentialThreadingHandler
from kazoo.retry import KazooRetry


def state_listener(state):
    """Print each connection state transition."""
    print(f"State changed to: {state}")


# The handler is started here by hand and stopped again at the very end.
handler = SequentialThreadingHandler()
handler.start()

# Separate retry policies for connecting and for individual commands.
connection_retry = KazooRetry(max_tries=3, delay=1, backoff=2)
command_retry = KazooRetry(max_tries=5, delay=0.1, backoff=1.5)

client_options = {
    "hosts": 'zk1:2181,zk2:2181,zk3:2181',
    "handler": handler,
    "connection_retry": connection_retry,
    "command_retry": command_retry,
    "timeout": 30.0,
}
zk = KazooClient(**client_options)
zk.add_listener(state_listener)

try:
    zk.start(timeout=15)

    # Run an operation through the custom handler.
    result = zk.create("/custom-handler-test", b"handler data", makepath=True)
    print(f"Created with custom handler: {result}")

finally:
    # Stop the client first, then the handler that was started by hand.
    zk.stop()
    handler.stop()

Async Result Patterns

from kazoo.client import KazooClient
from kazoo.handlers.threading import AsyncResult
import threading
import time

zk = KazooClient()
zk.start()

def result_callback(async_result):
    """Print the outcome of a finished async operation.

    Parameters:
    - async_result: The async result object this callback was
      registered on; it is passed in when the result is ready.
    """
    try:
        result = async_result.get_nowait()
        print(f"Callback received result: {result}")
    except Exception as e:
        print(f"Callback received exception: {e}")

try:
    # Create async operation
    async_result = zk.create_async("/async-test", b"async data", makepath=True)

    # Register the callback to be called when the result is ready.
    # kazoo's async result objects name this hook ``rawlink``; they do not
    # define ``link``, so calling ``link`` here raises AttributeError.
    async_result.rawlink(result_callback)

    # Wait for result in another thread
    def wait_for_result():
        """Block on the result (up to 10 seconds) and print the outcome."""
        try:
            path = async_result.get(timeout=10)
            print(f"Background thread got result: {path}")
        except Exception as e:
            print(f"Background thread got exception: {e}")

    thread = threading.Thread(target=wait_for_result)
    thread.start()

    # Main thread continues other work
    time.sleep(1)
    print("Main thread continuing...")

    # Wait for background thread
    thread.join()

    # Check if result is ready
    if async_result.ready():
        print(f"Result is ready: {async_result.successful()}")

finally:
    # Always stop the client, even if an operation above failed.
    zk.stop()

Install with Tessl CLI

npx tessl i tessl/pypi-kazoo

docs

core-client.md

exceptions.md

handlers.md

index.md

recipes.md

security.md

testing.md

tile.json