A shaded JAR library that bundles Apache Curator dependencies for ZooKeeper coordination capabilities within Apache Flink's distributed architecture.
—
Leader election capabilities for coordinating leadership across multiple processes in distributed systems. Provides two different approaches: LeaderLatch for simple leader selection and LeaderSelector for more complex leadership management with listener-based control.
Simple leader election mechanism where participating processes compete to become the leader. Uses a latch-based approach with automatic failover.
/**
* Leader selection abstraction that handles connection state and provides
* reliable leader election across multiple processes.
*
* Typical lifecycle as shown by this API: construct, {@link #start()},
* query or wait for leadership, then {@link #close()}.
*/
public class LeaderLatch implements Closeable {
/**
* Create a new LeaderLatch
* @param client the curator client
* @param latchPath the path to use for leader election
*/
public LeaderLatch(CuratorFramework client, String latchPath);
/**
* Create a new LeaderLatch with participant ID
* @param client the curator client
* @param latchPath the path to use for leader election
* @param id unique identifier for this participant (returned by {@link #getId()})
*/
public LeaderLatch(CuratorFramework client, String latchPath, String id);
/**
* Create a new LeaderLatch with close mode
* @param client the curator client
* @param latchPath the path to use for leader election
* @param id unique identifier for this participant (returned by {@link #getId()})
* @param closeMode how to handle listeners on close
*/
public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode);
/**
* Start the leader selection process
* @throws Exception if the latch cannot be started
*/
public void start() throws Exception;
/**
* Close the latch and relinquish leadership
* @throws IOException if closing the latch fails
*/
public void close() throws IOException;
/**
* Check if this instance is currently the leader
* @return true if this instance is the leader
*/
public boolean hasLeadership();
/**
* Wait until this instance becomes the leader
* @throws InterruptedException if the waiting thread is interrupted
* @throws EOFException presumably when the latch is closed while waiting -
*         TODO confirm against the Curator Javadoc
*/
public void await() throws InterruptedException, EOFException;
/**
* Wait until this instance becomes the leader or timeout
* @param timeout maximum time to wait
* @param unit time unit
* @return true if became leader, false if timed out
* @throws InterruptedException if the waiting thread is interrupted
*/
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Get the current state of this latch
* @return current state (one of the {@link State} values)
*/
public State getState();
/**
* Get the ID of this participant
* @return participant ID
*/
public String getId();
/**
* Get all current participants in the election
* @return collection of all participants
* @throws Exception if the participants cannot be retrieved
*/
public Collection<Participant> getParticipants() throws Exception;
/**
* Get the current leader participant
* @return leader participant, or null if no leader
*         (NOTE(review): Curator's Javadoc describes a placeholder participant
*         rather than null when there is no leader - confirm)
* @throws Exception if the leader cannot be retrieved
*/
public Participant getLeader() throws Exception;
/**
* Add a listener for latch state changes
* @param listener the listener to add
*/
public void addListener(LeaderLatchListener listener);
/**
* Add a listener with executor
* @param listener the listener to add
* @param executor executor for listener callbacks
*/
public void addListener(LeaderLatchListener listener, Executor executor);
/**
* Remove a listener
* @param listener the listener to remove
*/
public void removeListener(LeaderLatchListener listener);
/**
* Enumeration of possible latch lifecycle states. Leadership is not a
* state here; it is reported separately by {@link #hasLeadership()}.
*/
public enum State {
LATENT, // Not started
STARTED, // Started (leader or not; see hasLeadership())
CLOSED // Closed
}
/**
* Enumeration controlling listener handling on close
*/
public enum CloseMode {
SILENT, // Don't notify listeners on close
NOTIFY_LEADER // Notify listeners on close
}
}Usage Example:
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

CuratorFramework client = // ... initialize client
LeaderLatch leaderLatch = new LeaderLatch(client, "/app/leader", "server-1");

// Add listener for leadership changes
leaderLatch.addListener(new LeaderLatchListener() {
    @Override
    public void isLeader() {
        System.out.println("I am now the leader!");
        // Start leader-specific tasks
    }

    @Override
    public void notLeader() {
        System.out.println("I am no longer the leader");
        // Stop leader-specific tasks
    }
});

try {
    leaderLatch.start();

    // Wait to become leader (optional); await returns false on timeout
    if (leaderLatch.await(30, TimeUnit.SECONDS)) {
        System.out.println("Became leader within 30 seconds");
    }

    // Check leadership status
    if (leaderLatch.hasLeadership()) {
        System.out.println("Currently the leader");
    }

    // Keep running...
    Thread.sleep(60000);
} finally {
    // Always close the latch so leadership is relinquished
    leaderLatch.close();
}
Alternative leader election implementation providing more control over leadership lifecycle through listener callbacks.
/**
 * Alternative leader selection implementation with listener-based control.
 * Leadership is held for as long as the listener's
 * {@code takeLeadership(CuratorFramework)} callback is running.
 */
public class LeaderSelector implements Closeable {
    /**
     * Create a new LeaderSelector
     * @param client the curator client
     * @param mutexPath the path to use for leader election
     * @param listener listener to handle leadership changes
     */
    public LeaderSelector(CuratorFramework client, String mutexPath, LeaderSelectorListener listener);

    /**
     * Create a new LeaderSelector with a custom thread factory and executor
     * @param client the curator client
     * @param mutexPath the path to use for leader election
     * @param threadFactory factory for creating threads
     * @param executor executor for running leader logic
     * @param listener listener to handle leadership changes
     */
    public LeaderSelector(CuratorFramework client, String mutexPath,
                          ThreadFactory threadFactory, Executor executor,
                          LeaderSelectorListener listener);

    /**
     * Start the leader selection process
     */
    public void start();

    /**
     * Enable automatic requeueing. By default this instance is not requeued
     * once {@code takeLeadership} returns; after calling this method the
     * selector always requeues itself for the next election.
     */
    public void autoRequeue();

    /**
     * Close the selector and relinquish leadership
     * @throws IOException if closing the selector fails
     */
    public void close() throws IOException;

    /**
     * Check if this instance has leadership
     * @return true if currently the leader
     */
    public boolean hasLeadership();

    /**
     * Requeue this instance for leadership selection
     * @return true if the instance was requeued, false if it was already
     *         queued (in which case nothing happens)
     */
    public boolean requeue();

    /**
     * Get the participant ID for this selector
     * @return participant ID
     */
    public String getId();

    /**
     * Set the participant ID reported by {@link #getParticipants()}.
     * Must be called before {@link #start()} to have any effect.
     * @param id the participant ID to set
     */
    public void setId(String id);

    /**
     * Get all current participants
     * @return collection of participants
     * @throws Exception if the participants cannot be retrieved
     */
    public Collection<Participant> getParticipants() throws Exception;

    /**
     * Interrupt current leadership (if this instance is leader)
     */
    public void interruptLeadership();
}
Usage Example:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;

CuratorFramework client = // ... initialize client
LeaderSelector leaderSelector = new LeaderSelector(client, "/app/leader",
    new LeaderSelectorListenerAdapter() {
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            System.out.println("Taking leadership...");
            try {
                // This method should not return until leadership is relinquished
                while (true) {
                    // Perform leader duties
                    System.out.println("Doing leader work...");
                    Thread.sleep(5000);

                    // Check if we should give up leadership
                    if (shouldGiveUpLeadership()) {
                        break;
                    }
                }
            } finally {
                System.out.println("Giving up leadership");
            }
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            // Run custom handling BEFORE delegating: the adapter signals
            // cancellation on SUSPENDED/LOST by throwing
            // CancelLeadershipException, so nothing after the super call
            // would execute for those states.
            if (newState == ConnectionState.LOST) {
                // Connection lost - leadership will be automatically relinquished
                handleConnectionLoss();
            }
            super.stateChanged(client, newState);
        }
    });

// Configure the selector before starting it: the ID must be set prior to
// start() to take effect, and auto-requeue should be in place before the
// first leadership term can end.
leaderSelector.setId("server-1");
leaderSelector.autoRequeue(); // Automatically requeue for leadership
leaderSelector.start();

try {
    // Keep running...
    Thread.sleep(300000);
} finally {
    leaderSelector.close();
}
Represents a participant in leader election, providing information about each process competing for leadership.
/**
* Information about a participant in leader election: its ID and whether
* it is currently the leader.
*/
public class Participant {
/**
* Get the participant's unique ID
* @return participant ID
*/
public String getId();
/**
* Check if this participant is currently the leader
* @return true if this is the current leader, false otherwise
*/
public boolean isLeader();
}Listener interface for receiving notifications about LeaderLatch state changes.
/**
* Listener for LeaderLatch state change notifications. Register instances
* through LeaderLatch.addListener(...).
*/
public interface LeaderLatchListener {
/**
* Called when this instance becomes the leader
*/
void isLeader();
/**
* Called when this instance loses leadership
*/
void notLeader();
}Listener interface for LeaderSelector leadership management.
/**
* Listener for LeaderSelector leadership events. Because it extends
* ConnectionStateListener, implementations also receive connection state
* changes.
*/
public interface LeaderSelectorListener extends ConnectionStateListener {
/**
* Called when this instance should take leadership.
* This method should not return until leadership should be relinquished.
* @param client the curator client
* @throws Exception if an error occurs during leadership
*/
void takeLeadership(CuratorFramework client) throws Exception;
}Recommended base class for LeaderSelectorListener implementations that handles connection state properly.
/**
* Recommended base class for LeaderSelectorListener that handles
* connection state changes appropriately. Subclasses only need to
* implement takeLeadership.
*/
public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
/**
* Default implementation that handles standard connection state changes.
* NOTE(review): Curator documents this default as throwing
* CancelLeadershipException when the connection is SUSPENDED or LOST -
* confirm for the bundled Curator version.
* @param client the curator client
* @param newState the new connection state
*/
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState);
/**
* Subclasses must implement this to handle leadership
* @param client the curator client
* @throws Exception if an error occurs during leadership
*/
@Override
public abstract void takeLeadership(CuratorFramework client) throws Exception;
}Exception that can be thrown to interrupt leadership during connection changes.
/**
 * Unchecked exception that a listener throws from
 * {@code stateChanged(CuratorFramework, ConnectionState)} to indicate that
 * the current leadership should be interrupted due to connection issues;
 * the LeaderSelector reacts by interrupting leadership.
 *
 * It is a {@link RuntimeException} because {@code stateChanged} declares
 * no checked exceptions. Throwing it from {@code takeLeadership()} is not
 * the supported use.
 */
public class CancelLeadershipException extends RuntimeException {
    /**
     * Create a new CancelLeadershipException
     */
    public CancelLeadershipException();

    /**
     * Create a new CancelLeadershipException with message
     * @param message exception message
     */
    public CancelLeadershipException(String message);

    /**
     * Create a new CancelLeadershipException with cause
     * @param cause the underlying cause
     */
    public CancelLeadershipException(Throwable cause);

    /**
     * Create a new CancelLeadershipException with message and cause
     * @param message exception message
     * @param cause the underlying cause
     */
    public CancelLeadershipException(String message, Throwable cause);
}
LeaderLatch latch = new LeaderLatch(client, "/app/leader", "server-1");
AtomicBoolean shouldStop = new AtomicBoolean(false);

latch.addListener(new LeaderLatchListener() {
    @Override
    public void isLeader() {
        // Clear the stop flag: leadership can be lost and regained, and a
        // stale "true" from an earlier notLeader() would stop the new term
        // immediately.
        shouldStop.set(false);
        // Start leader tasks
        startLeaderTasks();
    }

    @Override
    public void notLeader() {
        // Gracefully stop leader tasks
        shouldStop.set(true);
        stopLeaderTasks();
    }
});

latch.start();

// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        shouldStop.set(true);
        latch.close();
    } catch (IOException e) {
        // Do not swallow silently; replace with the application's logger
        System.err.println("Failed to close leader latch: " + e);
    }
}));

LeaderSelector selector = new LeaderSelector(client, "/app/leader",
    new LeaderSelectorListenerAdapter() {
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Perform leader duties
                    doLeaderWork();
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up leadership
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            // stateChanged runs on the connection-event thread, not on the
            // thread executing takeLeadership(), so interrupting
            // Thread.currentThread() here would hit the wrong thread.
            // Delegate to the adapter instead: on SUSPENDED/LOST it throws
            // CancelLeadershipException, which makes the selector interrupt
            // the leadership thread.
            super.stateChanged(client, newState);
        }
    });
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-shaded-curator