0
# Event and State Management
1
2
Comprehensive event handling and connection state management with listeners for background operations, connection state changes, and unhandled errors, providing robust monitoring and error handling capabilities.
3
4
## Capabilities
5
6
### Event Handling
7
8
Core event interfaces for handling ZooKeeper events and background operation results.
9
10
```java { .api }
11
/**
12
* Returns the listenable interface for events
13
* @return listenable
14
*/
15
Listenable<CuratorListener> getCuratorListenable();
16
17
/**
18
* Returns the listenable interface for unhandled errors
19
* @return listenable
20
*/
21
Listenable<UnhandledErrorListener> getUnhandledErrorListenable();
22
23
/**
24
* Represents ZooKeeper events and background operation results
25
*/
26
public interface CuratorEvent {
27
/**
28
* Get the event type
29
* @return event type
30
*/
31
CuratorEventType getType();
32
33
/**
34
* Get the ZooKeeper result code
35
* @return result code (0 = success)
36
*/
37
int getResultCode();
38
39
/**
40
* Get the path associated with this event
41
* @return path or null
42
*/
43
String getPath();
44
45
/**
46
* Get the background context object
47
* @return context or null
48
*/
49
Object getContext();
50
51
/**
52
* Get stat information
53
* @return stat or null
54
*/
55
Stat getStat();
56
57
/**
58
* Get data associated with the event
59
* @return data bytes or null
60
*/
61
byte[] getData();
62
63
/**
64
* Get the name (for create operations with sequential modes)
65
* @return name or null
66
*/
67
String getName();
68
69
/**
70
* Get children list (for getChildren operations)
71
* @return children list or null
72
*/
73
List<String> getChildren();
74
75
/**
76
* Get ACL list (for getACL/setACL operations)
77
* @return ACL list or null
78
*/
79
List<ACL> getACLList();
80
81
/**
82
* Get transaction results (for transaction operations)
83
* @return transaction results or null
84
*/
85
List<CuratorTransactionResult> getOpResults();
86
87
/**
88
* Get the watched event (for watcher callbacks)
89
* @return watched event or null
90
*/
91
WatchedEvent getWatchedEvent();
92
}
93
94
/**
95
* Types of curator events
96
*/
97
public enum CuratorEventType {
98
CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN,
99
SYNC, GET_ACL, SET_ACL, TRANSACTION, GET_CONFIG, RECONFIG,
100
WATCHED, REMOVE_WATCHES, ADD_WATCH, CLOSING
101
}
102
103
/**
104
* Listener for background events and errors
105
*/
106
public interface CuratorListener {
107
/**
108
* Called when a background event occurs
109
* @param client the client
110
* @param event the event
111
* @throws Exception errors
112
*/
113
void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
114
}
115
116
/**
117
* Listener for unhandled errors
118
*/
119
public interface UnhandledErrorListener {
120
/**
121
* Called when an unhandled error occurs
122
* @param message error message
123
* @param e the exception
124
*/
125
void unhandledError(String message, Throwable e);
126
}
127
128
/**
129
* Callback for background operations
130
*/
131
public interface BackgroundCallback {
132
/**
133
* Called when a background operation completes
134
* @param client the client
135
* @param event the event
136
* @throws Exception errors
137
*/
138
void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
139
}
140
```
141
142
**Usage Examples:**
143
144
```java
145
// Add general curator listener
146
client.getCuratorListenable().addListener(new CuratorListener() {
147
@Override
148
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
149
System.out.println("Event: " + event.getType() + " Path: " + event.getPath());
150
151
// Handle different event types
152
switch (event.getType()) {
153
case CREATE:
154
System.out.println("Node created: " + event.getPath());
155
break;
156
case DELETE:
157
System.out.println("Node deleted: " + event.getPath());
158
break;
159
case GET_DATA:
160
byte[] data = event.getData();
161
System.out.println("Got data: " + new String(data));
162
break;
163
case CHILDREN:
164
List<String> children = event.getChildren();
165
System.out.println("Children: " + children);
166
break;
167
case TRANSACTION:
168
List<CuratorTransactionResult> results = event.getOpResults();
169
System.out.println("Transaction completed with " + results.size() + " operations");
170
break;
171
}
172
}
173
});
174
175
// Add unhandled error listener
176
client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
177
@Override
178
public void unhandledError(String message, Throwable e) {
179
System.err.println("Unhandled error: " + message);
180
e.printStackTrace();
181
}
182
});
183
184
// Use background callback for specific operations
185
client.create()
186
.creatingParentsIfNeeded()
187
.inBackground(new BackgroundCallback() {
188
@Override
189
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
190
if (event.getResultCode() == 0) {
191
System.out.println("Successfully created: " + event.getPath());
192
} else {
193
System.err.println("Failed to create: " + event.getResultCode());
194
}
195
}
196
})
197
.forPath("/background/path", "data".getBytes());
198
199
// Lambda-style callback
200
client.getData()
201
.inBackground((curatorFramework, curatorEvent) -> {
202
if (curatorEvent.getResultCode() == 0) {
203
byte[] data = curatorEvent.getData();
204
System.out.println("Data retrieved: " + new String(data));
205
}
206
})
207
.forPath("/some/path");
208
```
209
210
### Connection State Management
211
212
Connection state tracking and error handling for robust distributed application development.
213
214
```java { .api }
215
/**
216
* Returns the listenable interface for the Connect State
217
* @return listenable
218
*/
219
Listenable<ConnectionStateListener> getConnectionStateListenable();
220
221
/**
222
* Return the configured error policy
223
* @return error policy
224
*/
225
ConnectionStateErrorPolicy getConnectionStateErrorPolicy();
226
227
/**
228
* Represents connection states to ZooKeeper
229
*/
230
public enum ConnectionState {
231
/**
232
* Sent for the first successful connection to the server
233
*/
234
CONNECTED,
235
236
/**
237
* There has been a loss of connection. Leaders, locks, etc. should suspend
238
* until the connection is re-established
239
*/
240
SUSPENDED,
241
242
/**
243
* A suspended or lost connection has been re-established
244
*/
245
RECONNECTED,
246
247
/**
248
* The connection is confirmed to be lost. Close any locks, leaders, etc. and
249
* attempt to re-create them
250
*/
251
LOST,
252
253
/**
254
* The connection has gone into read-only mode. This can only happen if you pass true
255
* for canBeReadOnly() in the builder
256
*/
257
READ_ONLY;
258
259
/**
260
* Check if this state represents a connection
261
* @return true if connected
262
*/
263
public boolean isConnected() {
264
return (this == CONNECTED) || (this == RECONNECTED) || (this == READ_ONLY);
265
}
266
}
267
268
/**
269
* Listener for connection state changes
270
*/
271
public interface ConnectionStateListener {
272
/**
273
* Called when the connection state changes
274
* @param client the client
275
* @param newState the new state
276
*/
277
void stateChanged(CuratorFramework client, ConnectionState newState);
278
}
279
280
/**
281
* Policy for handling connection errors
282
*/
283
public interface ConnectionStateErrorPolicy {
284
/**
285
* Return true if this the given state represents an error
286
* @param state the state
287
* @return true/false
288
*/
289
boolean isErrorState(ConnectionState state);
290
291
/**
292
* Return the timeout to use when checking error states
293
* @return timeout in milliseconds
294
*/
295
int getErrorThresholdMs();
296
}
297
298
/**
299
* Default error policy implementation
300
*/
301
public class StandardConnectionStateErrorPolicy implements ConnectionStateErrorPolicy {
302
// Default implementation considers SUSPENDED and LOST as error states
303
}
304
305
/**
306
* Session-based error policy implementation
307
*/
308
public class SessionConnectionStateErrorPolicy implements ConnectionStateErrorPolicy {
309
// Session-based error handling with different thresholds
310
}
311
```
312
313
**Usage Examples:**
314
315
```java
316
// Add connection state listener
317
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
318
@Override
319
public void stateChanged(CuratorFramework client, ConnectionState newState) {
320
System.out.println("Connection state changed to: " + newState);
321
322
switch (newState) {
323
case CONNECTED:
324
System.out.println("Connected to ZooKeeper");
325
// Initialize application state
326
break;
327
328
case SUSPENDED:
329
System.out.println("Connection suspended - pausing operations");
330
// Pause non-critical operations
331
break;
332
333
case RECONNECTED:
334
System.out.println("Reconnected to ZooKeeper");
335
// Resume operations, refresh state if needed
336
break;
337
338
case LOST:
339
System.out.println("Connection lost - cleaning up");
340
// Clean up ephemeral nodes, release locks, etc.
341
break;
342
343
case READ_ONLY:
344
System.out.println("In read-only mode");
345
// Only perform read operations
346
break;
347
}
348
}
349
});
350
351
// Use custom executor for connection state callbacks
352
Executor customExecutor = Executors.newSingleThreadExecutor(r -> {
353
Thread t = new Thread(r, "ConnectionStateHandler");
354
t.setDaemon(true);
355
return t;
356
});
357
358
client.getConnectionStateListenable().addListener(connectionStateListener, customExecutor);
359
360
// Check current connection state
361
ConnectionState currentState = client.getZookeeperClient().getZooKeeper().getState();
362
System.out.println("Current ZK state: " + currentState);
363
364
// Wait for connection
365
if (!client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
366
throw new RuntimeException("Failed to connect to ZooKeeper");
367
}
368
```
369
370
### Advanced Connection State Management
371
372
Advanced connection state management with circuit breaking and custom listener factories.
373
374
```java { .api }
375
/**
376
* Factory for creating connection state listener managers
377
*/
378
public class ConnectionStateListenerManagerFactory {
379
/**
380
* Standard factory instance
381
*/
382
public static final ConnectionStateListenerManagerFactory standard;
383
384
/**
385
* Create a circuit breaking factory
386
* @param retryPolicy retry policy for circuit breaking
387
* @return circuit breaking factory
388
*/
389
public static ConnectionStateListenerManagerFactory circuitBreaking(RetryPolicy retryPolicy);
390
}
391
392
/**
393
* Circuit breaker for connection management
394
*/
395
public interface CircuitBreaker {
396
/**
397
* Check if the circuit is open (failing)
398
* @return true if circuit is open
399
*/
400
boolean isOpen();
401
402
/**
403
* Get the current retry count
404
* @return retry count
405
*/
406
int getRetryCount();
407
}
408
409
/**
410
* Connection state listener with circuit breaking capability
411
*/
412
public class CircuitBreakingConnectionStateListener implements ConnectionStateListener {
413
/**
414
* Get the circuit breaker instance
415
* @return circuit breaker
416
*/
417
public CircuitBreaker getCircuitBreaker();
418
}
419
420
/**
421
* Manager for circuit breaking functionality
422
*/
423
public class CircuitBreakingManager {
424
/**
425
* Check if circuit breaking is active
426
* @return true if active
427
*/
428
public boolean isActive();
429
430
/**
431
* Get the current circuit breaker
432
* @return circuit breaker or null
433
*/
434
public CircuitBreaker getCircuitBreaker();
435
}
436
```
437
438
**Usage Examples:**
439
440
```java
441
// Configure client with circuit breaking connection state management
442
CuratorFramework clientWithCircuitBreaker = CuratorFrameworkFactory.builder()
443
.connectString("localhost:2181")
444
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
445
.connectionStateListenerManagerFactory(
446
ConnectionStateListenerManagerFactory.circuitBreaking(
447
new ExponentialBackoffRetry(1000, 5)
448
)
449
)
450
.build();
451
452
// Custom error policy
453
ConnectionStateErrorPolicy customPolicy = new ConnectionStateErrorPolicy() {
454
@Override
455
public boolean isErrorState(ConnectionState state) {
456
return state == ConnectionState.LOST || state == ConnectionState.SUSPENDED;
457
}
458
459
@Override
460
public int getErrorThresholdMs() {
461
return 30000; // 30 seconds
462
}
463
};
464
465
CuratorFramework clientWithCustomPolicy = CuratorFrameworkFactory.builder()
466
.connectString("localhost:2181")
467
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
468
.connectionStateErrorPolicy(customPolicy)
469
.build();
470
```
471
472
### Listener Management
473
474
Generic interface for managing event listeners with custom executors.
475
476
```java { .api }
477
/**
478
* Generic interface for managing listeners
479
*/
480
public interface Listenable<T> {
481
/**
482
* Add a listener with the default executor
483
* @param listener listener to add
484
*/
485
void addListener(T listener);
486
487
/**
488
* Add a listener with custom executor
489
* @param listener listener to add
490
* @param executor executor for listener callbacks
491
*/
492
void addListener(T listener, Executor executor);
493
494
/**
495
* Remove a listener
496
* @param listener listener to remove
497
*/
498
void removeListener(T listener);
499
}
500
501
/**
502
* Container for listener and its executor
503
*/
504
public class ListenerEntry<T> {
505
/**
506
* Get the listener
507
* @return listener
508
*/
509
public T getListener();
510
511
/**
512
* Get the executor
513
* @return executor
514
*/
515
public Executor getExecutor();
516
}
517
```
518
519
**Usage Examples:**
520
521
```java
522
// Add listeners with custom executors
523
ExecutorService backgroundExecutor = Executors.newCachedThreadPool();
524
ExecutorService priorityExecutor = Executors.newSingleThreadExecutor();
525
526
// High priority connection state listener
527
client.getConnectionStateListenable().addListener(criticalConnectionListener, priorityExecutor);
528
529
// Background event processing
530
client.getCuratorListenable().addListener(backgroundEventListener, backgroundExecutor);
531
532
// Remove listeners when done
533
client.getConnectionStateListenable().removeListener(criticalConnectionListener);
534
client.getCuratorListenable().removeListener(backgroundEventListener);
535
536
// Clean up executors
537
backgroundExecutor.shutdown();
538
priorityExecutor.shutdown();
539
```