A shaded JAR library that bundles Apache Curator dependencies for ZooKeeper coordination capabilities within Apache Flink's distributed architecture.
—
Shared value capabilities for coordinating counters and arbitrary data across multiple processes. Provides thread-safe shared state management with versioning and change notifications.
Thread-safe shared integer counter that multiple processes can read and modify atomically.
/**
* Manages a shared integer that can be safely updated by multiple processes
*/
public class SharedCount implements SharedCountReader, Closeable {
/**
* Create a new SharedCount
* @param client the curator client
* @param path the path to use for the shared count
* @param seedValue initial value for the count
*/
public SharedCount(CuratorFramework client, String path, int seedValue);
/**
* Start the shared count management
* @throws Exception if startup fails
*/
public void start() throws Exception;
/**
* Close the shared count
*/
@Override
public void close() throws IOException;
/**
* Get the current count value
* @return current count value
*/
@Override
public int getCount();
/**
* Get the current count together with its version
* @return the versioned value, for use with trySetCount (optimistic locking)
*/
@Override
public VersionedValue<Integer> getVersionedValue();
/**
* Add a listener for count changes
* @param listener the listener to add
*/
@Override
public void addListener(SharedCountListener listener);
/**
* Add listener with specific executor
* @param listener the listener to add
* @param executor executor for listener callbacks
*/
@Override
public void addListener(SharedCountListener listener, Executor executor);
/**
* Remove a listener
* @param listener the listener to remove
*/
@Override
public void removeListener(SharedCountListener listener);
/**
* Set the count to a new value
* @param newCount the new count value
* @return true if the set succeeded
* @throws Exception if operation fails
*/
public boolean setCount(int newCount) throws Exception;
/**
* Try to set the count with version checking
* @param previous the versioned value previously read via getVersionedValue()
* @param newCount the new count value
* @return true if the set succeeded
* @throws Exception if operation fails
*/
public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception;
}
Usage Example:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.framework.state.ConnectionState;
CuratorFramework client = // ... initialize client
SharedCount sharedCounter = new SharedCount(client, "/app/counters/global", 0);
// Add listener for count changes
sharedCounter.addListener(new SharedCountListener() {
@Override
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
System.out.println("Counter changed to: " + newCount);
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("Connection state changed: " + newState);
}
});
try {
sharedCounter.start();
// Read current value
int currentValue = sharedCounter.getCount();
System.out.println("Current counter value: " + currentValue);
// Increment counter atomically
VersionedValue<Integer> versionedValue = sharedCounter.getVersionedValue();
boolean success = sharedCounter.trySetCount(versionedValue, versionedValue.getValue() + 1);
if (success) {
System.out.println("Successfully incremented counter");
} else {
System.out.println("Counter was modified by another process, retry needed");
}
// Force set to specific value
sharedCounter.setCount(100);
} finally {
sharedCounter.close();
}
Thread-safe shared arbitrary value that multiple processes can read and modify atomically.
/**
* Manages a shared arbitrary value that can be safely updated by multiple processes
*/
public class SharedValue implements SharedValueReader, Closeable {
/**
* Create a new SharedValue
* @param client the curator client
* @param path the path to use for the shared value
* @param seedValue initial value (byte array)
*/
public SharedValue(CuratorFramework client, String path, byte[] seedValue);
/**
* Start the shared value management
* @throws Exception if startup fails
*/
public void start() throws Exception;
/**
* Close the shared value
*/
@Override
public void close() throws IOException;
/**
* Get the current value
* @return current value as byte array
*/
@Override
public byte[] getValue();
/**
* Get the current versioned value
* @return versioned value for optimistic locking
*/
@Override
public VersionedValue<byte[]> getVersionedValue();
/**
* Add a listener for value changes
* @param listener the listener to add
*/
@Override
public void addListener(SharedValueListener listener);
/**
* Add listener with specific executor
* @param listener the listener to add
* @param executor executor for listener callbacks
*/
@Override
public void addListener(SharedValueListener listener, Executor executor);
/**
* Remove a listener
* @param listener the listener to remove
*/
@Override
public void removeListener(SharedValueListener listener);
/**
* Set the value
* @param newValue the new value
* @return true if the set succeeded
* @throws Exception if operation fails
*/
public boolean setValue(byte[] newValue) throws Exception;
/**
* Try to set the value with version checking
* @param previous the previous versioned value
* @param newValue the new value
* @return true if the set succeeded
* @throws Exception if operation fails
*/
public boolean trySetValue(VersionedValue<byte[]> previous, byte[] newValue) throws Exception;
}
Usage Example:
import org.apache.curator.framework.recipes.shared.SharedValue;
import org.apache.curator.framework.recipes.shared.SharedValueListener;
import com.fasterxml.jackson.databind.ObjectMapper;
CuratorFramework client = // ... initialize client
ObjectMapper mapper = new ObjectMapper();
// Initialize with JSON configuration
Map<String, Object> initialConfig = new HashMap<>();
initialConfig.put("maxConnections", 100);
initialConfig.put("timeout", 30);
byte[] seedData = mapper.writeValueAsBytes(initialConfig);
SharedValue sharedConfig = new SharedValue(client, "/app/config/database", seedData);
// Add listener for configuration changes
sharedConfig.addListener(new SharedValueListener() {
@Override
public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception {
Map<String, Object> config = mapper.readValue(newValue, Map.class);
System.out.println("Configuration updated: " + config);
// Apply new configuration
applyConfiguration(config);
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("Connection state: " + newState);
}
});
try {
sharedConfig.start();
// Read current configuration
byte[] currentValue = sharedConfig.getValue();
Map<String, Object> currentConfig = mapper.readValue(currentValue, Map.class);
System.out.println("Current config: " + currentConfig);
// Update configuration atomically
VersionedValue<byte[]> versionedValue = sharedConfig.getVersionedValue();
Map<String, Object> newConfig = new HashMap<>(currentConfig);
newConfig.put("maxConnections", 200);
byte[] newData = mapper.writeValueAsBytes(newConfig);
boolean success = sharedConfig.trySetValue(versionedValue, newData);
if (success) {
System.out.println("Configuration updated successfully");
} else {
System.out.println("Configuration was modified by another process");
}
} finally {
sharedConfig.close();
}
Container for values with version information for optimistic locking.
/**
* POJO for holding a value along with its ZooKeeper version
*/
public class VersionedValue<T> {
/**
* Create a new VersionedValue
* @param value the value
* @param version the version number
*/
public VersionedValue(T value, int version);
/**
* Get the value
* @return the value
*/
public T getValue();
/**
* Get the version
* @return version number
*/
public int getVersion();
}
Read-only interface for SharedCount with listener support.
/**
* Interface for reading shared count values and listening to changes
*/
public interface SharedCountReader {
/**
* Get the current count
* @return current count value
*/
int getCount();
/**
* Get the versioned count value
* @return versioned value
*/
VersionedValue<Integer> getVersionedValue();
/**
* Add a listener for count changes
* @param listener the listener to add
*/
void addListener(SharedCountListener listener);
/**
* Add listener with executor
* @param listener the listener to add
* @param executor executor for callbacks
*/
void addListener(SharedCountListener listener, Executor executor);
/**
* Remove a listener
* @param listener the listener to remove
*/
void removeListener(SharedCountListener listener);
}
Read-only interface for SharedValue with listener support.
/**
* Interface for reading shared values and listening to changes
*/
public interface SharedValueReader {
/**
* Get the current value
* @return current value as byte array
*/
byte[] getValue();
/**
* Get the versioned value
* @return versioned value
*/
VersionedValue<byte[]> getVersionedValue();
/**
* Add a listener for value changes
* @param listener the listener to add
*/
void addListener(SharedValueListener listener);
/**
* Add listener with executor
* @param listener the listener to add
* @param executor executor for callbacks
*/
void addListener(SharedValueListener listener, Executor executor);
/**
* Remove a listener
* @param listener the listener to remove
*/
void removeListener(SharedValueListener listener);
}
Listener interface for SharedCount change notifications.
/**
* Listener for SharedCount change notifications
*/
public interface SharedCountListener extends ConnectionStateListener {
/**
* Called when the shared count value changes
* @param sharedCount the SharedCountReader that changed
* @param newCount the new count value
* @throws Exception if error occurs in listener
*/
void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception;
}
Listener interface for SharedValue change notifications.
/**
* Listener for SharedValue change notifications
*/
public interface SharedValueListener extends ConnectionStateListener {
/**
* Called when the shared value changes
* @param sharedValue the SharedValueReader that changed
* @param newValue the new value
* @throws Exception if error occurs in listener
*/
void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception;
}
Exception thrown when version overflow occurs in versioned operations.
/**
* Exception thrown when a version number overflows during trySet operations
*/
public class IllegalTrySetVersionException extends Exception {
/**
* Create a new IllegalTrySetVersionException
*/
public IllegalTrySetVersionException();
/**
* Create with message
* @param message exception message
*/
public IllegalTrySetVersionException(String message);
/**
* Create with cause
* @param cause underlying cause
*/
public IllegalTrySetVersionException(Throwable cause);
/**
* Create with message and cause
* @param message exception message
* @param cause underlying cause
*/
public IllegalTrySetVersionException(String message, Throwable cause);
}public class DistributedConfigManager {
private final SharedValue configValue;
private final ObjectMapper mapper;
private volatile Map<String, Object> currentConfig;
/**
 * Creates the manager and loads the initial configuration.
 *
 * <p>Serializes {@code defaultConfig} to JSON and uses it as the seed of the shared value
 * at {@code configPath}, registers a change listener, starts the shared value and finally
 * reads its current content into {@code currentConfig}.
 *
 * @param client the curator client
 * @param configPath the path holding the shared configuration
 * @param defaultConfig the configuration used as the seed value
 * @throws Exception if serialization, startup or the initial read fails; in the latter two
 *         cases the shared value is closed before the exception is rethrown
 */
public DistributedConfigManager(CuratorFramework client, String configPath,
        Map<String, Object> defaultConfig) throws Exception {
    this.mapper = new ObjectMapper();
    byte[] defaultData = mapper.writeValueAsBytes(defaultConfig);
    this.configValue = new SharedValue(client, configPath, defaultData);
    configValue.addListener(new SharedValueListener() {
        @Override
        public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception {
            // Hand the callback exactly the map that was just parsed instead of
            // re-reading the volatile field, which could already hold a newer value.
            Map<String, Object> updated = mapper.readValue(newValue, Map.class);
            currentConfig = updated;
            onConfigurationChanged(updated);
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            // Handle connection state changes
        }
    });
    try {
        configValue.start();
        // Initialize current config
        this.currentConfig = mapper.readValue(configValue.getValue(), Map.class);
    } catch (Exception e) {
        // The caller never receives this instance, so nobody else could close the value.
        try {
            configValue.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Returns a copy of the most recently loaded configuration.
 *
 * @return a new mutable map with the entries of the current configuration
 */
public Map<String, Object> getConfiguration() {
    Map<String, Object> snapshot = new HashMap<>();
    snapshot.putAll(currentConfig);
    return snapshot;
}
/**
 * Merges {@code updates} into the shared configuration using a versioned write.
 *
 * @param updates entries to add or overwrite
 * @return the result of {@code trySetValue} for the merged configuration
 * @throws Exception if parsing, serialization or the write fails
 */
public boolean updateConfiguration(Map<String, Object> updates) throws Exception {
    VersionedValue<byte[]> snapshot = configValue.getVersionedValue();
    Map<String, Object> decoded = mapper.readValue(snapshot.getValue(), Map.class);

    // Work on a copy so the decoded map is left untouched.
    Map<String, Object> merged = new HashMap<>(decoded);
    merged.putAll(updates);

    return configValue.trySetValue(snapshot, mapper.writeValueAsBytes(merged));
}
/**
 * Hook invoked by the change listener with the newly parsed configuration.
 * This implementation only prints it.
 *
 * @param newConfig the configuration that was just received
 */
protected void onConfigurationChanged(Map<String, Object> newConfig) {
    // Implement configuration application logic
    String message = "Configuration changed: " + newConfig;
    System.out.println(message);
}
/**
 * Closes the underlying shared value.
 *
 * @throws IOException if closing the shared value fails
 */
public void close() throws IOException {
    this.configValue.close();
}
}public class DistributedStatsCollector implements Closeable {
private final Map<String, SharedCount> counters;
private final CuratorFramework client;
private final String basePath;
/**
 * Creates a collector with an initially empty set of counters.
 *
 * @param client the curator client used to create counters
 * @param basePath parent path under which each counter gets its own child path
 */
public DistributedStatsCollector(CuratorFramework client, String basePath) {
    this.counters = new ConcurrentHashMap<>();
    this.basePath = basePath;
    this.client = client;
}
/**
 * Increments the named counter by one.
 *
 * <p>Delegates to {@link #addToCounter(String, int)} so that the optimistic retry
 * loop exists in a single place.
 *
 * @param counterName name of the counter, created on first use
 * @throws Exception if the counter cannot be created or the update fails
 */
public void incrementCounter(String counterName) throws Exception {
    addToCounter(counterName, 1);
}
/**
 * Adds {@code delta} to the named counter using versioned (optimistic) writes.
 *
 * <p>Makes up to five attempts, sleeping {@code 10 ms * attempt} between failed
 * attempts. If every attempt fails, an exception is thrown instead of silently
 * dropping the update.
 *
 * @param counterName name of the counter, created on first use
 * @param delta amount to add (may be negative)
 * @throws IllegalStateException if the update did not succeed within the attempt limit
 * @throws Exception if the counter cannot be created or a write fails
 */
public void addToCounter(String counterName, int delta) throws Exception {
    SharedCount counter = getOrCreateCounter(counterName);
    final int maxAttempts = 5;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        VersionedValue<Integer> current = counter.getVersionedValue();
        if (counter.trySetCount(current, current.getValue() + delta)) {
            return;
        }
        // Back off only when another attempt follows; sleeping after the last one is wasted time.
        if (attempt < maxAttempts) {
            Thread.sleep(10L * attempt);
        }
    }
    throw new IllegalStateException(
            "Failed to update counter '" + counterName + "' after " + maxAttempts + " attempts");
}
/**
 * Returns the current value of the named counter.
 * The counter is created (and started) if it is not yet tracked.
 *
 * @param counterName name of the counter
 * @return the counter's current count
 * @throws Exception if the counter cannot be created
 */
public int getCounterValue(String counterName) throws Exception {
    return getOrCreateCounter(counterName).getCount();
}
/**
 * Collects the current value of every counter tracked by this collector.
 *
 * @return a new map from counter name to its current count
 * @throws Exception declared for interface stability
 */
public Map<String, Integer> getAllCounters() throws Exception {
    Map<String, Integer> snapshot = new HashMap<>();
    counters.forEach((name, counter) -> snapshot.put(name, counter.getCount()));
    return snapshot;
}
/**
 * Returns the counter registered under {@code counterName}, creating and starting
 * it at {@code basePath + "/" + counterName} with seed 0 when absent.
 *
 * <p>If creation or startup fails, the half-initialized counter is closed before the
 * failure is rethrown, and no mapping is recorded.
 *
 * @param counterName name of the counter
 * @return the started counter
 * @throws RuntimeException wrapping the cause if the counter cannot be created or started
 */
private SharedCount getOrCreateCounter(String counterName) throws Exception {
    return counters.computeIfAbsent(counterName, name -> {
        SharedCount counter = null;
        try {
            counter = new SharedCount(client, basePath + "/" + name, 0);
            counter.start();
            return counter;
        } catch (Exception e) {
            // start() may have partially initialized the counter; release it since
            // it is never stored in the map and nobody else could close it.
            if (counter != null) {
                try {
                    counter.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw new RuntimeException("Failed to create counter: " + name, e);
        }
    });
}
/**
 * Closes every tracked counter and clears the registry.
 *
 * <p>All counters are attempted even if some fail to close. The first failure is
 * rethrown after the loop, with later failures attached as suppressed exceptions.
 *
 * @throws IOException if at least one counter failed to close
 */
@Override
public void close() throws IOException {
    IOException firstFailure = null;
    for (SharedCount counter : counters.values()) {
        try {
            counter.close();
        } catch (IOException e) {
            if (firstFailure == null) {
                firstFailure = e;
            } else {
                firstFailure.addSuppressed(e);
            }
        }
    }
    counters.clear();
    if (firstFailure != null) {
        throw firstFailure;
    }
}
}public class BatchCoordinator {
private final DistributedDoubleBarrier barrier;
private final SharedCount progressCounter;
private final int expectedParticipants;
/**
 * Creates the coordinator for one job: a double barrier at
 * {@code /jobs/<jobId>/barrier} and a progress counter (seed 0) at
 * {@code /jobs/<jobId>/progress}, which is started immediately.
 *
 * @param client the curator client
 * @param jobId identifier used to build the job's paths
 * @param expectedParticipants number of participants the barrier waits for
 * @throws Exception if the progress counter fails to start; the counter is closed
 *         before the exception is rethrown
 */
public BatchCoordinator(CuratorFramework client, String jobId, int expectedParticipants)
        throws Exception {
    this.expectedParticipants = expectedParticipants;
    this.barrier = new DistributedDoubleBarrier(
            client,
            "/jobs/" + jobId + "/barrier",
            expectedParticipants
    );
    this.progressCounter = new SharedCount(
            client,
            "/jobs/" + jobId + "/progress",
            0
    );
    try {
        progressCounter.start();
    } catch (Exception e) {
        // The instance never escapes the constructor, so release the counter here.
        try {
            progressCounter.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Enters the barrier and waits for the other participants.
 *
 * @param timeoutSeconds maximum time to wait, in seconds
 * @return {@code true} if the barrier was entered within the timeout, {@code false} otherwise
 * @throws Exception if entering the barrier fails
 */
public boolean startBatch(long timeoutSeconds) throws Exception {
    System.out.println("Waiting for all " + expectedParticipants + " participants...");
    if (barrier.enter(timeoutSeconds, TimeUnit.SECONDS)) {
        System.out.println("All participants ready, batch processing started");
        return true;
    }
    System.out.println("Not all participants ready within timeout");
    return false;
}
/**
 * Increments the shared progress counter by one using versioned (optimistic) writes
 * and prints the new progress.
 *
 * <p>Makes up to three attempts. If none succeeds, an exception is thrown so that a
 * lost progress report does not go unnoticed.
 *
 * @throws IllegalStateException if the increment did not succeed within the attempt limit
 * @throws Exception if a write fails
 */
public void reportProgress() throws Exception {
    final int maxAttempts = 3;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
        VersionedValue<Integer> current = progressCounter.getVersionedValue();
        int newProgress = current.getValue() + 1;
        if (progressCounter.trySetCount(current, newProgress)) {
            System.out.println("Progress: " + newProgress + "/" + expectedParticipants);
            return;
        }
    }
    throw new IllegalStateException(
            "Failed to report progress after " + maxAttempts + " attempts");
}
/**
 * Leaves the barrier and waits for the other participants to leave as well.
 *
 * @param timeoutSeconds maximum time to wait, in seconds
 * @return {@code true} if the barrier was left within the timeout, {@code false} otherwise
 * @throws Exception if leaving the barrier fails
 */
public boolean finishBatch(long timeoutSeconds) throws Exception {
    System.out.println("Waiting for all participants to complete...");
    if (barrier.leave(timeoutSeconds, TimeUnit.SECONDS)) {
        System.out.println("All participants completed successfully");
        return true;
    }
    System.out.println("Not all participants completed within timeout");
    return false;
}
/**
 * Returns the current value of the shared progress counter.
 *
 * @return the progress count as currently known to this instance
 */
public int getCurrentProgress() {
    int progress = progressCounter.getCount();
    return progress;
}
}
Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-shaded-curator