0
# Connection Handling
1
2
Advanced connection handling policies and thread-local retry management for fine-grained control over ZooKeeper connection behavior in Apache Curator. These components provide sophisticated connection management beyond basic retry policies.
3
4
## Capabilities
5
6
### ConnectionHandlingPolicy Interface
7
8
Interface for implementing custom connection timeout and handling behavior policies.
9
10
```java { .api }
11
/**
12
* Interface for connection handling policies
13
*/
14
public interface ConnectionHandlingPolicy {
15
/**
16
* Check connection timeouts and return status information
17
* @return CheckTimeoutsResult with timeout status details
18
* @throws Exception if timeout check fails
19
*/
20
CheckTimeoutsResult checkTimeouts() throws Exception;
21
22
/**
23
* Execute callable with retry logic specific to this connection policy
24
* @param client CuratorZookeeperClient instance
25
* @param proc Callable to execute with connection handling
26
* @return Result of successful callable execution
27
* @throws Exception if operation fails after policy handling
28
*/
29
<T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception;
30
}
31
```
32
33
### StandardConnectionHandlingPolicy Class
34
35
Default implementation of ConnectionHandlingPolicy providing standard connection management behavior.
36
37
```java { .api }
38
/**
39
* Standard implementation of connection handling policy
40
*/
41
public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolicy {
42
/**
43
* Create standard connection handling policy with default settings
44
*/
45
public StandardConnectionHandlingPolicy();
46
47
/**
48
* Create standard connection handling policy with custom timeout
49
* @param timeoutMs Custom timeout in milliseconds
50
*/
51
public StandardConnectionHandlingPolicy(int timeoutMs);
52
}
53
```
54
55
**Usage Examples:**
56
57
```java
58
import org.apache.curator.connection.StandardConnectionHandlingPolicy;
59
import org.apache.curator.connection.ConnectionHandlingPolicy;
60
61
// Default standard policy
62
ConnectionHandlingPolicy defaultPolicy = new StandardConnectionHandlingPolicy();
63
64
// Custom timeout policy
65
ConnectionHandlingPolicy customPolicy = new StandardConnectionHandlingPolicy(10000); // 10 second timeout
66
67
// Use with operations
68
try {
69
String result = customPolicy.callWithRetry(client, () -> {
70
return new String(client.getZooKeeper().getData("/config/setting", false, null));
71
});
72
} catch (Exception e) {
73
// Handle connection policy failure
74
System.err.println("Connection policy failed: " + e.getMessage());
75
}
76
```
77
78
### ThreadLocalRetryLoop Class
79
80
Thread-local retry loop management for handling retry state per thread in multi-threaded applications.
81
82
```java { .api }
83
/**
84
* Thread-local retry loop management
85
*/
86
public class ThreadLocalRetryLoop {
87
/**
88
* Get or create retry loop for current thread
89
* @param client CuratorZookeeperClient instance
90
* @return RetryLoop instance for current thread
91
*/
92
public static RetryLoop getRetryLoop(CuratorZookeeperClient client);
93
94
/**
95
* Clear retry loop for current thread
96
* Use this to clean up thread-local state when thread processing is complete
97
*/
98
public static void clearRetryLoop();
99
100
/**
101
* Check if current thread has an active retry loop
102
* @return true if current thread has retry loop, false otherwise
103
*/
104
public static boolean hasRetryLoop();
105
}
106
```
107
108
**Usage Examples:**
109
110
```java
111
import org.apache.curator.connection.ThreadLocalRetryLoop;
112
113
// Multi-threaded retry handling
114
ExecutorService executor = Executors.newFixedThreadPool(10);
115
116
executor.submit(() -> {
117
try {
118
// Get thread-local retry loop
119
RetryLoop retryLoop = ThreadLocalRetryLoop.getRetryLoop(client);
120
121
while (retryLoop.shouldContinue()) {
122
try {
123
// ZooKeeper operations specific to this thread
124
client.getZooKeeper().create("/thread/" + Thread.currentThread().getId(),
125
"data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
126
retryLoop.markComplete();
127
} catch (Exception e) {
128
retryLoop.takeException(e);
129
}
130
}
131
} finally {
132
// Clean up thread-local state
133
ThreadLocalRetryLoop.clearRetryLoop();
134
}
135
});
136
```
137
138
### CheckTimeoutsResult Class
139
140
Result object returned by connection handling policy timeout checks.
141
142
```java { .api }
143
/**
144
* Result of connection timeout checks
145
*/
146
public class CheckTimeoutsResult {
147
/**
148
* Check if connection has timed out
149
* @return true if connection is timed out
150
*/
151
public boolean isTimedOut();
152
153
/**
154
* Get timeout duration in milliseconds
155
* @return Timeout duration, or -1 if not timed out
156
*/
157
public long getTimeoutMs();
158
159
/**
160
* Get descriptive message about timeout status
161
* @return Human-readable timeout status message
162
*/
163
public String getMessage();
164
}
165
```
166
167
## Advanced Connection Handling Patterns
168
169
### Custom Connection Policy Implementation
170
171
```java
172
import org.apache.curator.connection.ConnectionHandlingPolicy;
173
import java.util.concurrent.Callable;
174
175
/**
176
* Custom connection policy with adaptive timeout based on operation history
177
*/
178
public class AdaptiveConnectionHandlingPolicy implements ConnectionHandlingPolicy {
179
private final AtomicLong avgOperationTime = new AtomicLong(1000);
180
private final int baseTimeoutMs;
181
182
public AdaptiveConnectionHandlingPolicy(int baseTimeoutMs) {
183
this.baseTimeoutMs = baseTimeoutMs;
184
}
185
186
@Override
187
public CheckTimeoutsResult checkTimeouts() throws Exception {
188
// Calculate adaptive timeout based on recent operation performance
189
long adaptiveTimeout = Math.max(baseTimeoutMs, avgOperationTime.get() * 3);
190
191
// Custom timeout logic
192
boolean timedOut = /* your timeout detection logic */;
193
194
return new CheckTimeoutsResult() {
195
@Override
196
public boolean isTimedOut() { return timedOut; }
197
198
@Override
199
public long getTimeoutMs() { return adaptiveTimeout; }
200
201
@Override
202
public String getMessage() {
203
return "Adaptive timeout: " + adaptiveTimeout + "ms";
204
}
205
};
206
}
207
208
@Override
209
public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception {
210
long startTime = System.currentTimeMillis();
211
212
try {
213
T result = proc.call();
214
215
// Update average operation time for adaptive behavior
216
long operationTime = System.currentTimeMillis() - startTime;
217
avgOperationTime.set((avgOperationTime.get() + operationTime) / 2);
218
219
return result;
220
} catch (Exception e) {
221
// Custom retry logic based on exception type and timeout status
222
CheckTimeoutsResult timeoutResult = checkTimeouts();
223
if (timeoutResult.isTimedOut()) {
224
throw new RuntimeException("Operation timed out: " + timeoutResult.getMessage(), e);
225
}
226
throw e;
227
}
228
}
229
}
230
```
231
232
### Circuit Breaker Connection Policy
233
234
```java
235
/**
236
* Connection policy with circuit breaker pattern for failing connections
237
*/
238
public class CircuitBreakerConnectionPolicy implements ConnectionHandlingPolicy {
239
private enum State { CLOSED, OPEN, HALF_OPEN }
240
241
private volatile State state = State.CLOSED;
242
private final AtomicInteger failureCount = new AtomicInteger(0);
243
private final AtomicLong lastFailureTime = new AtomicLong(0);
244
private final int failureThreshold;
245
private final long recoveryTimeoutMs;
246
247
public CircuitBreakerConnectionPolicy(int failureThreshold, long recoveryTimeoutMs) {
248
this.failureThreshold = failureThreshold;
249
this.recoveryTimeoutMs = recoveryTimeoutMs;
250
}
251
252
@Override
253
public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception {
254
if (state == State.OPEN) {
255
if (System.currentTimeMillis() - lastFailureTime.get() > recoveryTimeoutMs) {
256
state = State.HALF_OPEN;
257
} else {
258
throw new RuntimeException("Circuit breaker is OPEN - failing fast");
259
}
260
}
261
262
try {
263
T result = proc.call();
264
265
// Success - reset circuit breaker
266
if (state == State.HALF_OPEN) {
267
state = State.CLOSED;
268
failureCount.set(0);
269
}
270
271
return result;
272
} catch (Exception e) {
273
// Failure - update circuit breaker state
274
int failures = failureCount.incrementAndGet();
275
lastFailureTime.set(System.currentTimeMillis());
276
277
if (failures >= failureThreshold) {
278
state = State.OPEN;
279
}
280
281
throw e;
282
}
283
}
284
285
@Override
286
public CheckTimeoutsResult checkTimeouts() throws Exception {
287
return new CheckTimeoutsResult() {
288
@Override
289
public boolean isTimedOut() { return state == State.OPEN; }
290
291
@Override
292
public long getTimeoutMs() { return recoveryTimeoutMs; }
293
294
@Override
295
public String getMessage() {
296
return "Circuit breaker state: " + state + ", failures: " + failureCount.get();
297
}
298
};
299
}
300
}
301
```
302
303
### Multi-threaded Connection Management
304
305
```java
306
/**
307
* Connection manager for multi-threaded applications with thread-local retry handling
308
*/
309
public class MultiThreadedConnectionManager {
310
private final CuratorZookeeperClient client;
311
private final ConnectionHandlingPolicy policy;
312
private final ExecutorService executorService;
313
314
public MultiThreadedConnectionManager(CuratorZookeeperClient client,
315
ConnectionHandlingPolicy policy,
316
int threadPoolSize) {
317
this.client = client;
318
this.policy = policy;
319
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
320
}
321
322
public <T> CompletableFuture<T> executeAsync(Callable<T> operation) {
323
return CompletableFuture.supplyAsync(() -> {
324
try {
325
// Use thread-local retry loop
326
if (!ThreadLocalRetryLoop.hasRetryLoop()) {
327
RetryLoop retryLoop = ThreadLocalRetryLoop.getRetryLoop(client);
328
}
329
330
// Execute with connection policy
331
return policy.callWithRetry(client, operation);
332
333
} catch (Exception e) {
334
throw new RuntimeException("Operation failed", e);
335
} finally {
336
// Clean up thread-local state
337
ThreadLocalRetryLoop.clearRetryLoop();
338
}
339
}, executorService);
340
}
341
342
public void shutdown() {
343
executorService.shutdown();
344
try {
345
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
346
executorService.shutdownNow();
347
}
348
} catch (InterruptedException e) {
349
executorService.shutdownNow();
350
Thread.currentThread().interrupt();
351
}
352
}
353
}
354
355
// Usage example
356
MultiThreadedConnectionManager connectionManager = new MultiThreadedConnectionManager(
357
client, new StandardConnectionHandlingPolicy(), 10);
358
359
// Execute operations across threads
360
List<CompletableFuture<String>> futures = new ArrayList<>();
361
for (int i = 0; i < 100; i++) {
362
final int index = i;
363
CompletableFuture<String> future = connectionManager.executeAsync(() -> {
364
return new String(client.getZooKeeper().getData("/data/" + index, false, null));
365
});
366
futures.add(future);
367
}
368
369
// Wait for all operations to complete
370
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
371
.thenRun(() -> System.out.println("All operations completed"))
372
.join();
373
```
374
375
### Connection Health Monitoring
376
377
```java
378
/**
379
* Connection health monitor using connection handling policies
380
*/
381
public class ConnectionHealthMonitor {
382
private final ConnectionHandlingPolicy policy;
383
private final ScheduledExecutorService scheduler;
384
private final AtomicBoolean isHealthy = new AtomicBoolean(true);
385
386
public ConnectionHealthMonitor(ConnectionHandlingPolicy policy) {
387
this.policy = policy;
388
this.scheduler = Executors.newScheduledThreadPool(1);
389
390
// Schedule periodic health checks
391
scheduler.scheduleAtFixedRate(this::checkHealth, 0, 30, TimeUnit.SECONDS);
392
}
393
394
private void checkHealth() {
395
try {
396
CheckTimeoutsResult result = policy.checkTimeouts();
397
boolean healthy = !result.isTimedOut();
398
399
if (healthy != isHealthy.get()) {
400
isHealthy.set(healthy);
401
if (healthy) {
402
System.out.println("Connection health restored");
403
} else {
404
System.err.println("Connection health degraded: " + result.getMessage());
405
}
406
}
407
} catch (Exception e) {
408
isHealthy.set(false);
409
System.err.println("Health check failed: " + e.getMessage());
410
}
411
}
412
413
public boolean isHealthy() {
414
return isHealthy.get();
415
}
416
417
public void shutdown() {
418
scheduler.shutdown();
419
}
420
}
421
```
422
423
## Connection Handling Best Practices
424
425
### Policy Selection Guidelines
426
427
**StandardConnectionHandlingPolicy**:
428
- Use for most production applications
429
- Provides reliable timeout handling and retry behavior
430
- Good balance of performance and reliability
431
432
**Custom Policies**:
433
- Implement for specialized requirements (circuit breakers, adaptive timeouts)
434
- Consider maintenance overhead and complexity
435
- Test thoroughly under failure conditions
436
437
### Thread Safety Considerations
438
439
```java
440
// Always clean up thread-local state in multi-threaded applications
441
try {
442
RetryLoop retryLoop = ThreadLocalRetryLoop.getRetryLoop(client);
443
// ... use retry loop ...
444
} finally {
445
ThreadLocalRetryLoop.clearRetryLoop(); // Prevent memory leaks
446
}
447
448
// Use connection policies that are thread-safe
449
ConnectionHandlingPolicy threadSafePolicy = new StandardConnectionHandlingPolicy();
450
// Multiple threads can safely use the same policy instance
451
```
452
453
### Error Handling Patterns
454
455
```java
456
// Comprehensive error handling with connection policies
457
try {
458
T result = connectionPolicy.callWithRetry(client, () -> {
459
// ZooKeeper operation
460
return client.getZooKeeper().getData(path, false, null);
461
});
462
return result;
463
} catch (Exception e) {
464
// Check if failure was due to connection handling policy
465
try {
466
CheckTimeoutsResult timeoutResult = connectionPolicy.checkTimeouts();
467
if (timeoutResult.isTimedOut()) {
468
// Handle timeout-specific failure
469
handleTimeoutFailure(timeoutResult);
470
} else {
471
// Handle other types of failures
472
handleGeneralFailure(e);
473
}
474
} catch (Exception timeoutCheckException) {
475
// Handle failure to check timeouts
476
handleCriticalFailure(timeoutCheckException);
477
}
478
}
479
```