Low-level API for Apache ZooKeeper client library providing connection management, retry policies, and basic ZooKeeper operations
—
Advanced connection handling policies and thread-local retry management for fine-grained control over ZooKeeper connection behavior in Apache Curator. These components provide sophisticated connection management beyond basic retry policies.
Interface for implementing custom connection timeout and handling behavior policies.
/**
* Interface for connection handling policies
*/
public interface ConnectionHandlingPolicy {
/**
* Check connection timeouts and return status information
* @return CheckTimeoutsResult with timeout status details
* @throws Exception if timeout check fails
*/
CheckTimeoutsResult checkTimeouts() throws Exception;
/**
* Execute callable with retry logic specific to this connection policy
* @param client CuratorZookeeperClient instance
* @param proc Callable to execute with connection handling
* @return Result of successful callable execution
* @throws Exception if operation fails after policy handling
*/
<T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception;
}Default implementation of ConnectionHandlingPolicy providing standard connection management behavior.
/**
* Standard implementation of connection handling policy
*/
public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolicy {
/**
* Create standard connection handling policy with default settings
*/
public StandardConnectionHandlingPolicy();
/**
* Create standard connection handling policy with custom timeout
* @param timeoutMs Custom timeout in milliseconds
*/
public StandardConnectionHandlingPolicy(int timeoutMs);
}Usage Examples:
import org.apache.curator.connection.StandardConnectionHandlingPolicy;
import org.apache.curator.connection.ConnectionHandlingPolicy;
// Default standard policy
ConnectionHandlingPolicy defaultPolicy = new StandardConnectionHandlingPolicy();
// Custom timeout policy
ConnectionHandlingPolicy customPolicy = new StandardConnectionHandlingPolicy(10000); // 10 second timeout
// Use with operations
try {
String result = customPolicy.callWithRetry(client, () -> {
return new String(client.getZooKeeper().getData("/config/setting", false, null));
});
} catch (Exception e) {
// Handle connection policy failure
System.err.println("Connection policy failed: " + e.getMessage());
}Thread-local retry loop management for handling retry state per thread in multi-threaded applications.
/**
* Thread-local retry loop management
*/
public class ThreadLocalRetryLoop {
/**
* Get or create retry loop for current thread
* @param client CuratorZookeeperClient instance
* @return RetryLoop instance for current thread
*/
public static RetryLoop getRetryLoop(CuratorZookeeperClient client);
/**
* Clear retry loop for current thread
* Use this to clean up thread-local state when thread processing is complete
*/
public static void clearRetryLoop();
/**
* Check if current thread has an active retry loop
* @return true if current thread has retry loop, false otherwise
*/
public static boolean hasRetryLoop();
}Usage Examples:
import org.apache.curator.connection.ThreadLocalRetryLoop;
// Multi-threaded retry handling
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.submit(() -> {
try {
// Get thread-local retry loop
RetryLoop retryLoop = ThreadLocalRetryLoop.getRetryLoop(client);
while (retryLoop.shouldContinue()) {
try {
// ZooKeeper operations specific to this thread
client.getZooKeeper().create("/thread/" + Thread.currentThread().getId(),
"data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
retryLoop.markComplete();
} catch (Exception e) {
retryLoop.takeException(e);
}
}
} finally {
// Clean up thread-local state
ThreadLocalRetryLoop.clearRetryLoop();
}
});Result object returned by connection handling policy timeout checks.
/**
* Result of connection timeout checks
*/
public class CheckTimeoutsResult {
/**
* Check if connection has timed out
* @return true if connection is timed out
*/
public boolean isTimedOut();
/**
* Get timeout duration in milliseconds
* @return Timeout duration, or -1 if not timed out
*/
public long getTimeoutMs();
/**
* Get descriptive message about timeout status
* @return Human-readable timeout status message
*/
public String getMessage();
}import org.apache.curator.connection.ConnectionHandlingPolicy;
import java.util.concurrent.Callable;
/**
* Custom connection policy with adaptive timeout based on operation history
*/
public class AdaptiveConnectionHandlingPolicy implements ConnectionHandlingPolicy {
private final AtomicLong avgOperationTime = new AtomicLong(1000);
private final int baseTimeoutMs;
public AdaptiveConnectionHandlingPolicy(int baseTimeoutMs) {
this.baseTimeoutMs = baseTimeoutMs;
}
@Override
public CheckTimeoutsResult checkTimeouts() throws Exception {
// Calculate adaptive timeout based on recent operation performance
long adaptiveTimeout = Math.max(baseTimeoutMs, avgOperationTime.get() * 3);
// Custom timeout logic
boolean timedOut = /* your timeout detection logic */;
return new CheckTimeoutsResult() {
@Override
public boolean isTimedOut() { return timedOut; }
@Override
public long getTimeoutMs() { return adaptiveTimeout; }
@Override
public String getMessage() {
return "Adaptive timeout: " + adaptiveTimeout + "ms";
}
};
}
@Override
public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception {
long startTime = System.currentTimeMillis();
try {
T result = proc.call();
// Update average operation time for adaptive behavior
long operationTime = System.currentTimeMillis() - startTime;
avgOperationTime.set((avgOperationTime.get() + operationTime) / 2);
return result;
} catch (Exception e) {
// Custom retry logic based on exception type and timeout status
CheckTimeoutsResult timeoutResult = checkTimeouts();
if (timeoutResult.isTimedOut()) {
throw new RuntimeException("Operation timed out: " + timeoutResult.getMessage(), e);
}
throw e;
}
}
}/**
* Connection policy with circuit breaker pattern for failing connections
*/
public class CircuitBreakerConnectionPolicy implements ConnectionHandlingPolicy {
private enum State { CLOSED, OPEN, HALF_OPEN }
private volatile State state = State.CLOSED;
private final AtomicInteger failureCount = new AtomicInteger(0);
private final AtomicLong lastFailureTime = new AtomicLong(0);
private final int failureThreshold;
private final long recoveryTimeoutMs;
public CircuitBreakerConnectionPolicy(int failureThreshold, long recoveryTimeoutMs) {
this.failureThreshold = failureThreshold;
this.recoveryTimeoutMs = recoveryTimeoutMs;
}
@Override
public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception {
if (state == State.OPEN) {
if (System.currentTimeMillis() - lastFailureTime.get() > recoveryTimeoutMs) {
state = State.HALF_OPEN;
} else {
throw new RuntimeException("Circuit breaker is OPEN - failing fast");
}
}
try {
T result = proc.call();
// Success - reset circuit breaker
if (state == State.HALF_OPEN) {
state = State.CLOSED;
failureCount.set(0);
}
return result;
} catch (Exception e) {
// Failure - update circuit breaker state
int failures = failureCount.incrementAndGet();
lastFailureTime.set(System.currentTimeMillis());
if (failures >= failureThreshold) {
state = State.OPEN;
}
throw e;
}
}
@Override
public CheckTimeoutsResult checkTimeouts() throws Exception {
return new CheckTimeoutsResult() {
@Override
public boolean isTimedOut() { return state == State.OPEN; }
@Override
public long getTimeoutMs() { return recoveryTimeoutMs; }
@Override
public String getMessage() {
return "Circuit breaker state: " + state + ", failures: " + failureCount.get();
}
};
}
}/**
* Connection manager for multi-threaded applications with thread-local retry handling
*/
public class MultiThreadedConnectionManager {
private final CuratorZookeeperClient client;
private final ConnectionHandlingPolicy policy;
private final ExecutorService executorService;
public MultiThreadedConnectionManager(CuratorZookeeperClient client,
ConnectionHandlingPolicy policy,
int threadPoolSize) {
this.client = client;
this.policy = policy;
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
}
public <T> CompletableFuture<T> executeAsync(Callable<T> operation) {
return CompletableFuture.supplyAsync(() -> {
try {
// Use thread-local retry loop
if (!ThreadLocalRetryLoop.hasRetryLoop()) {
RetryLoop retryLoop = ThreadLocalRetryLoop.getRetryLoop(client);
}
// Execute with connection policy
return policy.callWithRetry(client, operation);
} catch (Exception e) {
throw new RuntimeException("Operation failed", e);
} finally {
// Clean up thread-local state
ThreadLocalRetryLoop.clearRetryLoop();
}
}, executorService);
}
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// Usage example
MultiThreadedConnectionManager connectionManager = new MultiThreadedConnectionManager(
client, new StandardConnectionHandlingPolicy(), 10);
// Execute operations across threads
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
final int index = i;
CompletableFuture<String> future = connectionManager.executeAsync(() -> {
return new String(client.getZooKeeper().getData("/data/" + index, false, null));
});
futures.add(future);
}
// Wait for all operations to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> System.out.println("All operations completed"))
.join();/**
* Connection health monitor using connection handling policies
*/
public class ConnectionHealthMonitor {
private final ConnectionHandlingPolicy policy;
private final ScheduledExecutorService scheduler;
private final AtomicBoolean isHealthy = new AtomicBoolean(true);
public ConnectionHealthMonitor(ConnectionHandlingPolicy policy) {
this.policy = policy;
this.scheduler = Executors.newScheduledThreadPool(1);
// Schedule periodic health checks
scheduler.scheduleAtFixedRate(this::checkHealth, 0, 30, TimeUnit.SECONDS);
}
private void checkHealth() {
try {
CheckTimeoutsResult result = policy.checkTimeouts();
boolean healthy = !result.isTimedOut();
if (healthy != isHealthy.get()) {
isHealthy.set(healthy);
if (healthy) {
System.out.println("Connection health restored");
} else {
System.err.println("Connection health degraded: " + result.getMessage());
}
}
} catch (Exception e) {
isHealthy.set(false);
System.err.println("Health check failed: " + e.getMessage());
}
}
public boolean isHealthy() {
return isHealthy.get();
}
public void shutdown() {
scheduler.shutdown();
}
}StandardConnectionHandlingPolicy:
Custom Policies:
// Always clean up thread-local state in multi-threaded applications
try {
RetryLoop retryLoop = ThreadLocalRetryLoop.getRetryLoop(client);
// ... use retry loop ...
} finally {
ThreadLocalRetryLoop.clearRetryLoop(); // Prevent memory leaks
}
// Use connection policies that are thread-safe
ConnectionHandlingPolicy threadSafePolicy = new StandardConnectionHandlingPolicy();
// Multiple threads can safely use the same policy instance// Comprehensive error handling with connection policies
try {
T result = connectionPolicy.callWithRetry(client, () -> {
// ZooKeeper operation
return client.getZooKeeper().getData(path, false, null);
});
return result;
} catch (Exception e) {
// Check if failure was due to connection handling policy
try {
CheckTimeoutsResult timeoutResult = connectionPolicy.checkTimeouts();
if (timeoutResult.isTimedOut()) {
// Handle timeout-specific failure
handleTimeoutFailure(timeoutResult);
} else {
// Handle other types of failures
handleGeneralFailure(e);
}
} catch (Exception timeoutCheckException) {
// Handle failure to check timeouts
handleCriticalFailure(timeoutCheckException);
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-curator--curator-client