CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-shaded-curator

A shaded JAR library that bundles Apache Curator dependencies for ZooKeeper coordination capabilities within Apache Flink's distributed architecture.

Pending
Overview
Eval results
Files

leader-election.mddocs/

Leader Election

Leader election capabilities for coordinating leadership across multiple processes in distributed systems. Provides two different approaches: LeaderLatch for simple leader selection and LeaderSelector for more complex leadership management with listener-based control.

Capabilities

LeaderLatch

Simple leader election mechanism where participating processes compete to become the leader. Uses a latch-based approach with automatic failover.

/**
 * Leader selection abstraction that handles connection state and provides
 * reliable leader election across multiple processes
 */
public class LeaderLatch implements Closeable {
    /**
     * Create a new LeaderLatch
     * @param client the curator client
     * @param latchPath the path to use for leader election
     */
    public LeaderLatch(CuratorFramework client, String latchPath);
    
    /**
     * Create a new LeaderLatch with participant ID
     * @param client the curator client  
     * @param latchPath the path to use for leader election
     * @param id unique identifier for this participant
     */
    public LeaderLatch(CuratorFramework client, String latchPath, String id);
    
    /**
     * Create a new LeaderLatch with close mode
     * @param client the curator client
     * @param latchPath the path to use for leader election
     * @param id unique identifier for this participant
     * @param closeMode how to handle listeners on close
     */
    public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode);
    
    /**
     * Start the leader selection process
     */
    public void start() throws Exception;
    
    /**
     * Close the latch and relinquish leadership
     */
    public void close() throws IOException;
    
    /**
     * Check if this instance is currently the leader
     * @return true if this instance is the leader
     */
    public boolean hasLeadership();
    
    /**
     * Wait until this instance becomes the leader
     */
    public void await() throws InterruptedException, EOFException;
    
    /**
     * Wait until this instance becomes the leader or timeout
     * @param timeout maximum time to wait
     * @param unit time unit
     * @return true if became leader, false if timed out
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    
    /**
     * Get the current state of this latch
     * @return current state
     */
    public State getState();
    
    /**
     * Get the ID of this participant
     * @return participant ID
     */
    public String getId();
    
    /**
     * Get all current participants in the election
     * @return collection of all participants
     */
    public Collection<Participant> getParticipants() throws Exception;
    
    /**
     * Get the current leader participant
     * @return leader participant, or null if no leader
     */
    public Participant getLeader() throws Exception;
    
    /**
     * Add a listener for latch state changes
     * @param listener the listener to add
     */
    public void addListener(LeaderLatchListener listener);
    
    /**
     * Add a listener with executor
     * @param listener the listener to add
     * @param executor executor for listener callbacks
     */
    public void addListener(LeaderLatchListener listener, Executor executor);
    
    /**
     * Remove a listener
     * @param listener the listener to remove
     */
    public void removeListener(LeaderLatchListener listener);
    
    /**
     * Enumeration of possible latch states
     */
    public enum State {
        LATENT,    // Not started
        STARTED,   // Started but not leader
        CLOSED     // Closed
    }
    
    /**
     * Enumeration controlling listener handling on close
     */
    public enum CloseMode {
        SILENT,           // Don't notify listeners on close
        NOTIFY_LEADER     // Notify listeners on close
    }
}

Usage Example:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

CuratorFramework client = // ... initialize client
LeaderLatch leaderLatch = new LeaderLatch(client, "/app/leader", "server-1");

// Add listener for leadership changes
leaderLatch.addListener(new LeaderLatchListener() {
    @Override
    public void isLeader() {
        System.out.println("I am now the leader!");
        // Start leader-specific tasks
    }
    
    @Override
    public void notLeader() {
        System.out.println("I am no longer the leader");
        // Stop leader-specific tasks
    }
});

try {
    leaderLatch.start();
    
    // Wait to become leader (optional)
    if (leaderLatch.await(30, TimeUnit.SECONDS)) {
        System.out.println("Became leader within 30 seconds");
    }
    
    // Check leadership status
    if (leaderLatch.hasLeadership()) {
        System.out.println("Currently the leader");
    }
    
    // Keep running...
    Thread.sleep(60000);
    
} finally {
    leaderLatch.close();
}

LeaderSelector

Alternative leader election implementation providing more control over leadership lifecycle through listener callbacks.

/**
 * Alternative leader selection implementation with listener-based control
 */
public class LeaderSelector implements Closeable {
    /**
     * Create a new LeaderSelector
     * @param client the curator client
     * @param mutexPath the path to use for leader election
     * @param listener listener to handle leadership changes
     */
    public LeaderSelector(CuratorFramework client, String mutexPath, LeaderSelectorListener listener);
    
    /**
     * Create a new LeaderSelector with participant ID
     * @param client the curator client
     * @param mutexPath the path to use for leader election
     * @param threadFactory factory for creating threads
     * @param executor executor for running leader logic
     * @param listener listener to handle leadership changes
     */
    public LeaderSelector(CuratorFramework client, String mutexPath, 
                         ThreadFactory threadFactory, Executor executor, 
                         LeaderSelectorListener listener);
    
    /**
     * Start the leader selection process
     */
    public void start();
    
    /**
     * Start with immediate participation
     */
    public void autoRequeue();
    
    /**
     * Close the selector and relinquish leadership
     */
    public void close() throws IOException;
    
    /**
     * Check if this instance has leadership
     * @return true if currently the leader
     */
    public boolean hasLeadership();
    
    /**
     * Requeue this instance for leadership selection
     */
    public void requeue();
    
    /**
     * Get the participant ID for this selector
     * @return participant ID
     */
    public String getId();
    
    /**
     * Set the participant ID
     * @param id the participant ID to set
     */
    public void setId(String id);
    
    /**
     * Get all current participants
     * @return collection of participants
     */
    public Collection<Participant> getParticipants() throws Exception;
    
    /**
     * Interrupt current leadership (if this instance is leader)
     */
    public void interruptLeadership();
}

Usage Example:

import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;

CuratorFramework client = // ... initialize client

LeaderSelector leaderSelector = new LeaderSelector(client, "/app/leader", 
    new LeaderSelectorListenerAdapter() {
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            System.out.println("Taking leadership...");
            
            try {
                // This method should not return until leadership is relinquished
                while (true) {
                    // Perform leader duties
                    System.out.println("Doing leader work...");
                    Thread.sleep(5000);
                    
                    // Check if we should give up leadership
                    if (shouldGiveUpLeadership()) {
                        break;
                    }
                }
            } finally {
                System.out.println("Giving up leadership");
            }
        }
        
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            super.stateChanged(client, newState);
            if (newState == ConnectionState.LOST) {
                // Connection lost - leadership will be automatically relinquished
                handleConnectionLoss();
            }
        }
    });

leaderSelector.setId("server-1");
leaderSelector.start();
leaderSelector.autoRequeue(); // Automatically requeue for leadership

try {
    // Keep running...
    Thread.sleep(300000);
} finally {
    leaderSelector.close();
}

Participant

Represents a participant in leader election, providing information about each process competing for leadership.

/**
 * Information about a participant in leader election
 */
public class Participant {
    /**
     * Get the participant's unique ID
     * @return participant ID
     */
    public String getId();
    
    /**
     * Check if this participant is currently the leader
     * @return true if this is the current leader
     */
    public boolean isLeader();
}

LeaderLatchListener

Listener interface for receiving notifications about LeaderLatch state changes.

/**
 * Listener for LeaderLatch state change notifications
 */
public interface LeaderLatchListener {
    /**
     * Called when this instance becomes the leader
     */
    void isLeader();
    
    /**
     * Called when this instance loses leadership
     */
    void notLeader();
}

LeaderSelectorListener

Listener interface for LeaderSelector leadership management.

/**
 * Listener for LeaderSelector leadership events
 */
public interface LeaderSelectorListener extends ConnectionStateListener {
    /**
     * Called when this instance should take leadership
     * This method should not return until leadership should be relinquished
     * @param client the curator client
     * @throws Exception if an error occurs during leadership
     */
    void takeLeadership(CuratorFramework client) throws Exception;
}

LeaderSelectorListenerAdapter

Recommended base class for LeaderSelectorListener implementations that handles connection state properly.

/**
 * Recommended base class for LeaderSelectorListener that handles
 * connection state changes appropriately
 */
public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
    /**
     * Default implementation that handles standard connection state changes
     * @param client the curator client
     * @param newState the new connection state
     */
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState);
    
    /**
     * Subclasses must implement this to handle leadership
     * @param client the curator client
     */
    @Override
    public abstract void takeLeadership(CuratorFramework client) throws Exception;
}

CancelLeadershipException

Exception that can be thrown to interrupt leadership during connection changes.

/**
 * Exception that can be thrown during takeLeadership() to indicate
 * that leadership should be interrupted due to connection issues
 */
public class CancelLeadershipException extends Exception {
    /**
     * Create a new CancelLeadershipException
     */
    public CancelLeadershipException();
    
    /**
     * Create a new CancelLeadershipException with message
     * @param message exception message
     */
    public CancelLeadershipException(String message);
    
    /**
     * Create a new CancelLeadershipException with cause
     * @param cause the underlying cause
     */
    public CancelLeadershipException(Throwable cause);
    
    /**
     * Create a new CancelLeadershipException with message and cause
     * @param message exception message
     * @param cause the underlying cause
     */
    public CancelLeadershipException(String message, Throwable cause);
}

Common Patterns

Graceful Leadership Handoff

LeaderLatch latch = new LeaderLatch(client, "/app/leader", "server-1");
AtomicBoolean shouldStop = new AtomicBoolean(false);

latch.addListener(new LeaderLatchListener() {
    @Override
    public void isLeader() {
        // Start leader tasks
        startLeaderTasks();
    }
    
    @Override
    public void notLeader() {
        // Gracefully stop leader tasks
        shouldStop.set(true);
        stopLeaderTasks();
    }
});

latch.start();

// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        shouldStop.set(true);
        latch.close();
    } catch (IOException e) {
        // Log error
    }
}));

Connection State Handling

LeaderSelector selector = new LeaderSelector(client, "/app/leader",
    new LeaderSelectorListenerAdapter() {
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Perform leader duties
                    doLeaderWork();
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.SUSPENDED || 
                newState == ConnectionState.LOST) {
                // Interrupt current leadership
                Thread.currentThread().interrupt();
            }
        }
    });

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-shaded-curator

docs

caching.md

index.md

leader-election.md

locking.md

shared-values.md

tile.json