CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-shaded-curator

A shaded JAR library that bundles Apache Curator dependencies for ZooKeeper coordination capabilities within Apache Flink's distributed architecture.

Pending
Overview
Eval results
Files

docs/shared-values.md

Shared Values

Shared value capabilities for coordinating counters and arbitrary data across multiple processes. Provides thread-safe shared state management with versioning and change notifications.

Capabilities

SharedCount

Thread-safe shared integer counter that multiple processes can read and modify atomically.

/**
 * Manages a shared integer that can be safely updated by multiple processes.
 * <p>
 * This is an API summary: the members below are signatures only, without bodies.
 */
public class SharedCount implements SharedCountReader, Closeable {
    /**
     * Create a new SharedCount
     * @param client the curator client
     * @param path the path to use for the shared count
     * @param seedValue initial value for the count
     */
    public SharedCount(CuratorFramework client, String path, int seedValue);
    
    /**
     * Start the shared count management.
     * The usage example calls this before reading or updating the count.
     * @throws Exception if startup fails
     */
    public void start() throws Exception;
    
    /**
     * Close the shared count
     * @throws IOException if closing fails
     */
    @Override
    public void close() throws IOException;
    
    /**
     * Get the current count value
     * @return current count value
     */
    @Override
    public int getCount();
    
    /**
     * Get the current count together with its version
     * @return the count wrapped in a {@link VersionedValue}; pass it to
     *         {@link #trySetCount(VersionedValue, int)} for a version-checked update
     */
    @Override
    public VersionedValue<Integer> getVersionedValue();
    
    /**
     * Add a listener for count changes
     * @param listener the listener to add
     */
    @Override
    public void addListener(SharedCountListener listener);
    
    /**
     * Add listener with specific executor
     * @param listener the listener to add
     * @param executor executor for listener callbacks
     */
    @Override
    public void addListener(SharedCountListener listener, Executor executor);
    
    /**
     * Remove a listener
     * @param listener the listener to remove
     */
    @Override
    public void removeListener(SharedCountListener listener);
    
    /**
     * Set the count to a new value without any version check
     * @param newCount the new count value
     * @return true if the set succeeded
     * @throws Exception if operation fails
     */
    // NOTE(review): upstream Apache Curator appears to declare setCount as returning void;
    // confirm the return type against the bundled Curator version.
    public boolean setCount(int newCount) throws Exception;
    
    /**
     * Try to set the count with version checking
     * @param previous the versioned value previously read via {@link #getVersionedValue()}
     * @param newCount the new count value
     * @return true if the set succeeded; the usage example treats false as
     *         "modified by another process, retry needed"
     * @throws Exception if operation fails
     */
    public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception;
}

Usage Example:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.framework.state.ConnectionState;

CuratorFramework client = // ... initialize client
// Shared counter stored under the given path, seeded with 0
SharedCount sharedCounter = new SharedCount(client, "/app/counters/global", 0);

// Register a listener for count changes and connection state changes
sharedCounter.addListener(new SharedCountListener() {
    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter changed to: " + newCount);
    }
    
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        System.out.println("Connection state changed: " + newState);
    }
});

try {
    sharedCounter.start();
    
    // Read current value
    int currentValue = sharedCounter.getCount();
    System.out.println("Current counter value: " + currentValue);
    
    // Version-checked increment: a single attempt based on the value/version read here
    VersionedValue<Integer> versionedValue = sharedCounter.getVersionedValue();
    boolean success = sharedCounter.trySetCount(versionedValue, versionedValue.getValue() + 1);
    
    if (success) {
        System.out.println("Successfully incremented counter");
    } else {
        // The attempt is not retried here; callers that need the increment must loop
        System.out.println("Counter was modified by another process, retry needed");
    }
    
    // Force set to specific value (no version check)
    sharedCounter.setCount(100);
    
} finally {
    // Always release the shared count, even if an operation above threw
    sharedCounter.close();
}

SharedValue

Thread-safe shared arbitrary value that multiple processes can read and modify atomically.

/**
 * Manages a shared arbitrary value (a byte array) that can be safely updated by multiple processes.
 * <p>
 * This is an API summary: the members below are signatures only, without bodies.
 */
public class SharedValue implements SharedValueReader, Closeable {
    /**
     * Create a new SharedValue
     * @param client the curator client
     * @param path the path to use for the shared value
     * @param seedValue initial value (byte array)
     */
    public SharedValue(CuratorFramework client, String path, byte[] seedValue);
    
    /**
     * Start the shared value management.
     * The usage example calls this before reading or updating the value.
     * @throws Exception if startup fails
     */
    public void start() throws Exception;
    
    /**
     * Close the shared value
     * @throws IOException if closing fails
     */
    @Override
    public void close() throws IOException;
    
    /**
     * Get the current value
     * @return current value as byte array
     */
    @Override
    public byte[] getValue();
    
    /**
     * Get the current value together with its version
     * @return the value wrapped in a {@link VersionedValue}; pass it to
     *         {@link #trySetValue(VersionedValue, byte[])} for a version-checked update
     */
    @Override
    public VersionedValue<byte[]> getVersionedValue();
    
    /**
     * Add a listener for value changes
     * @param listener the listener to add
     */
    @Override
    public void addListener(SharedValueListener listener);
    
    /**
     * Add listener with specific executor
     * @param listener the listener to add
     * @param executor executor for listener callbacks
     */
    @Override
    public void addListener(SharedValueListener listener, Executor executor);
    
    /**
     * Remove a listener
     * @param listener the listener to remove
     */
    @Override
    public void removeListener(SharedValueListener listener);
    
    /**
     * Set the value without any version check
     * @param newValue the new value
     * @return true if the set succeeded
     * @throws Exception if operation fails
     */
    // NOTE(review): upstream Apache Curator appears to declare setValue as returning void;
    // confirm the return type against the bundled Curator version.
    public boolean setValue(byte[] newValue) throws Exception;
    
    /**
     * Try to set the value with version checking
     * @param previous the versioned value previously read via {@link #getVersionedValue()}
     * @param newValue the new value
     * @return true if the set succeeded; the usage example treats false as
     *         "modified by another process"
     * @throws Exception if operation fails
     */
    public boolean trySetValue(VersionedValue<byte[]> previous, byte[] newValue) throws Exception;
}

Usage Example:

import org.apache.curator.framework.recipes.shared.SharedValue;
import org.apache.curator.framework.recipes.shared.SharedValueListener;
import com.fasterxml.jackson.databind.ObjectMapper;

CuratorFramework client = // ... initialize client
ObjectMapper mapper = new ObjectMapper();

// Initialize with JSON configuration
Map<String, Object> initialConfig = new HashMap<>();
initialConfig.put("maxConnections", 100);
initialConfig.put("timeout", 30);
// SharedValue stores raw bytes, so the map is serialized to JSON first
byte[] seedData = mapper.writeValueAsBytes(initialConfig);

SharedValue sharedConfig = new SharedValue(client, "/app/config/database", seedData);

// Add listener for configuration changes
sharedConfig.addListener(new SharedValueListener() {
    @Override
    public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception {
        // Decode the new bytes back into a map (raw Map.class, so the assignment is unchecked)
        Map<String, Object> config = mapper.readValue(newValue, Map.class);
        System.out.println("Configuration updated: " + config);
        // Apply new configuration (applyConfiguration is not defined in this snippet)
        applyConfiguration(config);
    }
    
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        System.out.println("Connection state: " + newState);
    }
});

try {
    sharedConfig.start();
    
    // Read current configuration
    byte[] currentValue = sharedConfig.getValue();
    Map<String, Object> currentConfig = mapper.readValue(currentValue, Map.class);
    System.out.println("Current config: " + currentConfig);
    
    // Version-checked update: a single attempt based on the version read here.
    // Note that the new map is built from currentConfig (read above), not from versionedValue.
    VersionedValue<byte[]> versionedValue = sharedConfig.getVersionedValue();
    Map<String, Object> newConfig = new HashMap<>(currentConfig);
    newConfig.put("maxConnections", 200);
    
    byte[] newData = mapper.writeValueAsBytes(newConfig);
    boolean success = sharedConfig.trySetValue(versionedValue, newData);
    
    if (success) {
        System.out.println("Configuration updated successfully");
    } else {
        // The attempt is not retried here
        System.out.println("Configuration was modified by another process");
    }
    
} finally {
    // Always release the shared value, even if an operation above threw
    sharedConfig.close();
}

VersionedValue

Container for values with version information for optimistic locking.

/**
 * POJO for holding a value along with its ZooKeeper version.
 * <p>
 * In the examples, instances are obtained from {@code getVersionedValue()} and handed back to
 * {@code trySetCount}/{@code trySetValue}; they are never constructed directly.
 */
public class VersionedValue<T> {
    /**
     * Create a new VersionedValue
     * @param value the value
     * @param version the version number
     */
    // NOTE(review): confirm constructor visibility and argument order against the bundled
    // Curator version before relying on this signature.
    public VersionedValue(T value, int version);
    
    /**
     * Get the value
     * @return the value
     */
    public T getValue();
    
    /**
     * Get the version
     * @return version number associated with the value
     */
    public int getVersion();
}

SharedCountReader

Read-only interface for SharedCount with listener support.

/**
 * Interface for reading shared count values and listening to changes.
 * It declares no methods that modify the count.
 */
public interface SharedCountReader {
    /**
     * Get the current count
     * @return current count value
     */
    int getCount();
    
    /**
     * Get the versioned count value
     * @return the count together with its version
     */
    VersionedValue<Integer> getVersionedValue();
    
    /**
     * Add a listener for count changes
     * @param listener the listener to add
     */
    void addListener(SharedCountListener listener);
    
    /**
     * Add listener with executor
     * @param listener the listener to add
     * @param executor executor for callbacks
     */
    void addListener(SharedCountListener listener, Executor executor);
    
    /**
     * Remove a listener
     * @param listener the listener to remove
     */
    void removeListener(SharedCountListener listener);
}

SharedValueReader

Read-only interface for SharedValue with listener support.

/**
 * Interface for reading shared values and listening to changes.
 * It declares no methods that modify the value.
 */
public interface SharedValueReader {
    /**
     * Get the current value
     * @return current value as byte array
     */
    byte[] getValue();
    
    /**
     * Get the versioned value
     * @return the value together with its version
     */
    VersionedValue<byte[]> getVersionedValue();
    
    /**
     * Add a listener for value changes
     * @param listener the listener to add
     */
    void addListener(SharedValueListener listener);
    
    /**
     * Add listener with executor
     * @param listener the listener to add
     * @param executor executor for callbacks
     */
    void addListener(SharedValueListener listener, Executor executor);
    
    /**
     * Remove a listener
     * @param listener the listener to remove
     */
    void removeListener(SharedValueListener listener);
}

SharedCountListener

Listener interface for SharedCount change notifications.

/**
 * Listener for SharedCount change notifications.
 * Because it extends {@code ConnectionStateListener}, implementations must also provide
 * {@code stateChanged(CuratorFramework, ConnectionState)}, as the usage example does.
 */
public interface SharedCountListener extends ConnectionStateListener {
    /**
     * Called when the shared count value changes
     * @param sharedCount the SharedCountReader that changed
     * @param newCount the new count value
     * @throws Exception if error occurs in listener
     */
    void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception;
}

SharedValueListener

Listener interface for SharedValue change notifications.

/**
 * Listener for SharedValue change notifications.
 * Because it extends {@code ConnectionStateListener}, implementations must also provide
 * {@code stateChanged(CuratorFramework, ConnectionState)}, as the usage example does.
 */
public interface SharedValueListener extends ConnectionStateListener {
    /**
     * Called when the shared value changes
     * @param sharedValue the SharedValueReader that changed
     * @param newValue the new value
     * @throws Exception if error occurs in listener
     */
    void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception;
}

IllegalTrySetVersionException

Exception thrown by the trySet operations when the version number has overflowed and can therefore no longer be used for a version-checked update.

/**
 * Exception thrown when a version number overflows during trySet operations
 */
// NOTE(review): upstream Apache Curator appears to declare this class as extending
// IllegalArgumentException (unchecked) with no explicit constructors; confirm the superclass
// and constructor list against the bundled Curator version.
public class IllegalTrySetVersionException extends Exception {
    /**
     * Create a new IllegalTrySetVersionException
     */
    public IllegalTrySetVersionException();
    
    /**
     * Create with message
     * @param message exception message
     */
    public IllegalTrySetVersionException(String message);
    
    /**
     * Create with cause
     * @param cause underlying cause
     */
    public IllegalTrySetVersionException(Throwable cause);
    
    /**
     * Create with message and cause
     * @param message exception message
     * @param cause underlying cause
     */
    public IllegalTrySetVersionException(String message, Throwable cause);
}

Common Patterns

Configuration Management

/**
 * Keeps a JSON-encoded configuration map in a {@link SharedValue} and mirrors the most
 * recently decoded copy in memory.
 * <p>
 * Implements {@link AutoCloseable} so the manager can be used in try-with-resources.
 */
public class DistributedConfigManager implements AutoCloseable {
    private final SharedValue configValue;
    private final ObjectMapper mapper;
    // volatile: written by listener callbacks, read by callers of getConfiguration()
    private volatile Map<String, Object> currentConfig;

    /**
     * Creates the manager, registers a change listener and starts the shared value.
     *
     * @param client the curator client
     * @param configPath path of the shared configuration value
     * @param defaultConfig configuration used as the seed value
     * @throws Exception if serialization, startup or the initial read fails; in that case the
     *         shared value is closed before the exception propagates
     */
    public DistributedConfigManager(CuratorFramework client, String configPath,
                                   Map<String, Object> defaultConfig) throws Exception {
        this.mapper = new ObjectMapper();
        byte[] defaultData = mapper.writeValueAsBytes(defaultConfig);

        this.configValue = new SharedValue(client, configPath, defaultData);

        configValue.addListener(new SharedValueListener() {
            @Override
            public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception {
                // Decode once and hand the same snapshot to the field and the hook, so a
                // concurrent update cannot slip in between the write and the notification.
                Map<String, Object> decoded = decode(newValue);
                currentConfig = decoded;
                onConfigurationChanged(decoded);
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                // Handle connection state changes
            }
        });

        try {
            configValue.start();

            // Initialize current config
            this.currentConfig = decode(configValue.getValue());
        } catch (Exception e) {
            // Do not leave a started SharedValue behind when construction fails.
            try {
                configValue.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Returns a copy of the current configuration.
     *
     * @return a new mutable map; changes to it do not affect the manager
     */
    public Map<String, Object> getConfiguration() {
        return new HashMap<>(currentConfig);
    }

    /**
     * Merges {@code updates} into the stored configuration with a single version-checked write.
     *
     * @param updates entries to add or overwrite
     * @return true if the write succeeded; false if the version check failed, in which case
     *         the caller may retry
     * @throws Exception if (de)serialization or the write fails
     */
    public boolean updateConfiguration(Map<String, Object> updates) throws Exception {
        VersionedValue<byte[]> current = configValue.getVersionedValue();

        Map<String, Object> newConfig = decode(current.getValue());
        newConfig.putAll(updates);

        byte[] newData = mapper.writeValueAsBytes(newConfig);
        return configValue.trySetValue(current, newData);
    }

    /**
     * Hook invoked from the listener whenever a changed configuration has been decoded.
     *
     * @param newConfig the newly decoded configuration
     */
    protected void onConfigurationChanged(Map<String, Object> newConfig) {
        System.out.println("Configuration changed: " + newConfig);
        // Implement configuration application logic
    }

    /**
     * Closes the underlying shared value.
     *
     * @throws IOException if closing fails
     */
    @Override
    public void close() throws IOException {
        configValue.close();
    }

    /**
     * Decodes JSON bytes into a fresh mutable map; a JSON {@code null} becomes an empty map.
     */
    @SuppressWarnings("unchecked") // the target is the raw Map.class; entries are used as String -> Object
    private Map<String, Object> decode(byte[] data) throws IOException {
        Map<String, Object> decoded = mapper.readValue(data, Map.class);
        return decoded == null ? new HashMap<>() : new HashMap<>(decoded);
    }
}

Distributed Statistics

/**
 * Collects named statistics in {@link SharedCount} instances created lazily under a common
 * base path.
 */
public class DistributedStatsCollector implements Closeable {
    /** Number of version-checked write attempts before an update is reported as failed. */
    private static final int MAX_ATTEMPTS = 5;
    /** Backoff grows linearly: attempt number times this many milliseconds. */
    private static final long BACKOFF_STEP_MILLIS = 10L;

    private final Map<String, SharedCount> counters;
    private final CuratorFramework client;
    private final String basePath;

    /**
     * @param client the curator client used for every counter
     * @param basePath parent path; each counter lives at {@code basePath + "/" + name}
     */
    public DistributedStatsCollector(CuratorFramework client, String basePath) {
        this.client = client;
        this.basePath = basePath;
        this.counters = new ConcurrentHashMap<>();
    }

    /**
     * Adds one to the named counter.
     *
     * @param counterName name of the counter
     * @throws IllegalStateException if every attempt lost the version check
     * @throws Exception if the underlying operation fails
     */
    public void incrementCounter(String counterName) throws Exception {
        addToCounter(counterName, 1);
    }

    /**
     * Adds {@code delta} to the named counter using version-checked writes with retries.
     *
     * @param counterName name of the counter
     * @param delta amount to add (may be negative)
     * @throws IllegalStateException if every attempt lost the version check, so the update
     *         is never dropped silently
     * @throws Exception if the underlying operation fails
     */
    public void addToCounter(String counterName, int delta) throws Exception {
        SharedCount counter = getOrCreateCounter(counterName);

        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            VersionedValue<Integer> current = counter.getVersionedValue();
            if (counter.trySetCount(current, current.getValue() + delta)) {
                return;
            }

            // Brief backoff before retry; pointless after the final attempt
            if (attempt < MAX_ATTEMPTS) {
                Thread.sleep(BACKOFF_STEP_MILLIS * attempt);
            }
        }

        throw new IllegalStateException(
            "Failed to update counter '" + counterName + "' after " + MAX_ATTEMPTS + " attempts");
    }

    /**
     * @param counterName name of the counter (created with value 0 if unknown)
     * @return the counter's current value
     * @throws Exception if the counter cannot be created
     */
    public int getCounterValue(String counterName) throws Exception {
        SharedCount counter = getOrCreateCounter(counterName);
        return counter.getCount();
    }

    /**
     * @return a snapshot of every counter this collector has created, keyed by name
     * @throws Exception declared for interface stability
     */
    public Map<String, Integer> getAllCounters() throws Exception {
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, SharedCount> entry : counters.entrySet()) {
            result.put(entry.getKey(), entry.getValue().getCount());
        }
        return result;
    }

    /**
     * Returns the started counter for the name, creating and starting it on first use.
     */
    private SharedCount getOrCreateCounter(String counterName) throws Exception {
        return counters.computeIfAbsent(counterName, name -> {
            SharedCount counter = null;
            try {
                counter = new SharedCount(client, basePath + "/" + name, 0);
                counter.start();
                return counter;
            } catch (Exception e) {
                // A counter that failed to start is not stored in the map, so close it here.
                if (counter != null) {
                    try {
                        counter.close();
                    } catch (IOException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                }
                throw new RuntimeException("Failed to create counter: " + name, e);
            }
        });
    }

    /**
     * Closes every counter. A failure on one counter does not prevent closing the others;
     * the first failure is rethrown with later ones attached as suppressed exceptions.
     *
     * @throws IOException if at least one counter failed to close
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;
        for (SharedCount counter : counters.values()) {
            try {
                counter.close();
            } catch (IOException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        counters.clear();
        if (failure != null) {
            throw failure;
        }
    }
}

Batch Processing Coordination

/**
 * Coordinates a batch job: a {@link DistributedDoubleBarrier} synchronizes the start and end
 * of the batch, and a {@link SharedCount} tracks progress.
 * <p>
 * Implements {@link AutoCloseable} so the started progress counter is released when the
 * coordinator is no longer needed.
 */
public class BatchCoordinator implements AutoCloseable {
    /** Number of version-checked write attempts made by {@link #reportProgress()}. */
    private static final int MAX_PROGRESS_ATTEMPTS = 3;

    private final DistributedDoubleBarrier barrier;
    private final SharedCount progressCounter;
    private final int expectedParticipants;

    /**
     * Creates the barrier and progress counter under {@code /jobs/<jobId>/} and starts the
     * progress counter.
     *
     * @param client the curator client
     * @param jobId identifier used to build the barrier and progress paths
     * @param expectedParticipants number of participants the barrier waits for
     * @throws Exception if the progress counter cannot be started
     */
    public BatchCoordinator(CuratorFramework client, String jobId, int expectedParticipants)
            throws Exception {
        this.expectedParticipants = expectedParticipants;

        String jobRoot = "/jobs/" + jobId;
        this.barrier = new DistributedDoubleBarrier(client, jobRoot + "/barrier", expectedParticipants);
        this.progressCounter = new SharedCount(client, jobRoot + "/progress", 0);

        progressCounter.start();
    }

    /**
     * Enters the barrier and waits for the other participants.
     *
     * @param timeoutSeconds maximum time to wait, in seconds
     * @return true if the barrier was entered within the timeout, false otherwise
     * @throws Exception if the barrier operation fails
     */
    public boolean startBatch(long timeoutSeconds) throws Exception {
        System.out.println("Waiting for all " + expectedParticipants + " participants...");

        boolean allReady = barrier.enter(timeoutSeconds, TimeUnit.SECONDS);
        if (!allReady) {
            System.out.println("Not all participants ready within timeout");
            return false;
        }

        System.out.println("All participants ready, batch processing started");
        return true;
    }

    /**
     * Increments the shared progress counter by one using version-checked writes.
     *
     * @throws IllegalStateException if every attempt lost the version check, so the progress
     *         update is never dropped silently
     * @throws Exception if the underlying operation fails
     */
    public void reportProgress() throws Exception {
        for (int attempt = 0; attempt < MAX_PROGRESS_ATTEMPTS; attempt++) {
            VersionedValue<Integer> current = progressCounter.getVersionedValue();
            int newProgress = current.getValue() + 1;

            if (progressCounter.trySetCount(current, newProgress)) {
                System.out.println("Progress: " + newProgress + "/" + expectedParticipants);
                return;
            }
        }

        throw new IllegalStateException(
            "Failed to report progress after " + MAX_PROGRESS_ATTEMPTS + " attempts");
    }

    /**
     * Leaves the barrier and waits for the other participants to leave.
     *
     * @param timeoutSeconds maximum time to wait, in seconds
     * @return true if the barrier was left within the timeout, false otherwise
     * @throws Exception if the barrier operation fails
     */
    public boolean finishBatch(long timeoutSeconds) throws Exception {
        System.out.println("Waiting for all participants to complete...");

        boolean allCompleted = barrier.leave(timeoutSeconds, TimeUnit.SECONDS);
        if (!allCompleted) {
            System.out.println("Not all participants completed within timeout");
            return false;
        }

        System.out.println("All participants completed successfully");
        return true;
    }

    /**
     * @return the progress value currently held by the shared counter
     */
    public int getCurrentProgress() {
        return progressCounter.getCount();
    }

    /**
     * Closes the progress counter that the constructor started.
     *
     * @throws java.io.IOException if closing fails
     */
    @Override
    public void close() throws java.io.IOException {
        progressCounter.close();
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-shaded-curator

docs

caching.md

index.md

leader-election.md

locking.md

shared-values.md

tile.json