A shaded JAR library that bundles Apache Curator dependencies for ZooKeeper coordination capabilities within Apache Flink's distributed architecture.
—
Caching mechanisms for ZooKeeper paths to improve performance and reduce network overhead. Provides various caching strategies including path children caching, tree caching, and node caching with event notifications for cache updates.
Caches the children of a ZooKeeper path and provides notifications when children are added, updated, or removed.
/**
* Caches children of a path and notifies of changes
*/
public class PathChildrenCache implements Closeable {
/**
* Create a new PathChildrenCache
* @param client the curator client
* @param path the path to cache children for
* @param cacheData true to cache node data, false for just paths
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData);
/**
* Start the cache
* @throws Exception if cache cannot be started
*/
public void start() throws Exception;
/**
* Start the cache with a specific mode
* @param mode how to initialize the cache
* @throws Exception if cache cannot be started
*/
public void start(StartMode mode) throws Exception;
/**
* Get current cached children data
* @return list of current children
*/
public List<ChildData> getCurrentData();
/**
* Get current data for a specific child path
* @param fullPath full path of the child
* @return child data or null if not found
*/
public ChildData getCurrentData(String fullPath);
/**
* Add a listener for cache events
* @param listener the listener to add
*/
public void getListenable().addListener(PathChildrenCacheListener listener);
/**
* Close the cache
*/
public void close() throws IOException;
}
/**
* Start modes for PathChildrenCache
*/
public enum StartMode {
/** Start normally, don't populate cache until first event */
NORMAL,
/** Build initial cache by querying ZooKeeper */
BUILD_INITIAL_CACHE,
/** Build initial cache in background */
POST_INITIALIZED_EVENT
}
/**
* Listener interface for path children cache events
*/
public interface PathChildrenCacheListener {
/**
* Called when a cache event occurs
* @param client the curator client
* @param event the cache event
*/
void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
}
/**
* Represents cached data for a child node
*/
public class ChildData {
/**
* Get the full path of this child
*/
public String getPath();
/**
* Get the stat information for this child
*/
public Stat getStat();
/**
* Get the data bytes for this child
*/
public byte[] getData();
}Usage Example:
PathChildrenCache cache = new PathChildrenCache(client, "/app/workers", true);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("Child added: " + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("Child updated: " + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("Child removed: " + event.getData().getPath());
break;
}
}
});
cache.start(StartMode.BUILD_INITIAL_CACHE);
// Cache now tracks all children of /app/workers
List<ChildData> currentChildren = cache.getCurrentData();Caches an entire tree of ZooKeeper nodes, providing notifications for any changes within the tree.
/**
* Caches a complete tree of ZooKeeper nodes
*/
public class TreeCache implements Closeable {
/**
* Create a new TreeCache
* @param client the curator client
* @param path the root path to cache
*/
public TreeCache(CuratorFramework client, String path);
/**
* Start the cache
* @throws Exception if cache cannot be started
*/
public void start() throws Exception;
/**
* Get current children at a specific path
* @param fullPath the path to get children for
* @return map of child name to child data
*/
public Map<String, ChildData> getCurrentChildren(String fullPath);
/**
* Get current data for a specific path
* @param fullPath the path to get data for
* @return child data or null if not found
*/
public ChildData getCurrentData(String fullPath);
/**
* Add a listener for cache events
* @param listener the listener to add
*/
public void getListenable().addListener(TreeCacheListener listener);
/**
* Close the cache
*/
public void close() throws IOException;
}
/**
* Listener interface for tree cache events
*/
public interface TreeCacheListener {
/**
* Called when a tree event occurs
* @param client the curator client
* @param event the tree event
*/
void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception;
}Caches data for a single ZooKeeper node and provides notifications when the node changes.
/**
* Caches data for a single node
*/
public class NodeCache implements Closeable {
/**
* Create a new NodeCache
* @param client the curator client
* @param path the path to cache
* @param dataIsCompressed true if node data is compressed
*/
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);
/**
* Start the cache
* @throws Exception if cache cannot be started
*/
public void start() throws Exception;
/**
* Start the cache with initial data loading
* @param buildInitial true to load initial data
* @throws Exception if cache cannot be started
*/
public void start(boolean buildInitial) throws Exception;
/**
* Get current cached data
* @return current node data or null if not cached
*/
public ChildData getCurrentData();
/**
* Add a listener for node changes
* @param listener the listener to add
*/
public void getListenable().addListener(NodeCacheListener listener);
/**
* Close the cache
*/
public void close() throws IOException;
}
/**
* Listener interface for node cache events
*/
public interface NodeCacheListener {
/**
* Called when node data changes
*/
void nodeChanged() throws Exception;
}Combined Caching Example:
// Cache a specific configuration node
NodeCache configCache = new NodeCache(client, "/app/config", false);
configCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData data = configCache.getCurrentData();
if (data != null) {
String config = new String(data.getData());
System.out.println("Configuration updated: " + config);
reloadConfiguration(config);
}
}
});
configCache.start(true);
// Cache all service instances
PathChildrenCache serviceCache = new PathChildrenCache(client, "/app/services", true);
serviceCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
String servicePath = event.getData().getPath();
String serviceData = new String(event.getData().getData());
System.out.println("New service registered: " + servicePath + " -> " + serviceData);
registerService(servicePath, serviceData);
}
}
});
serviceCache.start(StartMode.BUILD_INITIAL_CACHE);
// Cache entire application tree
TreeCache appCache = new TreeCache(client, "/app");
appCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
switch (event.getType()) {
case NODE_ADDED:
System.out.println("Node added in app tree: " + event.getData().getPath());
break;
case NODE_UPDATED:
System.out.println("Node updated in app tree: " + event.getData().getPath());
break;
case NODE_REMOVED:
System.out.println("Node removed from app tree: " + event.getData().getPath());
break;
}
}
});
appCache.start();Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-shaded-curator