CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime

The runtime module of Apache Flink that provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications

Pending
Overview
Eval results
Files

high-availability.mddocs/

High Availability and Coordination

Leader election, service discovery, and coordination services for fault-tolerant distributed operation. Flink's high availability system ensures cluster resilience and automatic recovery from failures.

Capabilities

HighAvailabilityServices

Central service providing high availability components for leader election, service discovery, and coordination.

/**
 * The HighAvailabilityServices provide access to all services needed for a highly-available
 * setup. In particular, they provide access to highly-available variants of the following services:
 * - ResourceManager leader election and leader retrieval
 * - Dispatcher leader election and leader retrieval
 * - JobManager leader election and leader retrieval
 * - Web monitor / cluster REST endpoint leader retrieval
 * - Checkpointing metadata persistence
 * - Job graph store, job result store and running jobs registry
 * - Blob store
 *
 * The services are closed through {@link #closeAsync()}; {@link #closeAndCleanupAllData()}
 * additionally cleans up all HA data.
 */
public interface HighAvailabilityServices extends AutoCloseableAsync {
    /**
     * Get the leader election service for the ResourceManager.
     *
     * @return leader election service used by ResourceManager contenders
     */
    LeaderElectionService getResourceManagerLeaderElectionService();

    /**
     * Get the leader election service for the Dispatcher.
     *
     * @return leader election service used by Dispatcher contenders
     */
    LeaderElectionService getDispatcherLeaderElectionService();

    /**
     * Get the leader election service for the JobManager of a specific job.
     *
     * @param jobID ID of the job whose JobManager takes part in the election
     * @return leader election service for that job's JobManager
     */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    /**
     * Get the leader retrieval service for the ResourceManager.
     *
     * @return service that reports the current ResourceManager leader
     */
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /**
     * Get the leader retrieval service for the Dispatcher.
     *
     * @return service that reports the current Dispatcher leader
     */
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /**
     * Get the leader retrieval service for the JobManager of a specific job.
     *
     * @param jobID ID of the job whose JobManager leader is to be retrieved
     * @return service that reports the current JobManager leader for the job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /**
     * Get the leader retrieval service for the JobManager of a specific job, with a fallback
     * address.
     *
     * @param jobID ID of the job whose JobManager leader is to be retrieved
     * @param defaultJobManagerAddress fallback JobManager address
     *     (NOTE(review): presumably used when no leader can be looked up - confirm)
     * @return service that reports the current JobManager leader for the job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

    /**
     * Get the leader retrieval service for the web monitor.
     *
     * @return service that reports the current web monitor leader
     */
    LeaderRetrievalService getWebMonitorLeaderRetriever();

    /**
     * Get the leader retrieval service for the cluster REST endpoint.
     *
     * @return service that reports the current cluster REST endpoint leader
     */
    LeaderRetrievalService getClusterRestEndpointLeaderRetriever();

    /**
     * Get the checkpoint recovery factory.
     *
     * @return factory for completed checkpoint stores and checkpoint ID counters
     */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /**
     * Get the job graph store.
     *
     * @return store used to persist and recover job graphs
     */
    JobGraphStore getJobGraphStore();

    /**
     * Get the job result store.
     *
     * @return the job result store
     */
    JobResultStore getJobResultStore();

    /**
     * Get the running jobs registry.
     *
     * @return registry tracking the scheduling status of jobs
     */
    RunningJobsRegistry getRunningJobsRegistry();

    /**
     * Create the blob store service. Unlike the getters above, this is a factory method
     * ({@code create...}).
     *
     * @return the created blob store service
     */
    BlobStoreService createBlobStore();

    /**
     * Close the services asynchronously.
     *
     * @return future representing the asynchronous close operation
     */
    CompletableFuture<Void> closeAsync();

    /**
     * Close the services and clean up all HA data.
     *
     * @return future representing the asynchronous close-and-cleanup operation
     */
    CompletableFuture<Void> closeAndCleanupAllData();
}

Usage Examples:

// Create HA services from configuration.
// NOTE: ioExecutor, rpcSystem, fatalErrorHandler, jobId, maxNumberOfCheckpointsToRetain and
// sharedStateRegistryFactory are not declared in this snippet; they must be supplied by the
// surrounding application code.
Configuration config = new Configuration();
// Select the ZooKeeper-based HA implementation and point it at the ZooKeeper quorum.
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181,zk3:2181");
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "my-flink-cluster");

HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
    config,
    ioExecutor,
    AddressResolution.TRY_ADDRESS_RESOLUTION,
    rpcSystem,
    fatalErrorHandler
);

// Get ResourceManager leader election service and register a contender with it.
// The service must be started with a contender before it can be used.
LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();
rmLeaderElection.start(new ResourceManagerLeaderContender());

// Get JobManager leader retrieval for monitoring; the listener is notified of leader changes.
LeaderRetrievalService jmLeaderRetrieval = haServices.getJobManagerLeaderRetriever(jobId);
jmLeaderRetrieval.start(new JobManagerLeaderListener());

// Get checkpoint recovery factory and use it to create the completed checkpoint store
// for the job.
CheckpointRecoveryFactory checkpointRecovery = haServices.getCheckpointRecoveryFactory();
CompletedCheckpointStore checkpointStore = checkpointRecovery.createRecoveredCompletedCheckpointStore(
    jobId,
    maxNumberOfCheckpointsToRetain,
    sharedStateRegistryFactory,
    ioExecutor
);

LeaderElectionService

Service for conducting leader elections among multiple contenders, so that only one of them holds leadership at a time.

/**
 * Interface for a service which allows to elect a leader among a group of contenders.
 * Prior to using this service, it has to be started by calling the start method.
 * The start method takes the contender as an argument. If there are multiple contenders,
 * then each one has to call the start method with its own contender object.
 */
public interface LeaderElectionService {
    /**
     * Start the leader election service with a contender.
     *
     * @param contender the contender that takes part in the election and receives the
     *     grant/revoke callbacks
     */
    void start(LeaderContender contender);

    /** Stop the leader election service. */
    void stop();

    /**
     * Confirm leadership for the given session. Called by the contender that was granted
     * leadership.
     *
     * @param leaderSessionID session ID of the leadership being confirmed
     */
    void confirmLeadership(UUID leaderSessionID);

    /**
     * Check whether the contender has leadership for the given session.
     *
     * @param leaderSessionId session ID to check leadership against
     * @return {@code true} if the contender has leadership under that session ID
     */
    boolean hasLeadership(UUID leaderSessionId);
}

/**
 * Interface for leader contenders which participate in leader election.
 * A contender is registered via {@code LeaderElectionService.start(LeaderContender)}.
 */
public interface LeaderContender {
    /**
     * Grant leadership to this contender.
     *
     * @param leaderSessionID session ID identifying the granted leadership
     */
    void grantLeadership(UUID leaderSessionID);

    /** Revoke leadership from this contender. */
    void revokeLeadership();

    /**
     * Get a description of this contender.
     *
     * @return the contender's description (the usage example on this page returns the
     *     contender's address here)
     */
    String getDescription();

    /**
     * Handle an error reported to this contender by the election service.
     *
     * @param exception the error that occurred
     */
    void handleError(Exception exception);
}

LeaderRetrievalService

Service for retrieving information about current leaders and being notified of leadership changes.

/**
 * Service which retrieves the current leader and notifies a listener about leadership changes.
 * The leader retrieval service can only be started once.
 */
public interface LeaderRetrievalService {
    /**
     * Start the leader retrieval service with a listener. May only be called once per
     * service instance.
     *
     * @param listener the listener to notify about new leaders and retrieval errors
     */
    void start(LeaderRetrievalListener listener);

    /** Stop the leader retrieval service. */
    void stop();
}

/**
 * Listener interface for leader retrieval. The listener is notified
 * about new leaders and leader changes.
 */
public interface LeaderRetrievalListener {
    /**
     * Notify the listener about a new leader.
     *
     * @param leaderAddress address of the new leader (the usage example on this page treats
     *     {@code null} as "no leader")
     * @param leaderSessionID session ID of the new leader (the usage example on this page
     *     treats {@code null} as "no leader")
     */
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);

    /**
     * Handle an error that occurred during leader retrieval.
     *
     * @param exception the retrieval error
     */
    void handleError(Exception exception);
}

Usage Examples:

// Implement leader contender.
// Forwards leadership callbacks from the election service to a ResourceManager.
// NOTE(review): this snippet is abridged - `resourceManager` is final but no constructor
// initializing it is shown, and `leaderElectionService` (used in grantLeadership) is not
// declared here. Both must be provided (e.g. via a constructor) for the class to compile.
public class ResourceManagerLeaderContender implements LeaderContender {
    private final ResourceManager resourceManager;
    // Session ID of the leadership currently held; null while not leader.
    private UUID currentLeaderSessionId;

    /**
     * Record the session ID, let the ResourceManager take over leadership, then confirm
     * the leadership with the election service.
     */
    @Override
    public void grantLeadership(UUID leaderSessionID) {
        currentLeaderSessionId = leaderSessionID;
        resourceManager.becomeLeader(leaderSessionID);
        // Confirm leadership only after the ResourceManager has been told to become leader.
        leaderElectionService.confirmLeadership(leaderSessionID);
    }

    /** Clear the stored session ID and tell the ResourceManager it lost leadership. */
    @Override
    public void revokeLeadership() {
        currentLeaderSessionId = null;
        resourceManager.loseLeadership();
    }

    /** Use the ResourceManager's address as this contender's description. */
    @Override
    public String getDescription() {
        return resourceManager.getAddress();
    }

    /** Escalate election errors to the ResourceManager as fatal errors. */
    @Override
    public void handleError(Exception exception) {
        resourceManager.handleFatalError(exception);
    }
}

// Implement leader retrieval listener.
// Connects to the JobManager whenever a new leader is announced and disconnects when the
// leader is lost.
// NOTE(review): this snippet is abridged - `gatewayRetriever` is final but no constructor
// initializing it is shown, and `logger`, `connectToJobManager` and
// `disconnectFromJobManager` are not declared here; they must be provided by the
// surrounding code.
public class JobManagerLeaderListener implements LeaderRetrievalListener {
    private final JobMasterGatewayRetriever gatewayRetriever;

    /**
     * React to a leader change: connect when both address and session ID are present,
     * otherwise treat the notification as loss of the leader.
     */
    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        if (leaderAddress != null && leaderSessionID != null) {
            JobMasterGateway gateway = gatewayRetriever.getGateway(leaderAddress);
            // Connect to new JobManager leader
            connectToJobManager(gateway, leaderSessionID);
        } else {
            // Leader lost: either the address or the session ID is missing.
            disconnectFromJobManager();
        }
    }

    /** Log retrieval errors; the error is not rethrown or escalated here. */
    @Override
    public void handleError(Exception exception) {
        // Handle retrieval error
        logger.error("Error in leader retrieval", exception);
    }
}

ZooKeeper High Availability

ZooKeeper-based implementation providing distributed coordination and persistence.

/**
 * ZooKeeper based implementation of HighAvailabilityServices.
 * Only the ZooKeeper-specific members are listed here; the methods inherited from
 * HighAvailabilityServices are omitted.
 */
public class ZooKeeperHaServices implements HighAvailabilityServices {
    /**
     * Create ZooKeeper HA services.
     *
     * @param client Curator client used to talk to ZooKeeper
     * @param configuration Flink configuration
     * @param executor executor handed to the created services
     *     (NOTE(review): presumably used for I/O work - confirm)
     * @return the created ZooKeeper HA services
     */
    public static ZooKeeperHaServices create(
        CuratorFramework client,
        Configuration configuration,
        Executor executor
    );

    /**
     * Get the ZooKeeper client.
     *
     * @return the Curator client
     */
    public CuratorFramework getClient();

    /**
     * Get the cluster configuration store path.
     *
     * @return the cluster configuration store path
     */
    public String getClusterConfigurationStorePath();

    /**
     * Get the leader path for a component.
     *
     * @param componentName name of the component
     * @return leader path for that component
     */
    public String getLeaderPath(String componentName);

    /**
     * Create a ZooKeeper leader election service.
     *
     * @param leaderPath leader path the service operates on
     * @return the created leader election service
     */
    protected LeaderElectionService createLeaderElectionService(String leaderPath);

    /**
     * Create a ZooKeeper leader retrieval service.
     *
     * @param leaderPath leader path the service reads from
     * @return the created leader retrieval service
     */
    protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath);
}

/**
 * ZooKeeper based leader election service implementation.
 */
public class ZooKeeperLeaderElectionService implements LeaderElectionService {
    /**
     * Create a ZooKeeper leader election service.
     *
     * @param client Curator client used to talk to ZooKeeper
     * @param latchPath path of the leader latch
     *     (NOTE(review): presumably the node backing {@link #getLeaderLatch()} - confirm)
     * @param leaderPath path of the leader node
     *     (NOTE(review): presumably where the leader information is written - confirm)
     */
    public ZooKeeperLeaderElectionService(
        CuratorFramework client,
        String latchPath,
        String leaderPath
    );

    /**
     * Start leader election.
     *
     * @param contender the contender taking part in the election
     */
    public void start(LeaderContender contender);

    /** Stop leader election. */
    public void stop();

    /**
     * Confirm leadership.
     *
     * @param leaderSessionID session ID of the leadership being confirmed
     */
    public void confirmLeadership(UUID leaderSessionID);

    /**
     * Check leadership.
     *
     * @param leaderSessionId session ID to check leadership against
     * @return {@code true} if the contender has leadership under that session ID
     */
    public boolean hasLeadership(UUID leaderSessionId);

    /**
     * Get the leader latch.
     *
     * @return the leader latch used by this service
     */
    protected LeaderLatch getLeaderLatch();

    /**
     * Write leader information to ZooKeeper.
     *
     * @param leaderSessionID session ID of the leader whose information is written
     */
    protected void writeLeaderInformation(UUID leaderSessionID);
}

/**
 * ZooKeeper based leader retrieval service implementation.
 */
public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService {
    /**
     * Create a ZooKeeper leader retrieval service.
     *
     * @param client Curator client used to talk to ZooKeeper
     * @param retrievalPath ZooKeeper path from which the leader is retrieved
     */
    public ZooKeeperLeaderRetrievalService(
        CuratorFramework client,
        String retrievalPath
    );

    /**
     * Start leader retrieval.
     *
     * @param listener listener to notify about leader changes and errors
     */
    public void start(LeaderRetrievalListener listener);

    /** Stop leader retrieval. */
    public void stop();

    /**
     * Handle ZooKeeper connection state changes.
     *
     * @param newState the new connection state
     */
    protected void handleConnectionStateChanged(ConnectionState newState);

    /** Handle leader node changes. */
    protected void handleLeaderChange();
}

Embedded High Availability

Simple embedded implementation of the HA services for the non-high-availability case: testing and cluster setups that do not require high availability.

/**
 * An implementation of the HighAvailabilityServices for the non-high-availability case.
 * This implementation can be used for testing or for cluster setups that do not
 * require high availability.
 *
 * Only a subset of the HighAvailabilityServices methods is listed here.
 */
public class EmbeddedHaServices implements HighAvailabilityServices {
    /**
     * Create embedded HA services.
     *
     * @param executor executor used by the embedded services
     */
    public EmbeddedHaServices(Executor executor);

    /**
     * Get ResourceManager leader election service.
     *
     * @return leader election service for the ResourceManager
     */
    public LeaderElectionService getResourceManagerLeaderElectionService();

    /**
     * Get Dispatcher leader election service.
     *
     * @return leader election service for the Dispatcher
     */
    public LeaderElectionService getDispatcherLeaderElectionService();

    /**
     * Get JobManager leader election service.
     *
     * @param jobID ID of the job whose JobManager takes part in the election
     * @return leader election service for that job's JobManager
     */
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    /**
     * Get checkpoint recovery factory.
     *
     * @return the checkpoint recovery factory
     */
    public CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /**
     * Get job graph store.
     *
     * @return the job graph store
     */
    public JobGraphStore getJobGraphStore();

    /**
     * Get job result store.
     *
     * @return the job result store
     */
    public JobResultStore getJobResultStore();

    /**
     * Get running jobs registry.
     *
     * @return the running jobs registry
     */
    public RunningJobsRegistry getRunningJobsRegistry();

    /**
     * Create blob store service.
     *
     * @return the created blob store service
     */
    public BlobStoreService createBlobStore();

    /**
     * Close services.
     *
     * @return future representing the asynchronous close operation
     */
    public CompletableFuture<Void> closeAsync();
}

/**
 * Embedded leader election service that immediately grants leadership.
 */
public class EmbeddedLeaderElectionService implements LeaderElectionService {
    /**
     * Start the service; leadership is granted to the contender automatically.
     *
     * @param contender the contender that receives leadership
     */
    public void start(LeaderContender contender);

    /** Stop the service. */
    public void stop();

    /**
     * Confirm leadership. This method returns nothing; it only records the confirmation.
     *
     * @param leaderSessionID session ID of the leadership being confirmed
     */
    public void confirmLeadership(UUID leaderSessionID);

    /**
     * Check leadership.
     *
     * @param leaderSessionId session ID to check leadership against
     * @return per this page, always {@code true} for the embedded service
     */
    public boolean hasLeadership(UUID leaderSessionId);
}

Job Graph Store

Persistent storage for job graphs enabling recovery after failures.

/**
 * JobGraphStore interface for persisting and retrieving job graphs in a highly available manner.
 */
public interface JobGraphStore {
    /**
     * Start the job graph store.
     *
     * @param jobGraphListener listener notified when job graphs are added or removed
     */
    void start(JobGraphListener jobGraphListener);

    /** Stop the job graph store. */
    void stop();

    /**
     * Put a job graph into the store.
     *
     * @param jobGraph the job graph to persist
     */
    void putJobGraph(JobGraph jobGraph);

    /**
     * Remove a job graph from the store.
     *
     * @param jobId ID of the job whose graph is removed
     */
    void removeJobGraph(JobID jobId);

    /**
     * Release the locks held for a job graph. Unlike {@link #removeJobGraph(JobID)}, this is
     * described only as releasing locks, not as deleting the stored graph.
     *
     * @param jobId ID of the job whose graph locks are released
     */
    void releaseJobGraph(JobID jobId);

    /**
     * Get all job IDs.
     *
     * @return IDs of the jobs known to the store
     */
    Collection<JobID> getJobIds();

    /**
     * Get the stored job graphs, e.g. to recover jobs after a restart.
     *
     * @return the stored job graphs
     */
    Collection<JobGraph> recoverJobGraphs();
}

/**
 * Listener for job graph store events. Registered via
 * {@code JobGraphStore.start(JobGraphListener)}.
 */
public interface JobGraphListener {
    /**
     * Called when a job graph is added.
     *
     * @param jobId ID of the job whose graph was added
     */
    void onAddedJobGraph(JobID jobId);

    /**
     * Called when a job graph is removed.
     *
     * @param jobId ID of the job whose graph was removed
     */
    void onRemovedJobGraph(JobID jobId);
}

Usage Examples:

// Configure high availability mode.
// NOTE: ioExecutor, rpcSystem, fatalErrorHandler and jobGraph are not declared in this
// snippet; they must be supplied by the surrounding application code.
Configuration config = new Configuration();

// ZooKeeper HA setup: mode, quorum, cluster ID and the storage path for HA data.
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181");
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "production-cluster");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "hdfs://cluster/flink-ha");

// Or embedded HA for testing (use this instead of the ZooKeeper settings above)
// config.setString(HighAvailabilityOptions.HA_MODE, "NONE");

// Create HA services
HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
    config,
    ioExecutor,
    AddressResolution.TRY_ADDRESS_RESOLUTION,
    rpcSystem,
    fatalErrorHandler
);

// Use job graph store for persistence; the store is started with a listener that is
// called back when job graphs are added or removed.
JobGraphStore jobGraphStore = haServices.getJobGraphStore();
jobGraphStore.start(new JobGraphListener() {
    @Override
    public void onAddedJobGraph(JobID jobId) {
        System.out.println("Job graph added: " + jobId);
    }

    @Override
    public void onRemovedJobGraph(JobID jobId) {
        System.out.println("Job graph removed: " + jobId);
    }
});

// Store job graph
jobGraphStore.putJobGraph(jobGraph);

// Recover job graphs after restart and print the ID of each recovered job
Collection<JobGraph> recoveredJobs = jobGraphStore.recoverJobGraphs();
for (JobGraph job : recoveredJobs) {
    System.out.println("Recovered job: " + job.getJobID());
}

Types

// High availability modes.
// Each constant is declared with the string shown in parentheses; the usage examples on
// this page pass "zookeeper" and "NONE" as the value of HighAvailabilityOptions.HA_MODE.
// NOTE(review): abridged listing - the constructor taking the string is not shown.
public enum HighAvailabilityMode {
    NONE("NONE"),
    ZOOKEEPER("zookeeper"),
    KUBERNETES("kubernetes");

    // String associated with the mode (see the constant declarations above).
    private final String value;

    /** Get the string value of this mode. */
    public String getValue();

    /**
     * Determine the high availability mode from a configuration.
     * NOTE(review): presumably reads HighAvailabilityOptions.HA_MODE - confirm.
     *
     * @param config configuration to read the mode from
     * @return the configured mode
     */
    public static HighAvailabilityMode fromConfig(Configuration config);
}

// Leadership session identifiers.
// Used on this page as the type of the leaderSessionID parameters of the leader election
// and leader retrieval callbacks.
// NOTE(review): the listed members match the JDK's java.util.UUID; presumably that class is
// reproduced here for reference only - confirm.
public class UUID implements Serializable, Comparable<UUID> {
    /** Create a new random UUID. */
    public static UUID randomUUID();

    /**
     * Parse a UUID from its string form.
     *
     * @param name string representation of the UUID
     * @return the parsed UUID
     */
    public static UUID fromString(String name);

    /** Get the most significant 64 bits of this UUID. */
    public long getMostSignificantBits();

    /** Get the least significant 64 bits of this UUID. */
    public long getLeastSignificantBits();

    /** Get the string representation of this UUID. */
    public String toString();
}

// Running jobs registry.
// Tracks the scheduling status (see JobSchedulingStatus) of jobs by job ID.
public interface RunningJobsRegistry {
    /**
     * Mark a job as running.
     *
     * @param jobID ID of the job
     */
    void setJobRunning(JobID jobID);

    /**
     * Mark a job as finished.
     *
     * @param jobID ID of the job
     */
    void setJobFinished(JobID jobID);

    /**
     * Get the scheduling status of a job.
     *
     * @param jobID ID of the job
     * @return the job's scheduling status
     */
    JobSchedulingStatus getJobSchedulingStatus(JobID jobID);

    /**
     * Get the IDs of the running jobs.
     *
     * @return IDs of the jobs currently registered as running
     */
    Set<JobID> getRunningJobIds();

    /**
     * Clear a job from the registry.
     *
     * @param jobID ID of the job to remove
     */
    void clearJob(JobID jobID);
}

/**
 * Scheduling status of a job as returned by RunningJobsRegistry.getJobSchedulingStatus.
 * NOTE(review): presumably RUNNING corresponds to setJobRunning and DONE to setJobFinished,
 * with PENDING for jobs that have neither been set - confirm.
 */
public enum JobSchedulingStatus {
    PENDING,
    RUNNING,
    DONE
}

// Checkpoint recovery components.
// Obtained from HighAvailabilityServices.getCheckpointRecoveryFactory().
public interface CheckpointRecoveryFactory {
    /**
     * Create the (recovered) completed checkpoint store for a job.
     *
     * @param jobId ID of the job the store belongs to
     * @param maxNumberOfCheckpointsToRetain maximum number of completed checkpoints to retain
     * @param sharedStateRegistryFactory factory for the shared state registry
     * @param ioExecutor executor for I/O work
     * @return the completed checkpoint store for the job
     */
    CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
        JobID jobId,
        int maxNumberOfCheckpointsToRetain,
        SharedStateRegistryFactory sharedStateRegistryFactory,
        Executor ioExecutor
    );

    /**
     * Create the checkpoint ID counter for a job.
     *
     * @param jobId ID of the job the counter belongs to
     * @return the checkpoint ID counter for the job
     */
    CheckpointIDCounter createCheckpointIDCounter(JobID jobId);
}

// Configuration options for high availability.
// NOTE(review): abridged listing - keys, default values and the units of the numeric
// options are not shown here; check the Flink configuration reference before relying on them.
public class HighAvailabilityOptions {
    // HA mode; the examples on this page use "zookeeper" and "NONE".
    public static final ConfigOption<String> HA_MODE;
    // Cluster ID; the examples use values such as "my-flink-cluster".
    public static final ConfigOption<String> HA_CLUSTER_ID;
    // Storage path for HA data; the example uses "hdfs://cluster/flink-ha".
    public static final ConfigOption<String> HA_STORAGE_PATH;
    // ZooKeeper quorum as comma-separated host:port pairs, e.g. "zk1:2181,zk2:2181".
    public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM;
    // ZooKeeper root (string-valued).
    public static final ConfigOption<String> HA_ZOOKEEPER_ROOT;
    // ZooKeeper session timeout (integer-valued; unit not shown here).
    public static final ConfigOption<Integer> HA_ZOOKEEPER_SESSION_TIMEOUT;
    // ZooKeeper connection timeout (integer-valued; unit not shown here).
    public static final ConfigOption<Integer> HA_ZOOKEEPER_CONNECTION_TIMEOUT;
    // Wait between ZooKeeper retries (integer-valued; unit not shown here).
    public static final ConfigOption<Integer> HA_ZOOKEEPER_RETRY_WAIT;
    // Maximum number of ZooKeeper retry attempts.
    public static final ConfigOption<Integer> HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS;
    // ZooKeeper namespace (string-valued).
    public static final ConfigOption<String> HA_ZOOKEEPER_NAMESPACE;
}

// Blob store service for distributed file storage.
// Created via HighAvailabilityServices.createBlobStore().
// NOTE(review): the meaning of the boolean results is not shown here; presumably true
// indicates success - confirm.
public interface BlobStoreService extends Closeable {
    /**
     * Put a blob into the store.
     *
     * @param localFile local file holding the blob contents
     * @param blobKey key under which the blob is stored
     * @return boolean result of the operation
     */
    boolean put(File localFile, BlobKey blobKey);

    /**
     * Get a blob from the store.
     *
     * @param blobKey key of the blob to fetch
     * @param localFile local file to receive the blob contents
     * @return boolean result of the operation
     */
    boolean get(BlobKey blobKey, File localFile);

    /**
     * Delete a blob from the store.
     *
     * @param blobKey key of the blob to delete
     * @return boolean result of the operation
     */
    boolean delete(BlobKey blobKey);

    /**
     * Delete all blobs belonging to a job.
     *
     * @param jobId ID of the job whose blobs are deleted
     * @return boolean result of the operation
     */
    boolean deleteAll(JobID jobId);

    /** Close the blob store. */
    void close();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime

docs

execution-scheduling.md

high-availability.md

index.md

job-graph.md

resource-management.md

state-checkpointing.md

task-execution.md

tile.json