CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kazoo

Higher Level Zookeeper Client providing distributed coordination and configuration management primitives.

Pending
Overview
Eval results
Files

testing.mddocs/

Testing Infrastructure

Testing utilities and harnesses for developing Zookeeper-based applications. Includes managed Zookeeper server instances, test clusters, and base test classes with automatic client setup and teardown for comprehensive testing scenarios.

Capabilities

Test Harness

Base test harness providing Zookeeper test environment setup and teardown with automatic server management and client configuration.

class KazooTestHarness:
    def __init__(self):
        """
        Base test harness for Kazoo tests.
        
        Provides methods for setting up and tearing down Zookeeper
        test environments with automatic server management.
        """
    
    def setup_zookeeper(self):
        """
        Set up Zookeeper test environment.
        
        Creates and starts managed Zookeeper server instances
        for testing. Configures cluster if multiple servers needed.
        """
    
    def teardown_zookeeper(self):
        """
        Tear down Zookeeper test environment.
        
        Stops and cleans up managed Zookeeper servers and
        removes temporary data directories.
        """
    
    @property
    def cluster(self):
        """
        Get the test Zookeeper cluster.
        
        Returns:
        ZookeeperCluster: Test cluster instance
        """
    
    @property
    def hosts(self):
        """
        Get connection string for test cluster.
        
        Returns:
        str: Comma-separated host:port list for test servers
        """

class KazooTestCase:
    def __init__(self):
        """
        Test case with automatic Kazoo client setup and teardown.
        
        Extends KazooTestHarness with pre-configured client
        instances and common test utilities.
        """
    
    def setUp(self):
        """
        Set up test case with Zookeeper and client.
        
        Creates test cluster, starts servers, and initializes
        connected KazooClient instance for testing.
        """
    
    def tearDown(self):
        """
        Clean up test case resources.
        
        Stops client connections, shuts down test servers,
        and cleans up temporary test data.
        """
    
    @property
    def client(self):
        """
        Get connected test client.
        
        Returns:
        KazooClient: Connected client instance for testing
        """
    
    def make_client(self, **kwargs):
        """
        Create additional test client.
        
        Parameters:
        - kwargs: KazooClient configuration parameters
        
        Returns:
        KazooClient: Configured client instance
        """

Managed Zookeeper Servers

Managed Zookeeper server instances for test environments with configurable server parameters and automatic lifecycle management.

class ManagedZooKeeper:
    def __init__(self, port=2181, data_dir=None, log_dir=None, 
                 java_system_properties=None, jvm_args=None):
        """
        Managed Zookeeper server instance for testing.
        
        Parameters:
        - port (int): Server port number
        - data_dir (str): Data directory path (temporary if None)
        - log_dir (str): Log directory path (temporary if None)
        - java_system_properties (dict): Java system properties
        - jvm_args (list): JVM arguments for server process
        """
    
    def start(self):
        """
        Start the Zookeeper server.
        
        Launches Zookeeper process with configured parameters
        and waits for server to become ready for connections.
        """
    
    def stop(self):
        """
        Stop the Zookeeper server.
        
        Gracefully shuts down server process and cleans up
        temporary directories if created automatically.
        """
    
    def restart(self):
        """
        Restart the Zookeeper server.
        
        Stops and starts the server, preserving data directory
        contents for testing persistence scenarios.
        """
    
    @property
    def running(self):
        """
        Check if server is running.
        
        Returns:
        bool: True if server process is active
        """
    
    @property
    def address(self):
        """
        Get server address.
        
        Returns:
        tuple: (host, port) tuple for connections
        """

class ZookeeperCluster:
    def __init__(self, size=3):
        """
        Cluster of Zookeeper servers for testing.
        
        Parameters:
        - size (int): Number of servers in cluster
        """
    
    def start(self):
        """
        Start all servers in the cluster.
        
        Starts servers sequentially and waits for cluster
        quorum to be established.
        """
    
    def stop(self):
        """
        Stop all servers in the cluster.
        
        Gracefully shuts down all server processes and
        cleans up cluster resources.
        """
    
    def terminate_server(self, server_id):
        """
        Terminate specific server in cluster.
        
        Parameters:
        - server_id (int): Server ID to terminate
        
        Used for testing cluster resilience and failover.
        """
    
    def restart_server(self, server_id):
        """
        Restart specific server in cluster.
        
        Parameters:
        - server_id (int): Server ID to restart
        """
    
    @property
    def servers(self):
        """
        Get list of cluster servers.
        
        Returns:
        list: List of ManagedZooKeeper instances
        """
    
    @property
    def hosts(self):
        """
        Get cluster connection string.
        
        Returns:
        str: Comma-separated host:port connection string
        """

Testing Utilities

Utility functions supporting Zookeeper testing scenarios with connection helpers, path utilities, and test data management.

def get_global_cluster():
    """
    Get reference to global test cluster.
    
    Returns:
    ZookeeperCluster: Shared cluster instance for tests
    
    Used for sharing cluster across multiple test cases
    to improve test performance and resource usage.
    """

def listen():
    """
    Listen for connections on available port.
    
    Returns:
    tuple: (socket, port) tuple with listening socket and port number
    
    Used for finding available ports for test servers.
    """

def to_java_compatible_path(path):
    """
    Convert path to Java-compatible format.
    
    Parameters:
    - path (str): File system path
    
    Returns:
    str: Java-compatible path string
    
    Handles platform-specific path separators and formats
    for Java-based Zookeeper server processes.
    """

Test Configuration

Configuration helpers and environment setup for different testing scenarios with customizable server parameters and test data.

# Test configuration constants and helpers

DEFAULT_TEST_PORT = 2181
"""Default port for test Zookeeper servers."""

DEFAULT_CLUSTER_SIZE = 3
"""Default cluster size for multi-server tests."""

TEST_DATA_DIR = "/tmp/kazoo-test"
"""Default base directory for test data."""

def create_test_client(hosts=None, **kwargs):
    """
    Create configured test client.
    
    Parameters:
    - hosts (str): Connection string (uses test cluster if None)
    - kwargs: Additional KazooClient parameters
    
    Returns:
    KazooClient: Configured test client
    """

def wait_for_server(host, port, timeout=30):
    """
    Wait for server to become available.
    
    Parameters:
    - host (str): Server hostname
    - port (int): Server port
    - timeout (float): Maximum wait time
    
    Returns:
    bool: True if server becomes available, False if timeout
    """

def cleanup_test_paths(client, base_path="/test"):
    """
    Clean up test paths from Zookeeper.
    
    Parameters:
    - client (KazooClient): Connected client
    - base_path (str): Base path to clean recursively
    
    Removes all test data from Zookeeper to ensure
    clean state between test runs.
    """

Usage Examples

Basic Test Case

import unittest
from kazoo.testing import KazooTestCase
from kazoo.exceptions import NoNodeError

class MyZookeeperTest(KazooTestCase):
    def setUp(self):
        super().setUp()
        # Additional test setup if needed
        self.test_path = "/test/myapp"
    
    def tearDown(self):
        # Clean up test data
        try:
            self.client.delete(self.test_path, recursive=True)
        except NoNodeError:
            pass
        super().tearDown()
    
    def test_create_and_read(self):
        """Test basic create and read operations."""
        # Create test node
        path = self.client.create(self.test_path, b"test data", makepath=True)
        self.assertEqual(path, self.test_path)
        
        # Read and verify
        data, stat = self.client.get(self.test_path)
        self.assertEqual(data, b"test data")
        self.assertGreater(stat.version, -1)
    
    def test_node_operations(self):
        """Test various node operations."""
        # Test creation
        self.client.create(self.test_path, b"initial", makepath=True)
        
        # Test update
        stat = self.client.set(self.test_path, b"updated")
        self.assertEqual(stat.version, 1)
        
        # Test children
        child_path = f"{self.test_path}/child"
        self.client.create(child_path, b"child data")
        
        children = self.client.get_children(self.test_path)
        self.assertIn("child", children)
        
        # Test existence
        stat = self.client.exists(child_path)
        self.assertIsNotNone(stat)

if __name__ == '__main__':
    unittest.main()

Multi-Client Testing

import unittest
from kazoo.testing import KazooTestHarness
from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock
import threading
import time

class MultiClientTest(KazooTestHarness):
    def setUp(self):
        self.setup_zookeeper()
        
        # Create multiple clients
        self.client1 = KazooClient(hosts=self.hosts)
        self.client2 = KazooClient(hosts=self.hosts)
        self.client3 = KazooClient(hosts=self.hosts)
        
        self.client1.start()
        self.client2.start() 
        self.client3.start()
    
    def tearDown(self):
        self.client1.stop()
        self.client2.stop()
        self.client3.stop()
        self.teardown_zookeeper()
    
    def test_distributed_lock(self):
        """Test distributed lock with multiple clients."""
        lock_path = "/test/lock"
        results = []
        
        def worker(client_id, client):
            lock = Lock(client, lock_path, f"client-{client_id}")
            
            if lock.acquire(timeout=5):
                try:
                    # Critical section
                    results.append(f"client-{client_id}-start")
                    time.sleep(0.1)  # Simulate work
                    results.append(f"client-{client_id}-end")
                finally:
                    lock.release()
            else:
                results.append(f"client-{client_id}-timeout")
        
        # Start workers concurrently
        threads = []
        for i, client in enumerate([self.client1, self.client2, self.client3]):
            thread = threading.Thread(target=worker, args=(i+1, client))
            threads.append(thread)
            thread.start()
        
        # Wait for completion
        for thread in threads:
            thread.join()
        
        # Verify exclusive access (no interleaved operations)
        self.assertEqual(len(results), 6)  # 3 clients * 2 events each
        
        # Check that start/end pairs are not interleaved
        for i in range(0, len(results), 2):
            start = results[i]
            end = results[i+1]
            client_id = start.split('-')[1]
            self.assertTrue(start.endswith('start'))
            self.assertTrue(end.endswith('end'))
            self.assertEqual(end.split('-')[1], client_id)

if __name__ == '__main__':
    unittest.main()

Cluster Failover Testing

import unittest
from kazoo.testing import KazooTestHarness
from kazoo.testing.common import ZookeeperCluster
from kazoo.client import KazooClient
from kazoo.protocol.states import KazooState
import time

class ClusterFailoverTest(KazooTestHarness):
    def setUp(self):
        # Create 3-node cluster for testing
        self.cluster = ZookeeperCluster(size=3)
        self.cluster.start()
        
        self.client = KazooClient(hosts=self.cluster.hosts)
        self.connection_states = []
        
        def state_listener(state):
            self.connection_states.append((time.time(), state))
        
        self.client.add_listener(state_listener)
        self.client.start()
    
    def tearDown(self):
        self.client.stop()
        self.cluster.stop()
    
    def test_server_failure_recovery(self):
        """Test client recovery from server failures."""
        # Ensure connected
        self.assertEqual(self.client.state, KazooState.CONNECTED)
        
        # Create test data
        test_path = "/test/failover"
        self.client.create(test_path, b"test data", makepath=True)
        
        # Kill one server (should not affect quorum)
        self.cluster.terminate_server(0)
        
        # Client should remain connected
        time.sleep(2)
        self.assertEqual(self.client.state, KazooState.CONNECTED)
        
        # Should still be able to operate
        data, stat = self.client.get(test_path)
        self.assertEqual(data, b"test data")
        
        # Kill second server (loses quorum)
        self.cluster.terminate_server(1)
        
        # Client should lose connection
        time.sleep(5)
        self.assertIn(KazooState.SUSPENDED, [state for _, state in self.connection_states])
        
        # Restart servers
        self.cluster.restart_server(0)
        self.cluster.restart_server(1)
        
        # Wait for reconnection
        timeout = time.time() + 15
        while time.time() < timeout and self.client.state != KazooState.CONNECTED:
            time.sleep(0.1)
        
        self.assertEqual(self.client.state, KazooState.CONNECTED)
        
        # Verify data survived
        data, stat = self.client.get(test_path)
        self.assertEqual(data, b"test data")

if __name__ == '__main__':
    unittest.main()

Custom Test Environment

import unittest
import tempfile
import shutil
from kazoo.testing.common import ManagedZooKeeper
from kazoo.client import KazooClient

class CustomEnvironmentTest(unittest.TestCase):
    def setUp(self):
        # Create custom test environment
        self.temp_dir = tempfile.mkdtemp()
        
        # Configure custom Zookeeper server
        self.zk_server = ManagedZooKeeper(
            port=12345,
            data_dir=f"{self.temp_dir}/data",
            log_dir=f"{self.temp_dir}/logs",
            java_system_properties={
                'zookeeper.admin.enableServer': 'false',
                'zookeeper.4lw.commands.whitelist': '*'
            }
        )
        
        self.zk_server.start()
        
        # Create client with custom configuration
        self.client = KazooClient(
            hosts='localhost:12345',
            timeout=5.0,
            connection_retry=None  # Disable retries for faster test failures
        )
        self.client.start()
    
    def tearDown(self):
        self.client.stop()
        self.zk_server.stop()
        shutil.rmtree(self.temp_dir)
    
    def test_custom_environment(self):
        """Test operations in custom environment."""
        # Verify we can connect to custom port
        self.assertTrue(self.client.connected)
        
        # Test basic operations
        path = self.client.create("/custom-test", b"custom data", makepath=True)
        self.assertEqual(path, "/custom-test")
        
        data, stat = self.client.get("/custom-test")
        self.assertEqual(data, b"custom data")
        
        # Test server commands (if enabled)
        try:
            response = self.client.command("ruok")
            self.assertEqual(response.strip(), "imok")
        except Exception:
            # Commands might be disabled in some configurations
            pass

if __name__ == '__main__':
    unittest.main()

Performance Testing

import unittest
import time
from kazoo.testing import KazooTestCase
from kazoo.client import KazooClient
import concurrent.futures

class PerformanceTest(KazooTestCase):
    def test_concurrent_operations(self):
        """Test performance with concurrent operations."""
        num_operations = 100
        base_path = "/test/performance"
        
        # Setup base path
        self.client.create(base_path, b"", makepath=True)
        
        def create_operation(i):
            path = f"{base_path}/item-{i:04d}"
            start_time = time.time()
            self.client.create(path, f"data-{i}".encode(), sequence=False)
            return time.time() - start_time
        
        # Run concurrent creates
        start_time = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(create_operation, i) for i in range(num_operations)]
            operation_times = [future.result() for future in concurrent.futures.as_completed(futures)]
        
        total_time = time.time() - start_time
        
        # Verify all operations completed
        children = self.client.get_children(base_path)
        self.assertEqual(len(children), num_operations)
        
        # Performance metrics
        avg_operation_time = sum(operation_times) / len(operation_times)
        operations_per_second = num_operations / total_time
        
        print(f"Performance Results:")
        print(f"  Total operations: {num_operations}")
        print(f"  Total time: {total_time:.2f}s")
        print(f"  Operations/second: {operations_per_second:.1f}")
        print(f"  Average operation time: {avg_operation_time*1000:.1f}ms")
        
        # Basic performance assertions
        self.assertLess(avg_operation_time, 1.0)  # Operations should be fast
        self.assertGreater(operations_per_second, 10)  # Should handle reasonable load

if __name__ == '__main__':
    unittest.main()

Install with Tessl CLI

npx tessl i tessl/pypi-kazoo

docs

core-client.md

exceptions.md

handlers.md

index.md

recipes.md

security.md

testing.md

tile.json