0
# Core Client Operations
1
2
Primary Zookeeper client functionality providing connection management, CRUD operations, watches, transactions, and session handling. The KazooClient class serves as the main interface for all Zookeeper interactions and manages the underlying protocol communication.
3
4
## Capabilities
5
6
### Client Lifecycle
7
8
Connection management and session lifecycle operations for establishing, maintaining, and terminating Zookeeper connections with comprehensive configuration options.
9
10
```python { .api }
11
class KazooClient:
12
def __init__(self, hosts="127.0.0.1:2181", timeout=10.0, client_id=None,
13
handler=None, default_acl=None, auth_data=None, sasl_options=None,
14
read_only=None, randomize_hosts=True, connection_retry=None,
15
command_retry=None, logger=None, keyfile=None, keyfile_password=None,
16
certfile=None, ca=None, use_ssl=False, verify_certs=True):
17
"""
18
Create a Kazoo client instance.
19
20
Parameters:
21
- hosts (str): Comma-separated list of Zookeeper servers (host:port)
22
- timeout (float): Session timeout in seconds
23
- client_id (tuple): Previous session (session_id, password) for reconnection
24
- handler: Async handler implementation (threading/gevent/eventlet)
25
- default_acl (list): Default ACL for created nodes
26
- auth_data (list): Authentication data tuples (scheme, credential)
27
- sasl_options (dict): SASL authentication options
28
- read_only (bool): Allow read-only connections
29
- randomize_hosts (bool): Randomize server connection order
30
- connection_retry (KazooRetry): Connection retry policy
31
- command_retry (KazooRetry): Command retry policy
32
- logger: Custom logger instance
33
- keyfile (str): SSL private key file path
34
- keyfile_password (str): SSL private key password
35
- certfile (str): SSL certificate file path
36
- ca (str): SSL CA certificate file path
37
- use_ssl (bool): Enable SSL connections
38
- verify_certs (bool): Verify SSL certificates
39
- **kwargs: Additional keyword arguments
40
"""
41
42
def start(self, timeout=15):
43
"""
44
Start the client and connect to Zookeeper.
45
46
Parameters:
47
- timeout (float): Connection timeout in seconds
48
49
Raises:
50
- KazooTimeoutError: If connection times out
51
"""
52
53
def stop(self):
54
"""Stop the client and close the connection."""
55
56
def restart(self):
57
"""Restart the client connection."""
58
59
def close(self):
60
"""Close the client connection permanently."""
61
62
@property
63
def state(self):
64
"""Current connection state (KazooState)."""
65
66
@property
67
def connected(self):
68
"""True if client is connected."""
69
70
@property
71
def client_id(self):
72
"""Current session ID and password tuple."""
73
```
74
75
### Node Operations
76
77
Core CRUD operations for Zookeeper nodes including creation, reading, updating, and deletion with support for ephemeral nodes, sequential nodes, and access control.
78
79
```python { .api }
80
def create(self, path, value=b"", acl=None, ephemeral=False, sequence=False, makepath=False):
81
"""
82
Create a Zookeeper node.
83
84
Parameters:
85
- path (str): Node path
86
- value (bytes): Node data
87
- acl (list): Access control list
88
- ephemeral (bool): Create ephemeral node
89
- sequence (bool): Append sequence number to path
90
- makepath (bool): Create parent directories
91
92
Returns:
93
str: Actual path created (may include sequence number)
94
95
Raises:
96
- NodeExistsError: If node already exists
97
- NoNodeError: If parent path doesn't exist and makepath=False
98
"""
99
100
def get(self, path, watch=None):
101
"""
102
Get node data and metadata.
103
104
Parameters:
105
- path (str): Node path
106
- watch (callable): Watch function for data changes
107
108
Returns:
109
tuple: (data, ZnodeStat) containing node data and metadata
110
111
Raises:
112
- NoNodeError: If node doesn't exist
113
"""
114
115
def set(self, path, value, version=-1):
116
"""
117
Set node data.
118
119
Parameters:
120
- path (str): Node path
121
- value (bytes): New node data
122
- version (int): Expected version (-1 for any version)
123
124
Returns:
125
ZnodeStat: Updated node metadata
126
127
Raises:
128
- NoNodeError: If node doesn't exist
129
- BadVersionError: If version doesn't match
130
"""
131
132
def delete(self, path, version=-1, recursive=False):
133
"""
134
Delete a node.
135
136
Parameters:
137
- path (str): Node path
138
- version (int): Expected version (-1 for any version)
139
- recursive (bool): Delete children recursively
140
141
Raises:
142
- NoNodeError: If node doesn't exist
143
- NotEmptyError: If node has children and recursive=False
144
- BadVersionError: If version doesn't match
145
"""
146
147
def exists(self, path, watch=None):
148
"""
149
Check if node exists.
150
151
Parameters:
152
- path (str): Node path
153
- watch (callable): Watch function for existence changes
154
155
Returns:
156
ZnodeStat or None: Node metadata if exists, None otherwise
157
"""
158
159
def get_children(self, path, watch=None, include_data=False):
160
"""
161
Get node children.
162
163
Parameters:
164
- path (str): Node path
165
- watch (callable): Watch function for children changes
166
- include_data (bool): Include child node data and stats
167
168
Returns:
169
list or tuple: Child names, or (children, data_list) if include_data=True
170
171
Raises:
172
- NoNodeError: If node doesn't exist
173
"""
174
```
175
176
### Watches and Callbacks
177
178
Event watching mechanisms for monitoring node changes, children modifications, and existence state with automatic callback execution and watch re-registration support.
179
180
```python { .api }
181
def add_listener(self, listener):
182
"""
183
Add connection state listener.
184
185
Parameters:
186
- listener (callable): Function called on state changes with KazooState
187
"""
188
189
def remove_listener(self, listener):
190
"""Remove connection state listener."""
191
192
def get_async(self, path, watch=None):
193
"""
194
Asynchronous get operation.
195
196
Returns:
197
IAsyncResult: Async result object
198
"""
199
200
def exists_async(self, path, watch=None):
201
"""
202
Asynchronous exists operation.
203
204
Returns:
205
IAsyncResult: Async result object
206
"""
207
208
def get_children_async(self, path, watch=None, include_data=False):
209
"""
210
Asynchronous get_children operation.
211
212
Returns:
213
IAsyncResult: Async result object
214
"""
215
```
216
217
### Transactions
218
219
Atomic transaction support for performing multiple operations as a single unit with all-or-nothing semantics and version checking capabilities.
220
221
```python { .api }
222
def transaction(self):
223
"""
224
Create a new transaction request.
225
226
Returns:
227
TransactionRequest: Transaction builder object
228
"""
229
230
class TransactionRequest:
231
def create(self, path, value=b"", acl=None, ephemeral=False, sequence=False):
232
"""Add create operation to transaction."""
233
234
def delete(self, path, version=-1):
235
"""Add delete operation to transaction."""
236
237
def set_data(self, path, value, version=-1):
238
"""Add set data operation to transaction."""
239
240
def check(self, path, version):
241
"""Add version check operation to transaction."""
242
243
def commit(self):
244
"""
245
Commit the transaction.
246
247
Returns:
248
list: Results for each operation
249
250
Raises:
251
- RolledBackError: If any operation fails
252
"""
253
254
def commit_async(self):
255
"""
256
Asynchronous transaction commit.
257
258
Returns:
259
IAsyncResult: Async result object
260
"""
261
```
262
263
### Authentication and Security
264
265
Authentication mechanisms and security operations for establishing secure sessions and managing access credentials with support for digest, SASL, and custom authentication schemes.
266
267
```python { .api }
268
def add_auth(self, scheme, credential):
269
"""
270
Add authentication credentials.
271
272
Parameters:
273
- scheme (str): Authentication scheme (digest, sasl, etc.)
274
- credential (str): Authentication credential
275
276
Raises:
277
- AuthFailedError: If authentication fails
278
"""
279
280
def add_auth_async(self, scheme, credential):
281
"""
282
Asynchronous authentication.
283
284
Returns:
285
IAsyncResult: Async result object
286
"""
287
288
def get_acls(self, path):
289
"""
290
Get node ACLs.
291
292
Parameters:
293
- path (str): Node path
294
295
Returns:
296
tuple: (acl_list, ZnodeStat)
297
298
Raises:
299
- NoNodeError: If node doesn't exist
300
"""
301
302
def set_acls(self, path, acls, version=-1):
303
"""
304
Set node ACLs.
305
306
Parameters:
307
- path (str): Node path
308
- acls (list): List of ACL objects
309
- version (int): Expected version
310
311
Returns:
312
ZnodeStat: Updated node metadata
313
314
Raises:
315
- NoNodeError: If node doesn't exist
316
- BadVersionError: If version doesn't match
317
"""
318
```
319
320
### Utility Operations
321
322
Additional client utilities for path operations, synchronization, and cluster management with support for server information and operational maintenance.
323
324
```python { .api }
325
def sync(self, path):
326
"""
327
Synchronize client view with leader.
328
329
Parameters:
330
- path (str): Path to synchronize
331
332
Returns:
333
str: Synchronized path
334
"""
335
336
def reconfig(self, joining=None, leaving=None, new_members=None, from_config=-1):
337
"""
338
Reconfigure Zookeeper ensemble.
339
340
Parameters:
341
- joining (str): Servers joining ensemble
342
- leaving (str): Servers leaving ensemble
343
- new_members (str): New ensemble configuration
344
- from_config (int): Expected config version
345
346
Returns:
347
tuple: (data, ZnodeStat) of new configuration
348
"""
349
350
def server_version(self):
351
"""
352
Get server version information.
353
354
Returns:
355
str: Server version string
356
"""
357
358
def command(self, cmd="ruok"):
359
"""
360
Send administrative command to server.
361
362
Parameters:
363
- cmd (str): Command to send (ruok, stat, dump, etc.)
364
365
Returns:
366
str: Server response
367
"""
368
```
369
370
## Usage Examples
371
372
### Basic Connection and Operations
373
374
```python
375
from kazoo.client import KazooClient
376
from kazoo.exceptions import NoNodeError, NodeExistsError
377
378
# Create client with custom timeout
379
zk = KazooClient(hosts='localhost:2181,localhost:2182', timeout=30.0)
380
381
try:
382
# Start connection
383
zk.start(timeout=10)
384
385
# Create node with data
386
try:
387
path = zk.create("/myapp/config", b'{"setting": "value"}', makepath=True)
388
print(f"Created: {path}")
389
except NodeExistsError:
390
print("Node already exists")
391
392
# Read data with watch
393
def config_watcher(event):
394
print(f"Config changed: {event}")
395
396
data, stat = zk.get("/myapp/config", watch=config_watcher)
397
print(f"Data: {data}, Version: {stat.version}")
398
399
# Update data
400
new_stat = zk.set("/myapp/config", b'{"setting": "new_value"}', version=stat.version)
401
print(f"Updated to version: {new_stat.version}")
402
403
# List children
404
children = zk.get_children("/myapp")
405
print(f"Children: {children}")
406
407
finally:
408
zk.stop()
409
```
410
411
### Transaction Example
412
413
```python
414
from kazoo.client import KazooClient
415
from kazoo.exceptions import RolledBackError
416
417
zk = KazooClient()
418
zk.start()
419
420
try:
421
# Create transaction
422
transaction = zk.transaction()
423
transaction.create("/app/counter", b"0")
424
transaction.create("/app/status", b"active")
425
transaction.check("/app", version=0) # Ensure parent unchanged
426
427
# Commit atomically
428
results = transaction.commit()
429
print("Transaction successful:", results)
430
431
except RolledBackError as e:
432
print("Transaction failed:", e)
433
finally:
434
zk.stop()
435
```
436
437
### Async Operations Example
438
439
```python
440
from kazoo.client import KazooClient
441
442
zk = KazooClient()
443
zk.start()
444
445
try:
446
# Async get operation
447
async_result = zk.get_async("/some/path")
448
449
# Wait for result with timeout
450
try:
451
data, stat = async_result.get(timeout=5.0)
452
print(f"Got data: {data}")
453
except Exception as e:
454
print(f"Async operation failed: {e}")
455
456
finally:
457
zk.stop()
458
```