0
# Testing Infrastructure
1
2
Testing utilities and harnesses for developing Zookeeper-based applications. The module provides managed Zookeeper server instances, test clusters, and base test classes that set up and tear down clients automatically, so tests can run against a real server.
3
4
## Capabilities
5
6
### Test Harness
7
8
Base test harness providing Zookeeper test environment setup and teardown with automatic server management and client configuration.
9
10
```python { .api }
11
class KazooTestHarness:
    def __init__(self):
        """
        Base test harness for Kazoo tests.

        Provides methods for setting up and tearing down Zookeeper
        test environments with automatic server management.
        """

    def setup_zookeeper(self):
        """
        Set up the Zookeeper test environment.

        Creates and starts the managed Zookeeper server instances used
        for testing, and configures a cluster when more than one server
        is needed.
        """

    def teardown_zookeeper(self):
        """
        Tear down the Zookeeper test environment.

        Stops and cleans up the managed Zookeeper servers and removes
        their temporary data directories.
        """

    # Declared as a property with no setter, so it is read-only.
    @property
    def cluster(self):
        """
        Get the test Zookeeper cluster.

        Returns:
            ZookeeperCluster: Test cluster instance
        """

    # Declared as a property with no setter, so it is read-only.
    @property
    def hosts(self):
        """
        Get the connection string for the test cluster.

        Returns:
            str: Comma-separated host:port list for the test servers
        """
53
54
class KazooTestCase(KazooTestHarness):
    def __init__(self):
        """
        Test case with automatic Kazoo client setup and teardown.

        Extends KazooTestHarness (and therefore inherits
        setup_zookeeper, teardown_zookeeper, cluster and hosts) with
        pre-configured client instances and common test utilities.
        """

    def setUp(self):
        """
        Set up the test case with Zookeeper and a client.

        Creates the test cluster, starts the servers, and initializes
        a connected KazooClient instance for testing.
        """

    def tearDown(self):
        """
        Clean up test case resources.

        Stops client connections, shuts down the test servers,
        and cleans up temporary test data.
        """

    @property
    def client(self):
        """
        Get the connected test client.

        Returns:
            KazooClient: Connected client instance for testing
        """

    def make_client(self, **kwargs):
        """
        Create an additional test client.

        Parameters:
        - kwargs: KazooClient configuration parameters

        Returns:
            KazooClient: Configured client instance
        """
98
```
99
100
### Managed Zookeeper Servers
101
102
Managed Zookeeper server instances for test environments with configurable server parameters and automatic lifecycle management.
103
104
```python { .api }
105
class ManagedZooKeeper:
    def __init__(self, port=2181, data_dir=None, log_dir=None,
                 java_system_properties=None, jvm_args=None):
        """
        Managed Zookeeper server instance for testing.

        Parameters:
        - port (int): Server port number
        - data_dir (str): Data directory path (temporary if None)
        - log_dir (str): Log directory path (temporary if None)
        - java_system_properties (dict): Java system properties
        - jvm_args (list): JVM arguments for server process
        """

    def start(self):
        """
        Start the Zookeeper server.

        Launches the Zookeeper process with the configured parameters
        and waits for the server to become ready for connections.
        """

    def stop(self):
        """
        Stop the Zookeeper server.

        Gracefully shuts down the server process and cleans up
        temporary directories if they were created automatically.
        """

    def restart(self):
        """
        Restart the Zookeeper server.

        Stops and starts the server, preserving data directory
        contents for testing persistence scenarios.

        NOTE(review): stop() is described as removing automatically
        created temporary directories, so preserving data across a
        restart presumably requires an explicit data_dir - confirm.
        """

    @property
    def running(self):
        """
        Check whether the server is running.

        Returns:
            bool: True if the server process is active
        """

    @property
    def address(self):
        """
        Get the server address.

        Returns:
            tuple: (host, port) tuple for connections
        """
160
161
class ZookeeperCluster:
    def __init__(self, size=3):
        """
        Cluster of Zookeeper servers for testing.

        Parameters:
        - size (int): Number of servers in the cluster
        """

    def start(self):
        """
        Start all servers in the cluster.

        Starts the servers sequentially and waits for cluster
        quorum to be established.
        """

    def stop(self):
        """
        Stop all servers in the cluster.

        Gracefully shuts down all server processes and
        cleans up cluster resources.
        """

    def terminate_server(self, server_id):
        """
        Terminate a specific server in the cluster.

        Parameters:
        - server_id (int): Server ID to terminate

        Used for testing cluster resilience and failover.

        NOTE(review): the usage examples pass 0 and 1, so IDs appear
        to be zero-based - confirm.
        """

    def restart_server(self, server_id):
        """
        Restart a specific server in the cluster.

        Parameters:
        - server_id (int): Server ID to restart
        """

    @property
    def servers(self):
        """
        Get the list of cluster servers.

        Returns:
            list: List of ManagedZooKeeper instances
        """

    @property
    def hosts(self):
        """
        Get the cluster connection string.

        Returns:
            str: Comma-separated host:port connection string
        """
221
```
222
223
### Testing Utilities
224
225
Utility functions supporting Zookeeper testing scenarios: access to the shared global test cluster, discovery of an available listening port, and conversion of file system paths into a Java-compatible form.
226
227
```python { .api }
228
def get_global_cluster():
    """
    Get a reference to the global test cluster.

    Returns:
        ZookeeperCluster: Shared cluster instance for tests

    Used for sharing one cluster across multiple test cases
    to improve test performance and resource usage.
    """
238
239
def listen():
    """
    Listen for connections on an available port.

    Returns:
        tuple: (socket, port) tuple with the listening socket and
        its port number

    Used for finding available ports for test servers.
    """
248
249
def to_java_compatible_path(path):
    """
    Convert a path to a Java-compatible format.

    Parameters:
    - path (str): File system path

    Returns:
        str: Java-compatible path string

    Handles platform-specific path separators and formats
    for Java-based Zookeeper server processes.
    """
262
```
263
264
### Test Configuration
265
266
Configuration helpers and environment setup for different testing scenarios with customizable server parameters and test data.
267
268
```python { .api }
269
# Test configuration constants and helpers
270
271
DEFAULT_TEST_PORT = 2181
272
"""Default port for test Zookeeper servers."""
273
274
DEFAULT_CLUSTER_SIZE = 3
275
"""Default cluster size for multi-server tests."""
276
277
TEST_DATA_DIR = "/tmp/kazoo-test"
278
"""Default base directory for test data."""
279
280
def create_test_client(hosts=None, **kwargs):
    """
    Create a configured test client.

    Parameters:
    - hosts (str): Connection string (uses the test cluster if None)
    - kwargs: Additional KazooClient parameters

    Returns:
        KazooClient: Configured test client
    """
291
292
def wait_for_server(host, port, timeout=30):
    """
    Wait for a server to become available.

    Parameters:
    - host (str): Server hostname
    - port (int): Server port
    - timeout (float): Maximum wait time (presumably in seconds -
      TODO confirm)

    Returns:
        bool: True if the server becomes available, False on timeout
    """
304
305
def cleanup_test_paths(client, base_path="/test"):
    """
    Clean up test paths from Zookeeper.

    Parameters:
    - client (KazooClient): Connected client
    - base_path (str): Base path to clean recursively

    Removes all test data from Zookeeper to ensure a
    clean state between test runs.
    """
316
```
317
318
## Usage Examples
319
320
### Basic Test Case
321
322
```python
323
import unittest
324
from kazoo.testing import KazooTestCase
325
from kazoo.exceptions import NoNodeError
326
327
class MyZookeeperTest(KazooTestCase):
    """Exercise basic node operations through the harness-provided client."""

    def setUp(self):
        super().setUp()
        # Additional test setup if needed
        self.test_path = "/test/myapp"

    def tearDown(self):
        # Clean up test data. The harness teardown runs in `finally` so the
        # client and servers are released even when the delete fails for a
        # reason other than a missing node (for example a lost connection).
        try:
            self.client.delete(self.test_path, recursive=True)
        except NoNodeError:
            pass
        finally:
            super().tearDown()

    def test_create_and_read(self):
        """Test basic create and read operations."""
        # Create test node
        path = self.client.create(self.test_path, b"test data", makepath=True)
        self.assertEqual(path, self.test_path)

        # Read and verify; a node that was never updated is at version 0
        data, stat = self.client.get(self.test_path)
        self.assertEqual(data, b"test data")
        self.assertEqual(stat.version, 0)

    def test_node_operations(self):
        """Test various node operations."""
        # Test creation
        self.client.create(self.test_path, b"initial", makepath=True)

        # Test update: the first set() bumps the data version from 0 to 1
        stat = self.client.set(self.test_path, b"updated")
        self.assertEqual(stat.version, 1)

        # Test children
        child_path = f"{self.test_path}/child"
        self.client.create(child_path, b"child data")

        children = self.client.get_children(self.test_path)
        self.assertIn("child", children)

        # Test existence: exists() returns None for a missing node
        stat = self.client.exists(child_path)
        self.assertIsNotNone(stat)
371
372
if __name__ == '__main__':
373
unittest.main()
374
```
375
376
### Multi-Client Testing
377
378
```python
379
import threading
import time
import unittest

from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout
from kazoo.recipe.lock import Lock
from kazoo.testing import KazooTestHarness


class MultiClientTest(KazooTestHarness):
    """Check that a distributed lock serializes work across three clients."""

    def setUp(self):
        self.setup_zookeeper()

        # Create multiple clients
        self.client1 = KazooClient(hosts=self.hosts)
        self.client2 = KazooClient(hosts=self.hosts)
        self.client3 = KazooClient(hosts=self.hosts)

        self.client1.start()
        self.client2.start()
        self.client3.start()

    def tearDown(self):
        # Tear the servers down even if stopping a client raises.
        try:
            for client in (self.client1, self.client2, self.client3):
                client.stop()
        finally:
            self.teardown_zookeeper()

    def test_distributed_lock(self):
        """Test distributed lock with multiple clients."""
        lock_path = "/test/lock"
        results = []

        def worker(client_id, client):
            label = f"client-{client_id}"
            lock = Lock(client, lock_path, label)

            # A blocking acquire signals a timeout by raising LockTimeout,
            # not by returning False. Without this handler the exception
            # would end the thread silently and no timeout would be recorded.
            try:
                acquired = lock.acquire(timeout=5)
            except LockTimeout:
                acquired = False

            if not acquired:
                results.append(f"{label}-timeout")
                return

            try:
                # Critical section
                results.append(f"{label}-start")
                time.sleep(0.1)  # Simulate work
                results.append(f"{label}-end")
            finally:
                lock.release()

        # Start workers concurrently
        clients = [self.client1, self.client2, self.client3]
        threads = [
            threading.Thread(target=worker, args=(index, client))
            for index, client in enumerate(clients, start=1)
        ]
        for thread in threads:
            thread.start()

        # Wait for completion
        for thread in threads:
            thread.join()

        # Verify exclusive access (no interleaved operations)
        self.assertEqual(len(results), 6)  # 3 clients * 2 events each

        # Check that start/end pairs are not interleaved: events must come
        # in adjacent (start, end) pairs belonging to the same client.
        for start, end in zip(results[::2], results[1::2]):
            client_id = start.split('-')[1]
            self.assertTrue(start.endswith('start'))
            self.assertTrue(end.endswith('end'))
            self.assertEqual(end.split('-')[1], client_id)
446
447
if __name__ == '__main__':
448
unittest.main()
449
```
450
451
### Cluster Failover Testing
452
453
```python
454
import unittest
455
from kazoo.testing import KazooTestHarness
456
from kazoo.testing.common import ZookeeperCluster
457
from kazoo.client import KazooClient
458
from kazoo.protocol.states import KazooState
459
import time
460
461
class ClusterFailoverTest(KazooTestHarness):
    """Observe client behaviour while servers of a dedicated cluster fail."""

    def setUp(self):
        # Create 3-node cluster for testing.
        # KazooTestHarness declares `cluster` as a property without a
        # setter, so `self.cluster = ...` would raise AttributeError. The
        # dedicated cluster is therefore kept under its own attribute name.
        self.test_cluster = ZookeeperCluster(size=3)
        self.test_cluster.start()

        self.client = KazooClient(hosts=self.test_cluster.hosts)
        self.connection_states = []

        def state_listener(state):
            # Record every state transition with the time it was seen.
            self.connection_states.append((time.time(), state))

        self.client.add_listener(state_listener)
        try:
            self.client.start()
        except Exception:
            # tearDown is not called when setUp fails, so stop the servers
            # here before propagating the error.
            self.test_cluster.stop()
            raise

    def tearDown(self):
        # Stop the servers even if stopping the client raises.
        try:
            self.client.stop()
        finally:
            self.test_cluster.stop()

    def test_server_failure_recovery(self):
        """Test client recovery from server failures."""
        # Ensure connected
        self.assertEqual(self.client.state, KazooState.CONNECTED)

        # Create test data
        test_path = "/test/failover"
        self.client.create(test_path, b"test data", makepath=True)

        # Kill one server (should not affect quorum)
        self.test_cluster.terminate_server(0)

        # Client should remain connected
        time.sleep(2)
        self.assertEqual(self.client.state, KazooState.CONNECTED)

        # Should still be able to operate
        data, stat = self.client.get(test_path)
        self.assertEqual(data, b"test data")

        # Kill second server (loses quorum)
        self.test_cluster.terminate_server(1)

        # Client should lose connection
        time.sleep(5)
        observed_states = {state for _, state in self.connection_states}
        self.assertIn(KazooState.SUSPENDED, observed_states)

        # Restart servers
        self.test_cluster.restart_server(0)
        self.test_cluster.restart_server(1)

        # Wait for reconnection; a monotonic clock keeps the deadline
        # unaffected by system clock adjustments.
        deadline = time.monotonic() + 15
        while (time.monotonic() < deadline
               and self.client.state != KazooState.CONNECTED):
            time.sleep(0.1)

        self.assertEqual(self.client.state, KazooState.CONNECTED)

        # Verify data survived
        data, stat = self.client.get(test_path)
        self.assertEqual(data, b"test data")
521
522
if __name__ == '__main__':
523
unittest.main()
524
```
525
526
### Custom Test Environment
527
528
```python
529
import unittest
530
import tempfile
531
import shutil
532
from kazoo.testing.common import ManagedZooKeeper
533
from kazoo.client import KazooClient
534
535
class CustomEnvironmentTest(unittest.TestCase):
    """Run a client against a single, custom-configured managed server."""

    def setUp(self):
        # Create custom test environment. Each resource registers its own
        # cleanup as soon as it exists: cleanups run in reverse order and,
        # unlike tearDown, also run when a later setUp step fails.
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir, ignore_errors=True)

        # Configure custom Zookeeper server
        self.zk_server = ManagedZooKeeper(
            port=12345,
            data_dir=f"{self.temp_dir}/data",
            log_dir=f"{self.temp_dir}/logs",
            java_system_properties={
                'zookeeper.admin.enableServer': 'false',
                'zookeeper.4lw.commands.whitelist': '*',
            },
        )

        self.zk_server.start()
        self.addCleanup(self.zk_server.stop)

        # Create client with custom configuration.
        # NOTE(review): connection_retry=None appears to select the
        # client's default retry policy rather than disabling retries -
        # confirm against the KazooClient documentation.
        self.client = KazooClient(
            hosts='localhost:12345',
            timeout=5.0,
            connection_retry=None,
        )
        self.client.start()
        self.addCleanup(self.client.stop)

    def test_custom_environment(self):
        """Test operations in custom environment."""
        # Verify we can connect to custom port
        self.assertTrue(self.client.connected)

        # Test basic operations
        path = self.client.create("/custom-test", b"custom data", makepath=True)
        self.assertEqual(path, "/custom-test")

        data, stat = self.client.get("/custom-test")
        self.assertEqual(data, b"custom data")

        # Test server commands. setUp whitelists every four-letter command,
        # so the call is made unguarded: wrapping it in a broad
        # `except Exception` would also swallow the AssertionError and the
        # check could never fail. The command is sent as bytes.
        response = self.client.command(b"ruok")
        self.assertEqual(response.strip(), "imok")
585
586
if __name__ == '__main__':
587
unittest.main()
588
```
589
590
### Performance Testing
591
592
```python
593
import unittest
594
import time
595
from kazoo.testing import KazooTestCase
596
from kazoo.client import KazooClient
597
import concurrent.futures
598
599
class PerformanceTest(KazooTestCase):
    """Measure throughput of concurrent node creation through one client."""

    def test_concurrent_operations(self):
        """Test performance with concurrent operations."""
        num_operations = 100
        base_path = "/test/performance"

        # Setup base path
        self.client.create(base_path, b"", makepath=True)

        def create_operation(i):
            """Create one child node and return how long the call took."""
            path = f"{base_path}/item-{i:04d}"
            # perf_counter is monotonic and high resolution, so measured
            # intervals cannot be skewed by system clock adjustments.
            started = time.perf_counter()
            self.client.create(path, f"data-{i}".encode(), sequence=False)
            return time.perf_counter() - started

        # Run concurrent creates
        started = time.perf_counter()
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [
                executor.submit(create_operation, i)
                for i in range(num_operations)
            ]
            operation_times = [
                future.result()
                for future in concurrent.futures.as_completed(futures)
            ]

        total_time = time.perf_counter() - started

        # Verify all operations completed
        children = self.client.get_children(base_path)
        self.assertEqual(len(children), num_operations)

        # Performance metrics
        avg_operation_time = sum(operation_times) / len(operation_times)
        operations_per_second = num_operations / total_time

        print("Performance Results:")
        print(f" Total operations: {num_operations}")
        print(f" Total time: {total_time:.2f}s")
        print(f" Operations/second: {operations_per_second:.1f}")
        print(f" Average operation time: {avg_operation_time*1000:.1f}ms")

        # Basic performance assertions
        self.assertLess(avg_operation_time, 1.0)  # Operations should be fast
        self.assertGreater(operations_per_second, 10)  # Should handle reasonable load
639
640
if __name__ == '__main__':
641
unittest.main()
642
```