Higher Level Zookeeper Client providing distributed coordination and configuration management primitives.
—
Primary Zookeeper client functionality providing connection management, CRUD operations, watches, transactions, and session handling. The KazooClient class serves as the main interface for all Zookeeper interactions and manages the underlying protocol communication.
Connection management and session lifecycle operations for establishing, maintaining, and terminating Zookeeper connections with comprehensive configuration options.
class KazooClient:
    """Main interface for all Zookeeper interactions.

    The methods below cover connection management and the session
    lifecycle: constructing the client, starting, stopping and restarting
    it, and inspecting its connection state.
    """

    def __init__(self, hosts="127.0.0.1:2181", timeout=10.0, client_id=None,
                 handler=None, default_acl=None, auth_data=None,
                 sasl_options=None, read_only=None, randomize_hosts=True,
                 connection_retry=None, command_retry=None, logger=None,
                 keyfile=None, keyfile_password=None, certfile=None, ca=None,
                 use_ssl=False, verify_certs=True, **kwargs):
        """
        Create a Kazoo client instance.

        Parameters:
        - hosts (str): Comma-separated list of Zookeeper servers (host:port)
        - timeout (float): Session timeout in seconds
        - client_id (tuple): Previous session (session_id, password) for reconnection
        - handler: Async handler implementation (threading/gevent/eventlet)
        - default_acl (list): Default ACL for created nodes
        - auth_data (list): Authentication data tuples (scheme, credential)
        - sasl_options (dict): SASL authentication options
        - read_only (bool): Allow read-only connections
        - randomize_hosts (bool): Randomize server connection order
        - connection_retry (KazooRetry): Connection retry policy
        - command_retry (KazooRetry): Command retry policy
        - logger: Custom logger instance
        - keyfile (str): SSL private key file path
        - keyfile_password (str): SSL private key password
        - certfile (str): SSL certificate file path
        - ca (str): SSL CA certificate file path
        - use_ssl (bool): Enable SSL connections
        - verify_certs (bool): Verify SSL certificates
        - **kwargs: Additional keyword arguments
        """

    def start(self, timeout=15):
        """
        Start the client and connect to Zookeeper.

        Parameters:
        - timeout (float): Connection timeout in seconds

        Raises:
        - KazooTimeoutError: If connection times out
        """

    def stop(self):
        """Stop the client and close the connection."""

    def restart(self):
        """Restart the client connection."""

    def close(self):
        """Free resources held by the client; call it on a stopped client before discarding it."""

    @property
    def state(self):
        """Current connection state (KazooState)."""

    @property
    def connected(self):
        """True if client is connected."""
@property
def client_id(self):
"""Current session ID and password tuple."""

Core CRUD operations for Zookeeper nodes including creation, reading, updating, and deletion with support for ephemeral nodes, sequential nodes, and access control.
def create(self, path, value=b"", acl=None, ephemeral=False, sequence=False, makepath=False):
    """
    Create a Zookeeper node.

    Parameters:
    - path (str): Node path
    - value (bytes): Node data (empty by default)
    - acl (list): Access control list
    - ephemeral (bool): Create ephemeral node
    - sequence (bool): Append sequence number to path
    - makepath (bool): Create missing parent nodes

    Returns:
    str: Actual path created (may include sequence number)

    Raises:
    - NodeExistsError: If node already exists
    - NoNodeError: If parent path doesn't exist and makepath=False
    """
def get(self, path, watch=None):
    """
    Get node data and metadata.

    Parameters:
    - path (str): Node path
    - watch (callable): Watch function for data changes

    Returns:
    tuple: (data, ZnodeStat) containing node data and metadata

    Raises:
    - NoNodeError: If node doesn't exist
    """
def set(self, path, value, version=-1):
    """
    Set node data.

    Parameters:
    - path (str): Node path
    - value (bytes): New node data
    - version (int): Expected version (-1 for any version)

    Returns:
    ZnodeStat: Updated node metadata

    Raises:
    - NoNodeError: If node doesn't exist
    - BadVersionError: If version doesn't match
    """
def delete(self, path, version=-1, recursive=False):
    """
    Delete a node.

    Parameters:
    - path (str): Node path
    - version (int): Expected version (-1 for any version)
    - recursive (bool): Delete children recursively

    Raises:
    - NoNodeError: If node doesn't exist
    - NotEmptyError: If node has children and recursive=False
    - BadVersionError: If version doesn't match
    """
def exists(self, path, watch=None):
    """
    Check if node exists.

    Parameters:
    - path (str): Node path
    - watch (callable): Watch function for existence changes

    Returns:
    ZnodeStat or None: Node metadata if the node exists, None otherwise
    """
def get_children(self, path, watch=None, include_data=False):
"""
Get node children.
Parameters:
- path (str): Node path
- watch (callable): Watch function for children changes
- include_data (bool): Also return the ZnodeStat of the node at path alongside its child names
Returns:
list or tuple: Child names, or (children, ZnodeStat) if include_data=True
Raises:
- NoNodeError: If node doesn't exist
"""

Event watching mechanisms for monitoring node changes, children modifications, and existence state with automatic callback execution and watch re-registration support.
def add_listener(self, listener):
    """
    Add connection state listener.

    Parameters:
    - listener (callable): Function called on state changes with KazooState
    """
def remove_listener(self, listener):
    """
    Remove connection state listener.

    Parameters:
    - listener (callable): Listener function to remove
    """
def get_async(self, path, watch=None):
    """
    Asynchronous get operation.

    Parameters:
    - path (str): Node path
    - watch (callable): Watch function for data changes

    Returns:
    IAsyncResult: Async result object
    """
def exists_async(self, path, watch=None):
    """
    Asynchronous exists operation.

    Parameters:
    - path (str): Node path
    - watch (callable): Watch function for existence changes

    Returns:
    IAsyncResult: Async result object
    """
def get_children_async(self, path, watch=None, include_data=False):
"""
Asynchronous get_children operation.
Returns:
IAsyncResult: Async result object
"""

Atomic transaction support for performing multiple operations as a single unit with all-or-nothing semantics and version checking capabilities.
def transaction(self):
    """
    Create a new transaction request.

    Returns:
    TransactionRequest: Transaction builder object
    """
class TransactionRequest:
    """Transaction builder: queue operations, then commit them as one unit."""

    def create(self, path, value=b"", acl=None, ephemeral=False, sequence=False):
        """Add create operation to transaction."""

    def delete(self, path, version=-1):
        """Add delete operation to transaction."""

    def set_data(self, path, value, version=-1):
        """Add set data operation to transaction."""

    def check(self, path, version):
        """Add version check operation to transaction."""

    def commit(self):
        """
        Commit the transaction.

        Returns:
        list: One result per operation, in the order the operations were added.
        A failed transaction is not signalled by raising: each operation that
        did not succeed is reported as an exception instance in this list
        (for example RolledBackError for an operation that was rolled back
        because another one failed). Inspect the list to detect failure.
        """
def commit_async(self):
"""
Asynchronous transaction commit.
Returns:
IAsyncResult: Async result object
"""

Authentication mechanisms and security operations for establishing secure sessions and managing access credentials with support for digest, SASL, and custom authentication schemes.
def add_auth(self, scheme, credential):
    """
    Add authentication credentials.

    Parameters:
    - scheme (str): Authentication scheme (digest, sasl, etc.)
    - credential (str): Authentication credential

    Raises:
    - AuthFailedError: If authentication fails
    """
def add_auth_async(self, scheme, credential):
    """
    Asynchronous authentication.

    Parameters:
    - scheme (str): Authentication scheme (digest, sasl, etc.)
    - credential (str): Authentication credential

    Returns:
    IAsyncResult: Async result object
    """
def get_acls(self, path):
    """
    Get node ACLs.

    Parameters:
    - path (str): Node path

    Returns:
    tuple: (acl_list, ZnodeStat)

    Raises:
    - NoNodeError: If node doesn't exist
    """
def set_acls(self, path, acls, version=-1):
"""
Set node ACLs.
Parameters:
- path (str): Node path
- acls (list): List of ACL objects
- version (int): Expected version
Returns:
ZnodeStat: Updated node metadata
Raises:
- NoNodeError: If node doesn't exist
- BadVersionError: If version doesn't match
"""

Additional client utilities for path operations, synchronization, and cluster management with support for server information and operational maintenance.
def sync(self, path):
    """
    Synchronize client view with leader.

    Parameters:
    - path (str): Path to synchronize

    Returns:
    str: Synchronized path
    """
def reconfig(self, joining=None, leaving=None, new_members=None, from_config=-1):
    """
    Reconfigure Zookeeper ensemble.

    Parameters:
    - joining (str): Servers joining ensemble
    - leaving (str): Servers leaving ensemble
    - new_members (str): New ensemble configuration
    - from_config (int): Expected config version (defaults to -1)

    Returns:
    tuple: (data, ZnodeStat) of new configuration
    """
def server_version(self):
    """
    Get server version information.

    Returns:
    tuple: Server version as a tuple of ints, for example (3, 4, 3)
    """
def command(self, cmd="ruok"):
"""
Send administrative command to server.
Parameters:
- cmd (str): Command to send (ruok, stat, dump, etc.)
Returns:
str: Server response
"""

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NodeExistsError

# Create client with custom timeout
zk = KazooClient(hosts='localhost:2181,localhost:2182', timeout=30.0)

try:
    # Start connection
    zk.start(timeout=10)

    # Create node with data
    try:
        path = zk.create("/myapp/config", b'{"setting": "value"}', makepath=True)
        print(f"Created: {path}")
    except NodeExistsError:
        print("Node already exists")

    # Read data with watch
    def config_watcher(event):
        print(f"Config changed: {event}")

    data, stat = zk.get("/myapp/config", watch=config_watcher)
    print(f"Data: {data}, Version: {stat.version}")

    # Update data, guarded by the version that was just read
    new_stat = zk.set("/myapp/config", b'{"setting": "new_value"}', version=stat.version)
    print(f"Updated to version: {new_stat.version}")

    # List children
    children = zk.get_children("/myapp")
    print(f"Children: {children}")
finally:
    # Always release the connection, even if an operation above failed
    zk.stop()

from kazoo.client import KazooClient
from kazoo.exceptions import RolledBackError

zk = KazooClient()
zk.start()

try:
    # Create transaction
    transaction = zk.transaction()
    transaction.create("/app/counter", b"0")
    transaction.create("/app/status", b"active")
    transaction.check("/app", version=0)  # Ensure parent unchanged

    # Commit atomically
    results = transaction.commit()

    # commit() reports per-operation outcomes in its return value: an
    # operation that did not succeed appears as an exception instance
    # (e.g. RolledBackError) instead of being raised, so inspect the list.
    failures = [result for result in results if isinstance(result, Exception)]
    if failures:
        print("Transaction failed:", failures)
    else:
        print("Transaction successful:", results)
finally:
    zk.stop()

from kazoo.client import KazooClient
zk = KazooClient()
zk.start()
try:
# Async get operation
async_result = zk.get_async("/some/path")
# Wait for result with timeout
try:
data, stat = async_result.get(timeout=5.0)
print(f"Got data: {data}")
except Exception as e:
print(f"Async operation failed: {e}")
finally:
zk.stop()

Install with Tessl CLI
npx tessl i tessl/pypi-kazoo