Higher Level Zookeeper Client providing distributed coordination and configuration management primitives.
—
Pluggable concurrency models supporting different async frameworks. Handlers manage callback execution, timeouts, and async result objects with support for threading, gevent, and eventlet paradigms for scalable concurrent Zookeeper operations.
Base interface and common functionality for all async handler implementations with standardized callback management and async result handling.
class IHandler:
"""Interface for callback handlers."""
def start(self):
"""Start the handler."""
def stop(self):
"""Stop the handler."""
def select(self, sockets, timeout):
"""
Select on sockets for I/O readiness.
Parameters:
- sockets (list): List of socket file descriptors
- timeout (float): Timeout in seconds
Returns:
tuple: (read_ready, write_ready, error_ready) socket lists
"""
def callback_object(self):
"""
Create callback result object.
Returns:
IAsyncResult: Async result object
"""
def dispatch_callback(self, callback):
"""
Dispatch callback for execution.
Parameters:
- callback (Callback): Callback object to execute
"""
class IAsyncResult:
"""Interface for async result objects."""
def ready(self):
"""
Check if result is ready.
Returns:
bool: True if result is available
"""
def successful(self):
"""
Check if operation was successful.
Returns:
bool: True if successful, False if exception occurred
"""
def get(self, block=True, timeout=None):
"""
Get the result.
Parameters:
- block (bool): Block until result is available
- timeout (float): Maximum time to wait
Returns:
Result value or raises exception
"""
def get_nowait(self):
"""
Get result without blocking.
Returns:
Result value or raises exception if not ready
"""
def set(self, value):
"""
Set the result value.
Parameters:
- value: Result value to set
"""
def set_exception(self, exception):
"""
Set an exception as the result.
Parameters:
- exception (Exception): Exception to set
"""Threading-based async handler using Python's threading module for traditional multi-threaded concurrency with thread-safe callback execution.
class SequentialThreadingHandler:
def __init__(self):
"""
Threading-based handler for sequential callback execution.
Uses threading.Thread for concurrent operations and threading
primitives for synchronization and callback management.
"""
def start(self):
"""Start the threading handler."""
def stop(self):
"""Stop the threading handler and cleanup threads."""
def select(self, sockets, timeout):
"""
Socket selection using select.select().
Parameters:
- sockets (list): Socket file descriptors
- timeout (float): Selection timeout
Returns:
tuple: Ready socket lists (read, write, error)
"""
def callback_object(self):
"""
Create threading-based async result.
Returns:
AsyncResult: Threading async result object
"""
def dispatch_callback(self, callback):
"""
Dispatch callback in thread-safe manner.
Parameters:
- callback (Callback): Callback to execute
"""
def create_connection(self, *args, **kwargs):
"""
Create socket connection.
Returns:
socket: Connected socket object
"""
class AsyncResult:
"""Threading-based async result implementation."""
def __init__(self):
"""Initialize with threading.Event for synchronization."""
def ready(self):
"""Check if result is ready using threading.Event."""
def successful(self):
"""Check if operation succeeded."""
def get(self, block=True, timeout=None):
"""
Get result with optional blocking and timeout.
Uses threading.Event.wait() for blocking behavior.
"""
def get_nowait(self):
"""Get result immediately or raise exception."""
def set(self, value):
"""Set result value and notify waiters."""
def set_exception(self, exception):
"""Set exception and notify waiters."""
class KazooTimeoutError(Exception):
"""Timeout exception for threading handler operations."""Gevent-based async handler using greenlets for cooperative concurrency with gevent's async I/O capabilities and green threading model.
class SequentialGeventHandler:
def __init__(self):
"""
Gevent-based handler for greenlet concurrency.
Requires gevent >= 1.2 for proper async I/O support.
Uses gevent.select() for socket operations.
"""
def start(self):
"""Start gevent handler."""
def stop(self):
"""Stop gevent handler."""
def select(self, sockets, timeout):
"""
Socket selection using gevent.select.select().
Parameters:
- sockets (list): Socket file descriptors
- timeout (float): Selection timeout
Returns:
tuple: Ready socket lists optimized for gevent
"""
def callback_object(self):
"""
Create gevent async result.
Returns:
AsyncResult: Gevent-compatible async result
"""
def dispatch_callback(self, callback):
"""
Dispatch callback using gevent.spawn().
Parameters:
- callback (Callback): Callback for greenlet execution
"""
def create_connection(self, *args, **kwargs):
"""
Create gevent-compatible socket connection.
Returns:
gevent.socket: Gevent socket object
"""
def create_socket_pair(self):
"""
Create gevent socket pair for communication.
Returns:
tuple: (socket1, socket2) gevent socket pair
"""Eventlet-based async handler using green threads for cooperative concurrency with eventlet's async I/O and green threading capabilities.
class SequentialEventletHandler:
def __init__(self):
"""
Eventlet-based handler for green thread concurrency.
Requires eventlet >= 0.17.1 for proper async support.
Uses eventlet.select() for socket operations.
"""
def start(self):
"""Start eventlet handler."""
def stop(self):
"""Stop eventlet handler."""
def select(self, sockets, timeout):
"""
Socket selection using eventlet.select.select().
Parameters:
- sockets (list): Socket file descriptors
- timeout (float): Selection timeout
Returns:
tuple: Ready socket lists for eventlet
"""
def callback_object(self):
"""
Create eventlet async result.
Returns:
AsyncResult: Eventlet-compatible async result
"""
def dispatch_callback(self, callback):
"""
Dispatch callback using eventlet.spawn().
Parameters:
- callback (Callback): Callback for green thread execution
"""
def create_connection(self, *args, **kwargs):
"""
Create eventlet-compatible socket connection.
Returns:
eventlet.green.socket: Eventlet socket object
"""
class AsyncResult:
"""Eventlet async result implementation."""
def __init__(self):
"""Initialize with eventlet.Event for coordination."""
def ready(self):
"""Check if result is ready using eventlet primitives."""
def successful(self):
"""Check operation success status."""
def get(self, block=True, timeout=None):
"""
Get result with eventlet timeout support.
Uses eventlet.timeout.Timeout for timeout handling.
"""
def get_nowait(self):
"""Get result without blocking in green thread."""
def set(self, value):
"""Set result and wake waiting green threads."""
def set_exception(self, exception):
"""Set exception and notify green threads."""
class TimeoutError(Exception):
"""Timeout exception for eventlet handler operations."""Utility functions and base classes supporting all handler implementations with common socket operations and async result patterns.
class AsyncResult:
"""Base async result implementation."""
def __init__(self):
"""Initialize base async result."""
def ready(self):
"""Check if result is available."""
def successful(self):
"""Check if operation was successful."""
def get(self, block=True, timeout=None):
"""Get result value with optional timeout."""
def get_nowait(self):
"""Get result without blocking."""
def set(self, value):
"""Set the result value."""
def set_exception(self, exception):
"""Set an exception as result."""
def link(self, callback):
"""
Link callback to be called when result is ready.
Parameters:
- callback (callable): Function to call when ready
"""
def unlink(self, callback):
"""
Unlink previously linked callback.
Parameters:
- callback (callable): Callback to remove
"""
def create_socket_pair():
"""
Create connected socket pair for communication.
Returns:
tuple: (socket1, socket2) connected socket pair
"""
def create_tcp_socket(module):
"""
Create TCP socket using specified socket module.
Parameters:
- module: Socket module (socket, gevent.socket, etc.)
Returns:
socket: TCP socket object
"""
def create_tcp_connection(module, address, timeout=None):
"""
Create TCP connection to address.
Parameters:
- module: Socket module to use
- address (tuple): (host, port) tuple
- timeout (float): Connection timeout
Returns:
socket: Connected socket
"""
def capture_exceptions(async_object):
"""
Decorator to capture exceptions in async operations.
Parameters:
- async_object: Async result object
Returns:
Decorator function
"""
def wrap(async_object):
"""
Decorator to wrap function with async result.
Parameters:
- async_object: Async result object
Returns:
Decorator function
"""
def fileobj_to_fd(fileobj):
"""
Convert file object to file descriptor.
Parameters:
- fileobj: File-like object
Returns:
int: File descriptor
"""
def selector_select(selector, timeout):
"""
Select using selectors module.
Parameters:
- selector: Selector object
- timeout (float): Selection timeout
Returns:
list: Ready selectors
"""from kazoo.client import KazooClient
from kazoo.handlers.threading import SequentialThreadingHandler
import threading
import time
# Create client with threading handler (default)
handler = SequentialThreadingHandler()
zk = KazooClient(hosts='localhost:2181', handler=handler)
def connection_listener(state):
print(f"Connection state changed: {state}")
zk.add_listener(connection_listener)
try:
zk.start()
# Perform async operations
async_result = zk.create_async("/threading-test", b"test data", makepath=True)
# Wait for result with timeout
try:
path = async_result.get(timeout=5.0)
print(f"Created path: {path}")
except Exception as e:
print(f"Creation failed: {e}")
# Multiple async operations
get_result = zk.get_async("/threading-test")
exists_result = zk.exists_async("/threading-test")
# Wait for both results
data, stat = get_result.get()
exists_stat = exists_result.get()
print(f"Data: {data}, Exists: {exists_stat is not None}")
finally:
zk.stop()from kazoo.client import KazooClient
from kazoo.handlers.gevent import SequentialGeventHandler
import gevent
from gevent import spawn
# Create client with gevent handler
handler = SequentialGeventHandler()
zk = KazooClient(hosts='localhost:2181', handler=handler)
def worker(worker_id, path_base):
"""Worker greenlet function."""
try:
# Each worker creates its own path
path = f"{path_base}/worker-{worker_id}"
zk.create(path, f"worker {worker_id} data".encode(), makepath=True)
# Simulate some work
gevent.sleep(1)
# Update data
zk.set(path, f"worker {worker_id} updated".encode())
print(f"Worker {worker_id} completed")
except Exception as e:
print(f"Worker {worker_id} failed: {e}")
try:
zk.start()
# Create base path
zk.create("/gevent-test", b"base", makepath=True)
# Spawn multiple worker greenlets
greenlets = []
for i in range(5):
g = spawn(worker, i, "/gevent-test")
greenlets.append(g)
# Wait for all workers to complete
gevent.joinall(greenlets)
# List all worker paths
children = zk.get_children("/gevent-test")
print(f"Created paths: {children}")
finally:
zk.stop()from kazoo.client import KazooClient
from kazoo.handlers.eventlet import SequentialEventletHandler
import eventlet
from eventlet import spawn
# Create client with eventlet handler
handler = SequentialEventletHandler()
zk = KazooClient(hosts='localhost:2181', handler=handler)
def async_worker(path, data):
"""Async worker using eventlet."""
try:
# Create node
actual_path = zk.create(path, data.encode(), sequence=True, makepath=True)
# Simulate async work
eventlet.sleep(0.5)
# Read back data
read_data, stat = zk.get(actual_path)
print(f"Created {actual_path}: {read_data.decode()}")
return actual_path
except Exception as e:
print(f"Worker failed for {path}: {e}")
return None
try:
zk.start()
# Create multiple async workers
pool = eventlet.GreenPool(10)
results = []
for i in range(10):
future = pool.spawn(async_worker, "/eventlet-test/item-", f"data-{i}")
results.append(future)
# Wait for all to complete
paths = [future.wait() for future in results]
successful_paths = [p for p in paths if p is not None]
print(f"Successfully created {len(successful_paths)} paths")
finally:
zk.stop()from kazoo.client import KazooClient
from kazoo.handlers.threading import SequentialThreadingHandler
from kazoo.retry import KazooRetry
# Configure custom handler with retry policy
handler = SequentialThreadingHandler()
handler.start()
# Configure retry policies
connection_retry = KazooRetry(max_tries=3, delay=1, backoff=2)
command_retry = KazooRetry(max_tries=5, delay=0.1, backoff=1.5)
# Create client with custom configuration
zk = KazooClient(
hosts='zk1:2181,zk2:2181,zk3:2181',
handler=handler,
connection_retry=connection_retry,
command_retry=command_retry,
timeout=30.0
)
def state_listener(state):
print(f"State changed to: {state}")
zk.add_listener(state_listener)
try:
zk.start(timeout=15)
# Perform operations with custom handler
result = zk.create("/custom-handler-test", b"handler data", makepath=True)
print(f"Created with custom handler: {result}")
finally:
zk.stop()
handler.stop()from kazoo.client import KazooClient
from kazoo.handlers.threading import AsyncResult
import threading
import time
zk = KazooClient()
zk.start()
def result_callback(async_result):
"""Callback function for async result."""
try:
result = async_result.get_nowait()
print(f"Callback received result: {result}")
except Exception as e:
print(f"Callback received exception: {e}")
try:
# Create async operation
async_result = zk.create_async("/async-test", b"async data", makepath=True)
# Link callback to be called when ready
async_result.link(result_callback)
# Wait for result in another thread
def wait_for_result():
try:
path = async_result.get(timeout=10)
print(f"Background thread got result: {path}")
except Exception as e:
print(f"Background thread got exception: {e}")
thread = threading.Thread(target=wait_for_result)
thread.start()
# Main thread continues other work
time.sleep(1)
print("Main thread continuing...")
# Wait for background thread
thread.join()
# Check if result is ready
if async_result.ready():
print(f"Result is ready: {async_result.successful()}")
finally:
zk.stop()Install with Tessl CLI
npx tessl i tessl/pypi-kazoo