Higher Level Zookeeper Client providing distributed coordination and configuration management primitives.
—
Testing utilities and harnesses for developing Zookeeper-based applications. Includes managed Zookeeper server instances, test clusters, and base test classes with automatic client setup and teardown for comprehensive testing scenarios.
Base test harness providing Zookeeper test environment setup and teardown with automatic server management and client configuration.
class KazooTestHarness:
def __init__(self):
"""
Base test harness for Kazoo tests.
Provides methods for setting up and tearing down Zookeeper
test environments with automatic server management.
"""
def setup_zookeeper(self):
"""
Set up Zookeeper test environment.
Creates and starts managed Zookeeper server instances
for testing. Configures cluster if multiple servers needed.
"""
def teardown_zookeeper(self):
"""
Tear down Zookeeper test environment.
Stops and cleans up managed Zookeeper servers and
removes temporary data directories.
"""
@property
def cluster(self):
"""
Get the test Zookeeper cluster.
Returns:
ZookeeperCluster: Test cluster instance
"""
@property
def hosts(self):
"""
Get connection string for test cluster.
Returns:
str: Comma-separated host:port list for test servers
"""
class KazooTestCase:
def __init__(self):
"""
Test case with automatic Kazoo client setup and teardown.
Extends KazooTestHarness with pre-configured client
instances and common test utilities.
"""
def setUp(self):
"""
Set up test case with Zookeeper and client.
Creates test cluster, starts servers, and initializes
connected KazooClient instance for testing.
"""
def tearDown(self):
"""
Clean up test case resources.
Stops client connections, shuts down test servers,
and cleans up temporary test data.
"""
@property
def client(self):
"""
Get connected test client.
Returns:
KazooClient: Connected client instance for testing
"""
def make_client(self, **kwargs):
"""
Create additional test client.
Parameters:
- kwargs: KazooClient configuration parameters
Returns:
KazooClient: Configured client instance
"""Managed Zookeeper server instances for test environments with configurable server parameters and automatic lifecycle management.
class ManagedZooKeeper:
def __init__(self, port=2181, data_dir=None, log_dir=None,
java_system_properties=None, jvm_args=None):
"""
Managed Zookeeper server instance for testing.
Parameters:
- port (int): Server port number
- data_dir (str): Data directory path (temporary if None)
- log_dir (str): Log directory path (temporary if None)
- java_system_properties (dict): Java system properties
- jvm_args (list): JVM arguments for server process
"""
def start(self):
"""
Start the Zookeeper server.
Launches Zookeeper process with configured parameters
and waits for server to become ready for connections.
"""
def stop(self):
"""
Stop the Zookeeper server.
Gracefully shuts down server process and cleans up
temporary directories if created automatically.
"""
def restart(self):
"""
Restart the Zookeeper server.
Stops and starts the server, preserving data directory
contents for testing persistence scenarios.
"""
@property
def running(self):
"""
Check if server is running.
Returns:
bool: True if server process is active
"""
@property
def address(self):
"""
Get server address.
Returns:
tuple: (host, port) tuple for connections
"""
class ZookeeperCluster:
def __init__(self, size=3):
"""
Cluster of Zookeeper servers for testing.
Parameters:
- size (int): Number of servers in cluster
"""
def start(self):
"""
Start all servers in the cluster.
Starts servers sequentially and waits for cluster
quorum to be established.
"""
def stop(self):
"""
Stop all servers in the cluster.
Gracefully shuts down all server processes and
cleans up cluster resources.
"""
def terminate_server(self, server_id):
"""
Terminate specific server in cluster.
Parameters:
- server_id (int): Server ID to terminate
Used for testing cluster resilience and failover.
"""
def restart_server(self, server_id):
"""
Restart specific server in cluster.
Parameters:
- server_id (int): Server ID to restart
"""
@property
def servers(self):
"""
Get list of cluster servers.
Returns:
list: List of ManagedZooKeeper instances
"""
@property
def hosts(self):
"""
Get cluster connection string.
Returns:
str: Comma-separated host:port connection string
"""Utility functions supporting Zookeeper testing scenarios with connection helpers, path utilities, and test data management.
def get_global_cluster():
"""
Get reference to global test cluster.
Returns:
ZookeeperCluster: Shared cluster instance for tests
Used for sharing cluster across multiple test cases
to improve test performance and resource usage.
"""
def listen():
"""
Listen for connections on available port.
Returns:
tuple: (socket, port) tuple with listening socket and port number
Used for finding available ports for test servers.
"""
def to_java_compatible_path(path):
"""
Convert path to Java-compatible format.
Parameters:
- path (str): File system path
Returns:
str: Java-compatible path string
Handles platform-specific path separators and formats
for Java-based Zookeeper server processes.
"""Configuration helpers and environment setup for different testing scenarios with customizable server parameters and test data.
# Test configuration constants and helpers
DEFAULT_TEST_PORT = 2181
"""Default port for test Zookeeper servers."""
DEFAULT_CLUSTER_SIZE = 3
"""Default cluster size for multi-server tests."""
TEST_DATA_DIR = "/tmp/kazoo-test"
"""Default base directory for test data."""
def create_test_client(hosts=None, **kwargs):
"""
Create configured test client.
Parameters:
- hosts (str): Connection string (uses test cluster if None)
- kwargs: Additional KazooClient parameters
Returns:
KazooClient: Configured test client
"""
def wait_for_server(host, port, timeout=30):
"""
Wait for server to become available.
Parameters:
- host (str): Server hostname
- port (int): Server port
- timeout (float): Maximum wait time
Returns:
bool: True if server becomes available, False if timeout
"""
def cleanup_test_paths(client, base_path="/test"):
"""
Clean up test paths from Zookeeper.
Parameters:
- client (KazooClient): Connected client
- base_path (str): Base path to clean recursively
Removes all test data from Zookeeper to ensure
clean state between test runs.
"""import unittest
from kazoo.testing import KazooTestCase
from kazoo.exceptions import NoNodeError
class MyZookeeperTest(KazooTestCase):
def setUp(self):
super().setUp()
# Additional test setup if needed
self.test_path = "/test/myapp"
def tearDown(self):
# Clean up test data
try:
self.client.delete(self.test_path, recursive=True)
except NoNodeError:
pass
super().tearDown()
def test_create_and_read(self):
"""Test basic create and read operations."""
# Create test node
path = self.client.create(self.test_path, b"test data", makepath=True)
self.assertEqual(path, self.test_path)
# Read and verify
data, stat = self.client.get(self.test_path)
self.assertEqual(data, b"test data")
self.assertGreater(stat.version, -1)
def test_node_operations(self):
"""Test various node operations."""
# Test creation
self.client.create(self.test_path, b"initial", makepath=True)
# Test update
stat = self.client.set(self.test_path, b"updated")
self.assertEqual(stat.version, 1)
# Test children
child_path = f"{self.test_path}/child"
self.client.create(child_path, b"child data")
children = self.client.get_children(self.test_path)
self.assertIn("child", children)
# Test existence
stat = self.client.exists(child_path)
self.assertIsNotNone(stat)
if __name__ == '__main__':
unittest.main()import unittest
from kazoo.testing import KazooTestHarness
from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock
import threading
import time
class MultiClientTest(KazooTestHarness):
def setUp(self):
self.setup_zookeeper()
# Create multiple clients
self.client1 = KazooClient(hosts=self.hosts)
self.client2 = KazooClient(hosts=self.hosts)
self.client3 = KazooClient(hosts=self.hosts)
self.client1.start()
self.client2.start()
self.client3.start()
def tearDown(self):
self.client1.stop()
self.client2.stop()
self.client3.stop()
self.teardown_zookeeper()
def test_distributed_lock(self):
"""Test distributed lock with multiple clients."""
lock_path = "/test/lock"
results = []
def worker(client_id, client):
lock = Lock(client, lock_path, f"client-{client_id}")
if lock.acquire(timeout=5):
try:
# Critical section
results.append(f"client-{client_id}-start")
time.sleep(0.1) # Simulate work
results.append(f"client-{client_id}-end")
finally:
lock.release()
else:
results.append(f"client-{client_id}-timeout")
# Start workers concurrently
threads = []
for i, client in enumerate([self.client1, self.client2, self.client3]):
thread = threading.Thread(target=worker, args=(i+1, client))
threads.append(thread)
thread.start()
# Wait for completion
for thread in threads:
thread.join()
# Verify exclusive access (no interleaved operations)
self.assertEqual(len(results), 6) # 3 clients * 2 events each
# Check that start/end pairs are not interleaved
for i in range(0, len(results), 2):
start = results[i]
end = results[i+1]
client_id = start.split('-')[1]
self.assertTrue(start.endswith('start'))
self.assertTrue(end.endswith('end'))
self.assertEqual(end.split('-')[1], client_id)
if __name__ == '__main__':
unittest.main()import unittest
from kazoo.testing import KazooTestHarness
from kazoo.testing.common import ZookeeperCluster
from kazoo.client import KazooClient
from kazoo.protocol.states import KazooState
import time
class ClusterFailoverTest(KazooTestHarness):
def setUp(self):
# Create 3-node cluster for testing
self.cluster = ZookeeperCluster(size=3)
self.cluster.start()
self.client = KazooClient(hosts=self.cluster.hosts)
self.connection_states = []
def state_listener(state):
self.connection_states.append((time.time(), state))
self.client.add_listener(state_listener)
self.client.start()
def tearDown(self):
self.client.stop()
self.cluster.stop()
def test_server_failure_recovery(self):
"""Test client recovery from server failures."""
# Ensure connected
self.assertEqual(self.client.state, KazooState.CONNECTED)
# Create test data
test_path = "/test/failover"
self.client.create(test_path, b"test data", makepath=True)
# Kill one server (should not affect quorum)
self.cluster.terminate_server(0)
# Client should remain connected
time.sleep(2)
self.assertEqual(self.client.state, KazooState.CONNECTED)
# Should still be able to operate
data, stat = self.client.get(test_path)
self.assertEqual(data, b"test data")
# Kill second server (loses quorum)
self.cluster.terminate_server(1)
# Client should lose connection
time.sleep(5)
self.assertIn(KazooState.SUSPENDED, [state for _, state in self.connection_states])
# Restart servers
self.cluster.restart_server(0)
self.cluster.restart_server(1)
# Wait for reconnection
timeout = time.time() + 15
while time.time() < timeout and self.client.state != KazooState.CONNECTED:
time.sleep(0.1)
self.assertEqual(self.client.state, KazooState.CONNECTED)
# Verify data survived
data, stat = self.client.get(test_path)
self.assertEqual(data, b"test data")
if __name__ == '__main__':
unittest.main()import unittest
import tempfile
import shutil
from kazoo.testing.common import ManagedZooKeeper
from kazoo.client import KazooClient
class CustomEnvironmentTest(unittest.TestCase):
def setUp(self):
# Create custom test environment
self.temp_dir = tempfile.mkdtemp()
# Configure custom Zookeeper server
self.zk_server = ManagedZooKeeper(
port=12345,
data_dir=f"{self.temp_dir}/data",
log_dir=f"{self.temp_dir}/logs",
java_system_properties={
'zookeeper.admin.enableServer': 'false',
'zookeeper.4lw.commands.whitelist': '*'
}
)
self.zk_server.start()
# Create client with custom configuration
self.client = KazooClient(
hosts='localhost:12345',
timeout=5.0,
connection_retry=None # Disable retries for faster test failures
)
self.client.start()
def tearDown(self):
self.client.stop()
self.zk_server.stop()
shutil.rmtree(self.temp_dir)
def test_custom_environment(self):
"""Test operations in custom environment."""
# Verify we can connect to custom port
self.assertTrue(self.client.connected)
# Test basic operations
path = self.client.create("/custom-test", b"custom data", makepath=True)
self.assertEqual(path, "/custom-test")
data, stat = self.client.get("/custom-test")
self.assertEqual(data, b"custom data")
# Test server commands (if enabled)
try:
response = self.client.command("ruok")
self.assertEqual(response.strip(), "imok")
except Exception:
# Commands might be disabled in some configurations
pass
if __name__ == '__main__':
unittest.main()import unittest
import time
from kazoo.testing import KazooTestCase
from kazoo.client import KazooClient
import concurrent.futures
class PerformanceTest(KazooTestCase):
def test_concurrent_operations(self):
"""Test performance with concurrent operations."""
num_operations = 100
base_path = "/test/performance"
# Setup base path
self.client.create(base_path, b"", makepath=True)
def create_operation(i):
path = f"{base_path}/item-{i:04d}"
start_time = time.time()
self.client.create(path, f"data-{i}".encode(), sequence=False)
return time.time() - start_time
# Run concurrent creates
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(create_operation, i) for i in range(num_operations)]
operation_times = [future.result() for future in concurrent.futures.as_completed(futures)]
total_time = time.time() - start_time
# Verify all operations completed
children = self.client.get_children(base_path)
self.assertEqual(len(children), num_operations)
# Performance metrics
avg_operation_time = sum(operation_times) / len(operation_times)
operations_per_second = num_operations / total_time
print(f"Performance Results:")
print(f" Total operations: {num_operations}")
print(f" Total time: {total_time:.2f}s")
print(f" Operations/second: {operations_per_second:.1f}")
print(f" Average operation time: {avg_operation_time*1000:.1f}ms")
# Basic performance assertions
self.assertLess(avg_operation_time, 1.0) # Operations should be fast
self.assertGreater(operations_per_second, 10) # Should handle reasonable load
if __name__ == '__main__':
unittest.main()Install with Tessl CLI
npx tessl i tessl/pypi-kazoo