High-level API that greatly simplifies using ZooKeeper.
—
Comprehensive event handling and connection state management with listeners for background operations, connection state changes, and unhandled errors, providing robust monitoring and error handling capabilities.
Core event interfaces for handling ZooKeeper events and background operation results.
/**
* Returns the listenable used to register {@link CuratorListener} instances.
* Registered listeners are notified through
* {@code eventReceived(CuratorFramework, CuratorEvent)}.
* @return listenable for adding and removing curator listeners
*/
Listenable<CuratorListener> getCuratorListenable();
/**
* Returns the listenable used to register {@link UnhandledErrorListener} instances.
* Registered listeners are notified through
* {@code unhandledError(String, Throwable)}.
* @return listenable for adding and removing unhandled-error listeners
*/
Listenable<UnhandledErrorListener> getUnhandledErrorListenable();
/**
* Represents ZooKeeper events and background operation results.
* <p>
* Instances are handed to {@code CuratorListener.eventReceived} and
* {@code BackgroundCallback.processResult}. Every accessor documented as
* "or null" may return {@code null}; check the value before dereferencing it.
*/
public interface CuratorEvent {
/**
* Get the event type
* @return event type, one of the {@link CuratorEventType} constants
*/
CuratorEventType getType();
/**
* Get the ZooKeeper result code
* @return result code (0 = success)
*/
int getResultCode();
/**
* Get the path associated with this event
* @return path or null
*/
String getPath();
/**
* Get the background context object
* @return context or null
*/
Object getContext();
/**
* Get stat information
* @return stat or null
*/
Stat getStat();
/**
* Get data associated with the event
* @return data bytes or null
*/
byte[] getData();
/**
* Get the name (for create operations with sequential modes)
* @return name or null
*/
String getName();
/**
* Get children list (for getChildren operations)
* @return children list or null
*/
List<String> getChildren();
/**
* Get ACL list (for getACL/setACL operations)
* @return ACL list or null
*/
List<ACL> getACLList();
/**
* Get transaction results (for transaction operations)
* @return transaction results or null
*/
List<CuratorTransactionResult> getOpResults();
/**
* Get the watched event (for watcher callbacks)
* @return watched event or null
*/
WatchedEvent getWatchedEvent();
}
/**
 * Enumerates the kinds of event reported by {@code CuratorEvent.getType()}.
 * The declaration order below is the same as in the original listing.
 */
public enum CuratorEventType {
    CREATE,
    DELETE,
    EXISTS,
    GET_DATA,
    SET_DATA,
    CHILDREN,
    SYNC,
    GET_ACL,
    SET_ACL,
    TRANSACTION,
    GET_CONFIG,
    RECONFIG,
    WATCHED,
    REMOVE_WATCHES,
    ADD_WATCH,
    CLOSING
}
/**
* Listener for background events and errors.
* Register instances through {@code getCuratorListenable().addListener(...)}.
*/
public interface CuratorListener {
/**
* Called when a background event occurs
* @param client the client
* @param event the event; inspect {@code event.getType()} to decide which accessors apply
* @throws Exception errors
*/
void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
/**
* Listener for unhandled errors.
* Register instances through {@code getUnhandledErrorListenable().addListener(...)}.
*/
public interface UnhandledErrorListener {
/**
* Called when an unhandled error occurs
* @param message error message
* @param e the exception
*/
void unhandledError(String message, Throwable e);
}
/**
 * Callback for background operations.
 * <p>
 * Supplied to {@code inBackground(...)} on an operation builder. The interface
 * declares a single method, so it can be written as a lambda.
 */
public interface BackgroundCallback {
    /**
     * Called when a background operation completes
     * @param client the client
     * @param event the event; {@code event.getResultCode()} is 0 on success
     * @throws Exception errors
     */
    void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}

Usage Examples:
// Add general curator listener
client.getCuratorListenable().addListener(new CuratorListener() {
    @Override
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
        System.out.println("Event: " + event.getType() + " Path: " + event.getPath());
        // Handle different event types
        switch (event.getType()) {
            case CREATE:
                System.out.println("Node created: " + event.getPath());
                break;
            case DELETE:
                System.out.println("Node deleted: " + event.getPath());
                break;
            case GET_DATA: {
                // getData() is documented as "data bytes or null", so guard before decoding.
                // Decode with an explicit charset (UTF-8 assumed here) instead of the platform default.
                byte[] data = event.getData();
                String text = (data == null)
                    ? null
                    : new String(data, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Got data: " + text);
                break;
            }
            case CHILDREN:
                List<String> children = event.getChildren();
                System.out.println("Children: " + children);
                break;
            case TRANSACTION: {
                // getOpResults() is documented as "transaction results or null".
                List<CuratorTransactionResult> results = event.getOpResults();
                int operationCount = (results == null) ? 0 : results.size();
                System.out.println("Transaction completed with " + operationCount + " operations");
                break;
            }
            default:
                // Other event types are only covered by the generic log line above.
                break;
        }
    }
});
// Add unhandled error listener (single-method interface, written as a lambda)
client.getUnhandledErrorListenable().addListener((message, cause) -> {
    // Report the message first, then the full stack trace of the failure.
    System.err.println("Unhandled error: " + message);
    cause.printStackTrace();
});
// Use background callback for specific operations
client.create()
    .creatingParentsIfNeeded()
    .inBackground(new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            // A result code of 0 means the operation succeeded.
            if (event.getResultCode() == 0) {
                System.out.println("Successfully created: " + event.getPath());
            } else {
                System.err.println("Failed to create: " + event.getResultCode());
            }
        }
    })
    // Encode the payload with an explicit charset so the stored bytes do not
    // depend on the JVM's platform default encoding.
    .forPath("/background/path", "data".getBytes(java.nio.charset.StandardCharsets.UTF_8));
// Lambda-style callback
client.getData()
.inBackground((curatorFramework, curatorEvent) -> {
if (curatorEvent.getResultCode() == 0) {
byte[] data = curatorEvent.getData();
System.out.println("Data retrieved: " + new String(data));
}
})
.forPath("/some/path");Connection state tracking and error handling for robust distributed application development.
/**
* Returns the listenable used to register {@link ConnectionStateListener} instances.
* Registered listeners are notified through
* {@code stateChanged(CuratorFramework, ConnectionState)}.
* @return listenable for adding and removing connection state listeners
*/
Listenable<ConnectionStateListener> getConnectionStateListenable();
/**
* Returns the configured {@link ConnectionStateErrorPolicy}, i.e. the policy
* that decides which connection states count as errors.
* @return error policy
*/
ConnectionStateErrorPolicy getConnectionStateErrorPolicy();
/**
 * The connection states a client can report for its link to ZooKeeper.
 */
public enum ConnectionState {
    /**
     * Reported for the first successful connection to the server.
     */
    CONNECTED,

    /**
     * The connection has been interrupted. Leaders, locks, etc. should suspend
     * until the connection is re-established.
     */
    SUSPENDED,

    /**
     * A suspended or lost connection has been re-established.
     */
    RECONNECTED,

    /**
     * The connection is confirmed to be lost. Close any locks, leaders, etc. and
     * attempt to re-create them.
     */
    LOST,

    /**
     * The connection has gone into read-only mode. This can only happen if you
     * pass true for canBeReadOnly() in the builder.
     */
    READ_ONLY;

    /**
     * Tells whether this state represents a live connection.
     * @return true for CONNECTED, RECONNECTED and READ_ONLY; false otherwise
     */
    public boolean isConnected() {
        switch (this) {
            case CONNECTED:
            case RECONNECTED:
            case READ_ONLY:
                return true;
            default:
                return false;
        }
    }
}
/**
* Listener for connection state changes.
* Register instances through {@code getConnectionStateListenable().addListener(...)}.
*/
public interface ConnectionStateListener {
/**
* Called when the connection state changes
* @param client the client
* @param newState the new state, one of the {@link ConnectionState} constants
*/
void stateChanged(CuratorFramework client, ConnectionState newState);
}
/**
* Policy for handling connection errors.
* The policy decides which {@link ConnectionState} values are treated as errors.
*/
public interface ConnectionStateErrorPolicy {
/**
* Return true if the given state represents an error
* @param state the state
* @return true/false
*/
boolean isErrorState(ConnectionState state);
/**
* Return the timeout to use when checking error states
* NOTE(review): confirm against the Curator Javadoc for the version in use that
* this method is really part of ConnectionStateErrorPolicy; the upstream
* interface may declare only isErrorState.
* @return timeout in milliseconds
*/
int getErrorThresholdMs();
}
/**
* Default error policy implementation.
* As noted in the body, it considers SUSPENDED and LOST to be error states.
*/
public class StandardConnectionStateErrorPolicy implements ConnectionStateErrorPolicy {
// Default implementation considers SUSPENDED and LOST as error states
// (method bodies are not shown in this API summary)
}
/**
 * Session-based error policy implementation.
 */
public class SessionConnectionStateErrorPolicy implements ConnectionStateErrorPolicy {
    // Session-based error handling with different thresholds
    // NOTE(review): upstream Curator appears to treat only LOST (session loss) as an
    // error state under this policy; confirm against the Curator Javadoc.
    // (method bodies are not shown in this API summary)
}

Usage Examples:
// Add connection state listener (single-method interface, written as a lambda;
// the parameters are renamed so they cannot clash with the enclosing `client`)
client.getConnectionStateListenable().addListener((curatorClient, updatedState) -> {
    // Log every transition, then react to the specific state.
    System.out.println("Connection state changed to: " + updatedState);
    switch (updatedState) {
        case CONNECTED:
            // Initialize application state
            System.out.println("Connected to ZooKeeper");
            break;
        case SUSPENDED:
            // Pause non-critical operations
            System.out.println("Connection suspended - pausing operations");
            break;
        case RECONNECTED:
            // Resume operations, refresh state if needed
            System.out.println("Reconnected to ZooKeeper");
            break;
        case LOST:
            // Clean up ephemeral nodes, release locks, etc.
            System.out.println("Connection lost - cleaning up");
            break;
        case READ_ONLY:
            // Only perform read operations
            System.out.println("In read-only mode");
            break;
    }
});
// Use custom executor for connection state callbacks.
// The executor runs callbacks on one named daemon thread.
Executor customExecutor = Executors.newSingleThreadExecutor(task -> {
    Thread handlerThread = new Thread(task, "ConnectionStateHandler");
    handlerThread.setDaemon(true);
    return handlerThread;
});
// Register the listener together with the executor that should invoke it.
client.getConnectionStateListenable().addListener(connectionStateListener, customExecutor);
// Check current connection state
// ZooKeeper#getState() returns the raw handle state (ZooKeeper.States), which is a
// different type from Curator's ConnectionState enum, so declare it accordingly.
org.apache.zookeeper.ZooKeeper.States currentState = client.getZookeeperClient().getZooKeeper().getState();
System.out.println("Current ZK state: " + currentState);
// Wait for connection
// A false result is treated as a connection failure and aborts with an exception.
if (!client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
    throw new RuntimeException("Failed to connect to ZooKeeper");
}

Advanced connection state management with circuit breaking and custom listener factories.
/**
* Factory for creating connection state listener managers.
* An instance is supplied to the client builder through
* {@code connectionStateListenerManagerFactory(...)}.
*/
public class ConnectionStateListenerManagerFactory {
/**
* Standard factory instance
*/
public static final ConnectionStateListenerManagerFactory standard;
/**
* Create a circuit breaking factory
* @param retryPolicy retry policy for circuit breaking
* @return circuit breaking factory
*/
public static ConnectionStateListenerManagerFactory circuitBreaking(RetryPolicy retryPolicy);
}
/**
* Circuit breaker for connection management.
* Exposes whether the circuit is currently open and how many retries have been counted.
*/
public interface CircuitBreaker {
/**
* Check if the circuit is open (failing)
* @return true if circuit is open
*/
boolean isOpen();
/**
* Get the current retry count
* @return retry count
*/
int getRetryCount();
}
/**
* Connection state listener with circuit breaking capability.
* It is itself a {@link ConnectionStateListener} and exposes the
* {@link CircuitBreaker} it uses.
*/
public class CircuitBreakingConnectionStateListener implements ConnectionStateListener {
/**
* Get the circuit breaker instance
* @return circuit breaker
*/
public CircuitBreaker getCircuitBreaker();
}
/**
 * Manager for circuit breaking functionality.
 */
public class CircuitBreakingManager {
    /**
     * Check if circuit breaking is active
     * @return true if active
     */
    public boolean isActive();

    /**
     * Get the current circuit breaker
     * @return circuit breaker or null; check for null before use
     */
    public CircuitBreaker getCircuitBreaker();
}

Usage Examples:
// Configure client with circuit breaking connection state management
// Retry policy for ordinary client operations.
ExponentialBackoffRetry operationRetry = new ExponentialBackoffRetry(1000, 3);
// Separate retry policy handed to the circuit-breaking listener manager factory.
ExponentialBackoffRetry breakerRetry = new ExponentialBackoffRetry(1000, 5);
ConnectionStateListenerManagerFactory breakerFactory =
    ConnectionStateListenerManagerFactory.circuitBreaking(breakerRetry);
CuratorFramework clientWithCircuitBreaker = CuratorFrameworkFactory.builder()
    .connectString("localhost:2181")
    .retryPolicy(operationRetry)
    .connectionStateListenerManagerFactory(breakerFactory)
    .build();
// Custom error policy
ConnectionStateErrorPolicy customPolicy = new ConnectionStateErrorPolicy() {
    @Override
    public boolean isErrorState(ConnectionState state) {
        // Treat both a confirmed loss and a suspension as errors.
        return state == ConnectionState.LOST || state == ConnectionState.SUSPENDED;
    }

    @Override
    public int getErrorThresholdMs() {
        return 30000; // 30 seconds
    }
};
// Hand the policy to the builder so the client uses it instead of the default.
CuratorFramework clientWithCustomPolicy = CuratorFrameworkFactory.builder()
    .connectString("localhost:2181")
    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
    .connectionStateErrorPolicy(customPolicy)
    .build();

Generic interface for managing event listeners with custom executors.
/**
* Generic interface for managing listeners.
* @param <T> the listener type being managed
*/
public interface Listenable<T> {
/**
* Add a listener with the default executor
* @param listener listener to add
*/
void addListener(T listener);
/**
* Add a listener with custom executor
* @param listener listener to add
* @param executor executor for listener callbacks
*/
void addListener(T listener, Executor executor);
/**
* Remove a listener
* @param listener listener to remove
*/
void removeListener(T listener);
}
/**
 * Container for listener and its executor.
 * @param <T> the listener type held by this entry
 */
public class ListenerEntry<T> {
    /**
     * Get the listener
     * @return listener
     */
    public T getListener();

    /**
     * Get the executor
     * @return executor
     */
    public Executor getExecutor();
}

Usage Examples:
// Add listeners with custom executors
ExecutorService backgroundExecutor = Executors.newCachedThreadPool();
ExecutorService priorityExecutor = Executors.newSingleThreadExecutor();
// High priority connection state listener
client.getConnectionStateListenable().addListener(criticalConnectionListener, priorityExecutor);
// Background event processing
client.getCuratorListenable().addListener(backgroundEventListener, backgroundExecutor);
// Remove listeners when done
client.getConnectionStateListenable().removeListener(criticalConnectionListener);
client.getCuratorListenable().removeListener(backgroundEventListener);
// Clean up executors (after the listeners that used them have been removed)
backgroundExecutor.shutdown();
priorityExecutor.shutdown();

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-curator--curator-framework