
tessl/maven-org-apache-curator--curator-client

Low-level API for Apache ZooKeeper client library providing connection management, retry policies, and basic ZooKeeper operations


Connection Handling

Connection handling policies and thread-local retry management give fine-grained control over how Apache Curator reacts to ZooKeeper connection timeouts and failures. They complement the basic retry policies by deciding when a connection counts as timed out and by tracking retry state per thread.

Capabilities

ConnectionHandlingPolicy Interface

Interface for implementing custom connection timeout and handling behavior policies.

/**
 * Interface for connection handling policies
 */
public interface ConnectionHandlingPolicy {
    /**
     * Check connection timeouts and return status information
     * @return CheckTimeoutsResult with timeout status details
     * @throws Exception if timeout check fails
     */
    CheckTimeoutsResult checkTimeouts() throws Exception;
    
    /**
     * Execute callable with retry logic specific to this connection policy
     * @param client CuratorZookeeperClient instance
     * @param proc Callable to execute with connection handling
     * @return Result of successful callable execution
     * @throws Exception if operation fails after policy handling
     */
    <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception;
}

StandardConnectionHandlingPolicy Class

Default implementation of ConnectionHandlingPolicy providing standard connection management behavior.

/**
 * Standard implementation of connection handling policy
 */
public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolicy {
    /**
     * Create standard connection handling policy with default settings
     */
    public StandardConnectionHandlingPolicy();
    
    /**
     * Create standard connection handling policy with a custom session expiration percentage
     * @param expirationPercent Percentage (1-100) of the negotiated session timeout after which
     *                          a lost connection is treated as an expired session
     */
    public StandardConnectionHandlingPolicy(int expirationPercent);
}

Usage Examples:

import org.apache.curator.connection.StandardConnectionHandlingPolicy;
import org.apache.curator.connection.ConnectionHandlingPolicy;

// Default standard policy
ConnectionHandlingPolicy defaultPolicy = new StandardConnectionHandlingPolicy();

// Custom policy: the argument is a percentage, so values outside 1-100 are rejected
ConnectionHandlingPolicy customPolicy = new StandardConnectionHandlingPolicy(50);

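// Use with operations ('client' is assumed to be a started CuratorZookeeperClient)
try {
    String result = customPolicy.callWithRetry(client, () -> {
        byte[] data = client.getZooKeeper().getData("/config/setting", false, null);
        // Decode explicitly instead of relying on the platform default charset
        return new String(data, java.nio.charset.StandardCharsets.UTF_8);
    });
} catch (Exception e) {
    // Handle connection policy failure
    System.err.println("Connection policy failed: " + e.getMessage());
}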

ThreadLocalRetryLoop Class

Thread-local retry loop management for handling retry state per thread in multi-threaded applications.

/**
 * Thread-local retry loop management
 */
public class ThreadLocalRetryLoop {
    /**
     * Get or create retry loop for current thread
     * @param client CuratorZookeeperClient instance
     * @return RetryLoop instance for current thread
     */
    public static RetryLoop getRetryLoop(CuratorZookeeperClient client);
    
    /**
     * Clear retry loop for current thread
     * Use this to clean up thread-local state when thread processing is complete
     */
    public static void clearRetryLoop();
    
    /**
     * Check if current thread has an active retry loop
     * @return true if current thread has retry loop, false otherwise
     */
    public static boolean hasRetryLoop(); 
}

Usage Examples:

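import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryLoop;
import org.apache.curator.connection.ThreadLocalRetryLoop;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;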

// Multi-threaded retry handling
ExecutorService executor = Executors.newFixedThreadPool(10);

executor.submit(() -> {
    try {
        // Get thread-local retry loop
        RetryLoop retryLoop = ThreadLocalRetryLoop.getRetryLoop(client);
        
        while (retryLoop.shouldContinue()) {
            try {
                // ZooKeeper operations specific to this thread
                client.getZooKeeper().create("/thread/" + Thread.currentThread().getId(), 
                    "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                retryLoop.markComplete();
            } catch (Exception e) {
                retryLoop.takeException(e);
            }
        }
        return null; // returning a value makes the lambda a Callable, which may throw checked exceptions
    } finally {
        // Clean up thread-local state
        ThreadLocalRetryLoop.clearRetryLoop();
    }
});
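
Pooled threads outlive the tasks they run, so thread-local state left behind by one task is still visible to the next task scheduled on the same thread. The following is a minimal sketch of that effect using only `java.lang.ThreadLocal`. The class `PerThreadAttempts` and its methods are hypothetical stand-ins for illustration and do not use Curator.

```java
import java.util.concurrent.ExecutorService;

/** Hypothetical stand-in for per-thread retry state; not part of Curator. */
final class PerThreadAttempts {
    // Each thread sees its own value; null means "no state for this thread".
    private static final ThreadLocal<Integer> ATTEMPTS = new ThreadLocal<>();

    /** Increments and returns the attempt count of the calling thread. */
    static int recordAttempt() {
        Integer current = ATTEMPTS.get();
        int next = (current == null) ? 1 : current + 1;
        ATTEMPTS.set(next);
        return next;
    }

    /** Drops the calling thread's state so the next task starts fresh. */
    static void clear() {
        ATTEMPTS.remove();
    }

    /** Runs one task on the pool and returns the attempt count that task observed. */
    static int runOnPool(ExecutorService pool, boolean cleanUp) {
        try {
            return pool.submit(() -> {
                try {
                    return recordAttempt();
                } finally {
                    if (cleanUp) {
                        clear();
                    }
                }
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException("task failed", e);
        }
    }
}
```

With a single-threaded pool, two tasks that skip the cleanup observe counts 1 and 2, because the second task inherits the first task's state. Once a task clears the state in `finally`, the following task starts again at 1.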

CheckTimeoutsResult Class

Result object returned by connection handling policy timeout checks.

/**
 * Result of connection timeout checks
 */
public class CheckTimeoutsResult {
    /**
     * Check if connection has timed out
     * @return true if connection is timed out
     */
    public boolean isTimedOut();
    
    /**
     * Get timeout duration in milliseconds
     * @return Timeout duration, or -1 if not timed out
     */
    public long getTimeoutMs();
    
    /**
     * Get descriptive message about timeout status
     * @return Human-readable timeout status message
     */
    public String getMessage();
}

Advanced Connection Handling Patterns

Custom Connection Policy Implementation

import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.connection.ConnectionHandlingPolicy;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Custom connection policy with adaptive timeout based on operation history
 */
public class AdaptiveConnectionHandlingPolicy implements ConnectionHandlingPolicy {
    private final AtomicLong avgOperationTime = new AtomicLong(1000);
    private final int baseTimeoutMs;
    
    public AdaptiveConnectionHandlingPolicy(int baseTimeoutMs) {
        this.baseTimeoutMs = baseTimeoutMs;
    }
    
    @Override
    public CheckTimeoutsResult checkTimeouts() throws Exception {
        // Calculate adaptive timeout based on recent operation performance
        long adaptiveTimeout = Math.max(baseTimeoutMs, avgOperationTime.get() * 3);
        
        // Placeholder: substitute your own timeout detection logic here
        boolean timedOut = false;
        
        return new CheckTimeoutsResult() {
            @Override
            public boolean isTimedOut() { return timedOut; }
            
            @Override  
            public long getTimeoutMs() { return adaptiveTimeout; }
            
            @Override
            public String getMessage() { 
                return "Adaptive timeout: " + adaptiveTimeout + "ms"; 
            }
        };
    }
    
    @Override
    public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception {
        long startTime = System.currentTimeMillis();
        
        try {
            T result = proc.call();
            
            // Update average operation time atomically so concurrent callers do not lose updates
            long operationTime = System.currentTimeMillis() - startTime;
            avgOperationTime.updateAndGet(avg -> (avg + operationTime) / 2);
            
            return result;
        } catch (Exception e) {
            // No retry is attempted here: report a timeout if the policy detects one, otherwise rethrow
            CheckTimeoutsResult timeoutResult = checkTimeouts();
            if (timeoutResult.isTimedOut()) {
                throw new RuntimeException("Operation timed out: " + timeoutResult.getMessage(), e);
            }
            throw e;
        }
    }
}
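
The averaging step and the `max(base, 3 x average)` rule used above can be tried in isolation. The following is a minimal sketch, not part of Curator: the class `AdaptiveTimeout` and its method names are hypothetical, and the only library type it relies on is `java.util.concurrent.atomic.AtomicLong`.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper isolating the adaptive timeout arithmetic. */
final class AdaptiveTimeout {
    private final long baseTimeoutMs;
    private final AtomicLong avgOperationMs;

    AdaptiveTimeout(long baseTimeoutMs, long initialAvgMs) {
        this.baseTimeoutMs = baseTimeoutMs;
        this.avgOperationMs = new AtomicLong(initialAvgMs);
    }

    /** Folds a new sample into the average in one atomic step and returns the new average. */
    long record(long operationMs) {
        return avgOperationMs.updateAndGet(avg -> (avg + operationMs) / 2);
    }

    /** The timeout never drops below the base value and grows with slow operations. */
    long currentTimeoutMs() {
        return Math.max(baseTimeoutMs, avgOperationMs.get() * 3);
    }
}
```

Starting from a base of 2000 ms and an average of 1000 ms, the timeout is 3000 ms. Recording a 200 ms operation moves the average to 600 ms, so the timeout falls back to the 2000 ms base. Recording a 3000 ms operation then moves the average to 1800 ms and the timeout to 5400 ms.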

Circuit Breaker Connection Policy

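import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.connection.ConnectionHandlingPolicy;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Connection policy with circuit breaker pattern for failing connections.
 * State transitions are check-then-act on a volatile field and are not atomic,
 * so treat this as an illustration rather than a hardened implementation.
 */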
public class CircuitBreakerConnectionPolicy implements ConnectionHandlingPolicy {
    private enum State { CLOSED, OPEN, HALF_OPEN }
    
    private volatile State state = State.CLOSED;
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicLong lastFailureTime = new AtomicLong(0);
    private final int failureThreshold;
    private final long recoveryTimeoutMs;
    
    public CircuitBreakerConnectionPolicy(int failureThreshold, long recoveryTimeoutMs) {
        this.failureThreshold = failureThreshold;
        this.recoveryTimeoutMs = recoveryTimeoutMs;
    }
    
    @Override
    public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception {
        if (state == State.OPEN) {
            if (System.currentTimeMillis() - lastFailureTime.get() > recoveryTimeoutMs) {
                state = State.HALF_OPEN;
            } else {
                throw new RuntimeException("Circuit breaker is OPEN - failing fast");
            }
        }
        
        try {
            T result = proc.call();
            
            // Success - clear the failure count so the threshold tracks consecutive failures
            failureCount.set(0);
            if (state == State.HALF_OPEN) {
                state = State.CLOSED;
            }
            
            return result;
        } catch (Exception e) {
            // Failure - update circuit breaker state
            int failures = failureCount.incrementAndGet();
            lastFailureTime.set(System.currentTimeMillis());
            
            if (failures >= failureThreshold) {
                state = State.OPEN;
            }
            
            throw e;
        }
    }
    
    @Override
    public CheckTimeoutsResult checkTimeouts() throws Exception {
        return new CheckTimeoutsResult() {
            @Override
            public boolean isTimedOut() { return state == State.OPEN; }
            
            @Override
            public long getTimeoutMs() { return recoveryTimeoutMs; }
            
            @Override  
            public String getMessage() { 
                return "Circuit breaker state: " + state + ", failures: " + failureCount.get(); 
            }
        };
    }
}
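
The CLOSED, OPEN, and HALF_OPEN transitions can be exercised without a ZooKeeper connection. The following is a minimal sketch under stated assumptions: `SimpleBreaker` is a hypothetical class that does not implement the Curator interface, the clock is injected as a `LongSupplier` so that time can be controlled, and every call is serialized with `synchronized` for simplicity.

```java
import java.util.concurrent.Callable;
import java.util.function.LongSupplier;

/** Hypothetical circuit breaker used only to illustrate the state machine. */
final class SimpleBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long recoveryTimeoutMs;
    private final LongSupplier clock; // injected so callers can control time
    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long lastFailureTime = 0;

    SimpleBreaker(int failureThreshold, long recoveryTimeoutMs, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.recoveryTimeoutMs = recoveryTimeoutMs;
        this.clock = clock;
    }

    synchronized State state() {
        return state;
    }

    /** Holding the lock during the call keeps transitions atomic but serializes callers. */
    synchronized <T> T call(Callable<T> proc) throws Exception {
        if (state == State.OPEN) {
            if (clock.getAsLong() - lastFailureTime > recoveryTimeoutMs) {
                state = State.HALF_OPEN; // allow one trial call
            } else {
                throw new IllegalStateException("Circuit breaker is OPEN");
            }
        }
        try {
            T result = proc.call();
            consecutiveFailures = 0;
            state = State.CLOSED;
            return result;
        } catch (Exception e) {
            consecutiveFailures++;
            lastFailureTime = clock.getAsLong();
            // A failed trial call reopens immediately; otherwise wait for the threshold
            if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
                state = State.OPEN;
            }
            throw e;
        }
    }

    /** Convenience for demos: true if the call succeeded, false if it threw. */
    boolean tryCall(Callable<?> proc) {
        try {
            call(proc);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
```

With a threshold of 2 and a recovery timeout of 100 ms, two failing calls open the breaker, a third call is rejected without running, and the first call made after the clock has advanced past 100 ms runs as a trial and closes the breaker again if it succeeds.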

Multi-threaded Connection Management

/**
 * Connection manager for multi-threaded applications with thread-local retry handling
 */
public class MultiThreadedConnectionManager {
    private final CuratorZookeeperClient client;
    private final ConnectionHandlingPolicy policy;
    private final ExecutorService executorService;
    
    public MultiThreadedConnectionManager(CuratorZookeeperClient client, 
                                        ConnectionHandlingPolicy policy,
                                        int threadPoolSize) {
        this.client = client;
        this.policy = policy;
        this.executorService = Executors.newFixedThreadPool(threadPoolSize);
    }
    
    public <T> CompletableFuture<T> executeAsync(Callable<T> operation) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Ensure this worker thread has a thread-local retry loop
                if (!ThreadLocalRetryLoop.hasRetryLoop()) {
                    ThreadLocalRetryLoop.getRetryLoop(client);
                }
                
                // Execute with connection policy
                return policy.callWithRetry(client, operation);
                
            } catch (Exception e) {
                throw new RuntimeException("Operation failed", e);
            } finally {
                // Clean up thread-local state
                ThreadLocalRetryLoop.clearRetryLoop();
            }
        }, executorService);
    }
    
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

// Usage example
MultiThreadedConnectionManager connectionManager = new MultiThreadedConnectionManager(
    client, new StandardConnectionHandlingPolicy(), 10);

// Execute operations across threads
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    final int index = i;
    CompletableFuture<String> future = connectionManager.executeAsync(() -> {
        return new String(client.getZooKeeper().getData("/data/" + index, false, null));
    });
    futures.add(future);
}

// Wait for all operations to complete, then release the worker threads
try {
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenRun(() -> System.out.println("All operations completed"))
        .join();
} finally {
    connectionManager.shutdown();
}

Connection Health Monitoring

/**
 * Connection health monitor using connection handling policies
 */
public class ConnectionHealthMonitor {
    private final ConnectionHandlingPolicy policy;
    private final ScheduledExecutorService scheduler;
    private final AtomicBoolean isHealthy = new AtomicBoolean(true);
    
    public ConnectionHealthMonitor(ConnectionHandlingPolicy policy) {
        this.policy = policy;
        this.scheduler = Executors.newScheduledThreadPool(1);
        
        // Schedule periodic health checks
        scheduler.scheduleAtFixedRate(this::checkHealth, 0, 30, TimeUnit.SECONDS);
    }
    
    private void checkHealth() {
        try {
            CheckTimeoutsResult result = policy.checkTimeouts();
            boolean healthy = !result.isTimedOut();
            
            if (healthy != isHealthy.get()) {
                isHealthy.set(healthy);
                if (healthy) {
                    System.out.println("Connection health restored");
                } else {
                    System.err.println("Connection health degraded: " + result.getMessage());
                }
            }
        } catch (Exception e) {
            isHealthy.set(false);
            System.err.println("Health check failed: " + e.getMessage());
        }
    }
    
    public boolean isHealthy() {
        return isHealthy.get();
    }
    
    public void shutdown() {
        scheduler.shutdown();
    }
}
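
The monitor above logs only when the health state changes. That edge detection can be expressed as a single atomic operation, which also stays correct if more than one thread reports observations. A minimal sketch, assuming a hypothetical helper class `HealthFlag` that is not part of Curator:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper that reports health transitions rather than every observation. */
final class HealthFlag {
    private final AtomicBoolean healthy = new AtomicBoolean(true);

    /** Stores the latest observation and returns true only if the state changed. */
    boolean update(boolean nowHealthy) {
        // getAndSet swaps the value and returns the previous one in one atomic step
        return healthy.getAndSet(nowHealthy) != nowHealthy;
    }

    boolean isHealthy() {
        return healthy.get();
    }
}
```

A caller would log a "restored" or "degraded" message only when `update` returns true, which keeps repeated identical observations out of the logs.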

Connection Handling Best Practices

Policy Selection Guidelines

StandardConnectionHandlingPolicy:

  • Use for most production applications
  • Provides reliable timeout handling and retry behavior
  • Good balance of performance and reliability

Custom Policies:

  • Implement for specialized requirements (circuit breakers, adaptive timeouts)
  • Consider maintenance overhead and complexity
  • Test thoroughly under failure conditions
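
One way to test a custom policy under failure conditions is to inject a callable that fails a fixed number of times before succeeding. The following is a minimal sketch: `FailureInjection`, `failingFirst`, `retry`, and `succeeds` are hypothetical names, and the bounded `retry` method merely stands in for whatever policy is under test.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical test helpers for exercising retry behavior without ZooKeeper. */
final class FailureInjection {
    /** Returns a callable that throws for the first 'failures' calls, then returns 'value'. */
    static <T> Callable<T> failingFirst(int failures, T value) {
        AtomicInteger calls = new AtomicInteger();
        return () -> {
            int call = calls.incrementAndGet();
            if (call <= failures) {
                throw new IOException("injected failure #" + call);
            }
            return value;
        };
    }

    /** Minimal bounded retry, standing in for the policy under test. */
    static <T> T retry(int maxAttempts, Callable<T> proc) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return proc.call();
            } catch (Exception e) {
                last = e; // remember the most recent failure and try again
            }
        }
        throw last;
    }

    /** True if the operation succeeded within the allowed attempts. */
    static boolean succeeds(int maxAttempts, Callable<?> proc) {
        try {
            retry(maxAttempts, proc);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
```

An operation that fails twice succeeds when three attempts are allowed and fails when only two are allowed, which gives a simple pass/fail check for the attempt budget of a policy.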

Thread Safety Considerations

// Always clean up thread-local state in multi-threaded applications
try {
    RetryLoop retryLoop = ThreadLocalRetryLoop.getRetryLoop(client);
    // ... use retry loop ...
} finally {
    ThreadLocalRetryLoop.clearRetryLoop(); // Prevent memory leaks
}

// Use connection policies that are thread-safe
ConnectionHandlingPolicy threadSafePolicy = new StandardConnectionHandlingPolicy();
// Multiple threads can safely use the same policy instance

Error Handling Patterns

// Comprehensive error handling with connection policies
try {
    byte[] result = connectionPolicy.callWithRetry(client, () -> {
        // ZooKeeper operation
        return client.getZooKeeper().getData(path, false, null);
    });
    return result;
} catch (Exception e) {
    // Check if failure was due to connection handling policy
    try {
        CheckTimeoutsResult timeoutResult = connectionPolicy.checkTimeouts();
        if (timeoutResult.isTimedOut()) {
            // Handle timeout-specific failure
            handleTimeoutFailure(timeoutResult);
        } else {
            // Handle other types of failures
            handleGeneralFailure(e);
        }
    } catch (Exception timeoutCheckException) {
        // Handle failure to check timeouts
        handleCriticalFailure(timeoutCheckException);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-curator--curator-client
