CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kazoo

Higher Level Zookeeper Client providing distributed coordination and configuration management primitives.

Pending
Overview
Eval results
Files

core-client.mddocs/

Core Client Operations

Primary Zookeeper client functionality providing connection management, CRUD operations, watches, transactions, and session handling. The KazooClient class serves as the main interface for all Zookeeper interactions and manages the underlying protocol communication.

Capabilities

Client Lifecycle

Connection management and session lifecycle operations for establishing, maintaining, and terminating Zookeeper connections with comprehensive configuration options.

class KazooClient:
    def __init__(self, hosts="127.0.0.1:2181", timeout=10.0, client_id=None, 
                 handler=None, default_acl=None, auth_data=None, sasl_options=None,
                 read_only=None, randomize_hosts=True, connection_retry=None, 
                 command_retry=None, logger=None, keyfile=None, keyfile_password=None,
                 certfile=None, ca=None, use_ssl=False, verify_certs=True):
        """
        Create a Kazoo client instance.
        
        Parameters:
        - hosts (str): Comma-separated list of Zookeeper servers (host:port)
        - timeout (float): Session timeout in seconds
        - client_id (tuple): Previous session (session_id, password) for reconnection
        - handler: Async handler implementation (threading/gevent/eventlet)
        - default_acl (list): Default ACL for created nodes
        - auth_data (list): Authentication data tuples (scheme, credential)
        - sasl_options (dict): SASL authentication options
        - read_only (bool): Allow read-only connections
        - randomize_hosts (bool): Randomize server connection order
        - connection_retry (KazooRetry): Connection retry policy
        - command_retry (KazooRetry): Command retry policy
        - logger: Custom logger instance
        - keyfile (str): SSL private key file path
        - keyfile_password (str): SSL private key password
        - certfile (str): SSL certificate file path
        - ca (str): SSL CA certificate file path
        - use_ssl (bool): Enable SSL connections
        - verify_certs (bool): Verify SSL certificates
        - **kwargs: Additional keyword arguments
        """
    
    def start(self, timeout=15):
        """
        Start the client and connect to Zookeeper.
        
        Parameters:
        - timeout (float): Connection timeout in seconds
        
        Raises:
        - KazooTimeoutError: If connection times out
        """
    
    def stop(self):
        """Stop the client and close the connection."""
    
    def restart(self):
        """Restart the client connection."""
    
    def close(self):
        """Close the client connection permanently."""
    
    @property
    def state(self):
        """Current connection state (KazooState)."""
    
    @property
    def connected(self):
        """True if client is connected."""
    
    @property
    def client_id(self):
        """Current session ID and password tuple."""

Node Operations

Core CRUD operations for Zookeeper nodes including creation, reading, updating, and deletion with support for ephemeral nodes, sequential nodes, and access control.

def create(self, path, value=b"", acl=None, ephemeral=False, sequence=False, makepath=False):
    """
    Create a Zookeeper node.
    
    Parameters:
    - path (str): Node path
    - value (bytes): Node data
    - acl (list): Access control list
    - ephemeral (bool): Create ephemeral node
    - sequence (bool): Append sequence number to path
    - makepath (bool): Create parent directories
    
    Returns:
    str: Actual path created (may include sequence number)
    
    Raises:
    - NodeExistsError: If node already exists
    - NoNodeError: If parent path doesn't exist and makepath=False
    """

def get(self, path, watch=None):
    """
    Get node data and metadata.
    
    Parameters:
    - path (str): Node path
    - watch (callable): Watch function for data changes
    
    Returns:
    tuple: (data, ZnodeStat) containing node data and metadata
    
    Raises:
    - NoNodeError: If node doesn't exist
    """

def set(self, path, value, version=-1):
    """
    Set node data.
    
    Parameters:
    - path (str): Node path
    - value (bytes): New node data
    - version (int): Expected version (-1 for any version)
    
    Returns:
    ZnodeStat: Updated node metadata
    
    Raises:
    - NoNodeError: If node doesn't exist
    - BadVersionError: If version doesn't match
    """

def delete(self, path, version=-1, recursive=False):
    """
    Delete a node.
    
    Parameters:
    - path (str): Node path
    - version (int): Expected version (-1 for any version)
    - recursive (bool): Delete children recursively
    
    Raises:
    - NoNodeError: If node doesn't exist
    - NotEmptyError: If node has children and recursive=False
    - BadVersionError: If version doesn't match
    """

def exists(self, path, watch=None):
    """
    Check if node exists.
    
    Parameters:
    - path (str): Node path
    - watch (callable): Watch function for existence changes
    
    Returns:
    ZnodeStat or None: Node metadata if exists, None otherwise
    """

def get_children(self, path, watch=None, include_data=False):
    """
    Get node children.
    
    Parameters:
    - path (str): Node path
    - watch (callable): Watch function for children changes
    - include_data (bool): Include child node data and stats
    
    Returns:
    list or tuple: Child names, or (children, data_list) if include_data=True
    
    Raises:
    - NoNodeError: If node doesn't exist
    """

Watches and Callbacks

Event watching mechanisms for monitoring node changes, children modifications, and existence state with automatic callback execution and watch re-registration support.

def add_listener(self, listener):
    """
    Add connection state listener.
    
    Parameters:
    - listener (callable): Function called on state changes with KazooState
    """

def remove_listener(self, listener):
    """Remove connection state listener."""

def get_async(self, path, watch=None):
    """
    Asynchronous get operation.
    
    Returns:
    IAsyncResult: Async result object
    """

def exists_async(self, path, watch=None):
    """
    Asynchronous exists operation.
    
    Returns:
    IAsyncResult: Async result object
    """

def get_children_async(self, path, watch=None, include_data=False):
    """
    Asynchronous get_children operation.
    
    Returns:
    IAsyncResult: Async result object
    """

Transactions

Atomic transaction support for performing multiple operations as a single unit with all-or-nothing semantics and version checking capabilities.

def transaction(self):
    """
    Create a new transaction request.
    
    Returns:
    TransactionRequest: Transaction builder object
    """

class TransactionRequest:
    def create(self, path, value=b"", acl=None, ephemeral=False, sequence=False):
        """Add create operation to transaction."""
    
    def delete(self, path, version=-1):
        """Add delete operation to transaction."""
    
    def set_data(self, path, value, version=-1):
        """Add set data operation to transaction."""
    
    def check(self, path, version):
        """Add version check operation to transaction."""
    
    def commit(self):
        """
        Commit the transaction.
        
        Returns:
        list: Results for each operation
        
        Raises:
        - RolledBackError: If any operation fails
        """

    def commit_async(self):
        """
        Asynchronous transaction commit.
        
        Returns:
        IAsyncResult: Async result object
        """

Authentication and Security

Authentication mechanisms and security operations for establishing secure sessions and managing access credentials with support for digest, SASL, and custom authentication schemes.

def add_auth(self, scheme, credential):
    """
    Add authentication credentials.
    
    Parameters:
    - scheme (str): Authentication scheme (digest, sasl, etc.)
    - credential (str): Authentication credential
    
    Raises:
    - AuthFailedError: If authentication fails
    """

def add_auth_async(self, scheme, credential):
    """
    Asynchronous authentication.
    
    Returns:
    IAsyncResult: Async result object
    """

def get_acls(self, path):
    """
    Get node ACLs.
    
    Parameters:
    - path (str): Node path
    
    Returns:
    tuple: (acl_list, ZnodeStat)
    
    Raises:
    - NoNodeError: If node doesn't exist
    """

def set_acls(self, path, acls, version=-1):
    """
    Set node ACLs.
    
    Parameters:
    - path (str): Node path
    - acls (list): List of ACL objects
    - version (int): Expected version
    
    Returns:
    ZnodeStat: Updated node metadata
    
    Raises:
    - NoNodeError: If node doesn't exist
    - BadVersionError: If version doesn't match
    """

Utility Operations

Additional client utilities for path operations, synchronization, and cluster management with support for server information and operational maintenance.

def sync(self, path):
    """
    Synchronize client view with leader.
    
    Parameters:
    - path (str): Path to synchronize
    
    Returns:
    str: Synchronized path
    """

def reconfig(self, joining=None, leaving=None, new_members=None, from_config=-1):
    """
    Reconfigure Zookeeper ensemble.
    
    Parameters:
    - joining (str): Servers joining ensemble
    - leaving (str): Servers leaving ensemble  
    - new_members (str): New ensemble configuration
    - from_config (int): Expected config version
    
    Returns:
    tuple: (data, ZnodeStat) of new configuration
    """

def server_version(self):
    """
    Get server version information.
    
    Returns:
    str: Server version string
    """

def command(self, cmd="ruok"):
    """
    Send administrative command to server.
    
    Parameters:
    - cmd (str): Command to send (ruok, stat, dump, etc.)
    
    Returns:
    str: Server response
    """

Usage Examples

Basic Connection and Operations

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NodeExistsError

# Create client with custom timeout
zk = KazooClient(hosts='localhost:2181,localhost:2182', timeout=30.0)

try:
    # Start connection
    zk.start(timeout=10)
    
    # Create node with data
    try:
        path = zk.create("/myapp/config", b'{"setting": "value"}', makepath=True)
        print(f"Created: {path}")
    except NodeExistsError:
        print("Node already exists")
    
    # Read data with watch
    def config_watcher(event):
        print(f"Config changed: {event}")
    
    data, stat = zk.get("/myapp/config", watch=config_watcher)
    print(f"Data: {data}, Version: {stat.version}")
    
    # Update data
    new_stat = zk.set("/myapp/config", b'{"setting": "new_value"}', version=stat.version)
    print(f"Updated to version: {new_stat.version}")
    
    # List children
    children = zk.get_children("/myapp")
    print(f"Children: {children}")
    
finally:
    zk.stop()

Transaction Example

from kazoo.client import KazooClient
from kazoo.exceptions import RolledBackError

zk = KazooClient()
zk.start()

try:
    # Create transaction
    transaction = zk.transaction()
    transaction.create("/app/counter", b"0")
    transaction.create("/app/status", b"active") 
    transaction.check("/app", version=0)  # Ensure parent unchanged
    
    # Commit atomically
    results = transaction.commit()
    print("Transaction successful:", results)
    
except RolledBackError as e:
    print("Transaction failed:", e)
finally:
    zk.stop()

Async Operations Example

from kazoo.client import KazooClient

zk = KazooClient()
zk.start()

try:
    # Async get operation
    async_result = zk.get_async("/some/path")
    
    # Wait for result with timeout
    try:
        data, stat = async_result.get(timeout=5.0)
        print(f"Got data: {data}")
    except Exception as e:
        print(f"Async operation failed: {e}")
        
finally:
    zk.stop()

Install with Tessl CLI

npx tessl i tessl/pypi-kazoo

docs

core-client.md

exceptions.md

handlers.md

index.md

recipes.md

security.md

testing.md

tile.json