The runtime module of Apache Flink that provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications.
—
Leader election, service discovery, and coordination services for fault-tolerant distributed operation. Flink's high availability system ensures cluster resilience and automatic recovery from failures.
Central service providing high availability components for leader election, service discovery, and coordination.
/**
 * The HighAvailabilityServices provide access to all services needed for a highly-available
 * setup. In particular, they provide access to highly-available variants of the following
 * services:
 *
 * <ul>
 *   <li>ResourceManager leader election and leader retrieval
 *   <li>Dispatcher leader election and leader retrieval
 *   <li>JobManager leader election and leader retrieval
 *   <li>Web monitor / cluster REST endpoint leader retrieval
 *   <li>Checkpointing metadata persistence ({@link CheckpointRecoveryFactory})
 *   <li>Job graph, job result and running-job bookkeeping
 *   <li>Blob storage ({@link BlobStoreService})
 * </ul>
 *
 * <p>{@link #closeAsync()} releases the services; {@link #closeAndCleanupAllData()} additionally
 * cleans up all HA data.
 */
public interface HighAvailabilityServices extends AutoCloseableAsync {

    // ---------------------------------------------------------------------------------------
    //  Leader election
    // ---------------------------------------------------------------------------------------

    /** Returns the leader election service for the ResourceManager. */
    LeaderElectionService getResourceManagerLeaderElectionService();

    /** Returns the leader election service for the Dispatcher. */
    LeaderElectionService getDispatcherLeaderElectionService();

    /**
     * Returns the leader election service for the JobManager of a specific job.
     *
     * @param jobID the job whose JobManager leader is elected
     */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    // ---------------------------------------------------------------------------------------
    //  Leader retrieval
    // ---------------------------------------------------------------------------------------

    /** Returns the leader retrieval service for the ResourceManager. */
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /** Returns the leader retrieval service for the Dispatcher. */
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /**
     * Returns the leader retrieval service for the JobManager of a specific job.
     *
     * @param jobID the job whose JobManager leader should be retrieved
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /**
     * Returns the leader retrieval service for the JobManager of a specific job, with a fallback
     * address.
     *
     * @param jobID the job whose JobManager leader should be retrieved
     * @param defaultJobManagerAddress fallback JobManager address; NOTE(review): presumably only
     *     relevant for implementations without real leader election — confirm
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

    /**
     * Returns the leader retrieval service for the web monitor.
     *
     * <p>NOTE(review): this appears to overlap with {@link #getClusterRestEndpointLeaderRetriever()};
     * confirm which one new code should use.
     */
    LeaderRetrievalService getWebMonitorLeaderRetriever();

    /** Returns the leader retrieval service for the cluster REST endpoint. */
    LeaderRetrievalService getClusterRestEndpointLeaderRetriever();

    // ---------------------------------------------------------------------------------------
    //  Persistence
    // ---------------------------------------------------------------------------------------

    /** Returns the factory for checkpoint recovery components (checkpoint store, ID counter). */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /** Returns the store in which job graphs are persisted. */
    JobGraphStore getJobGraphStore();

    /** Returns the store for job results. */
    JobResultStore getJobResultStore();

    /** Returns the registry tracking the scheduling status of jobs. */
    RunningJobsRegistry getRunningJobsRegistry();

    /**
     * Creates the blob store service.
     *
     * <p>Unlike the {@code get...} accessors, this is a factory method. The returned
     * {@link BlobStoreService} is {@code Closeable}; NOTE(review): presumably the caller owns the
     * returned instance and is responsible for closing it — confirm.
     */
    BlobStoreService createBlobStore();

    // ---------------------------------------------------------------------------------------
    //  Shutdown
    // ---------------------------------------------------------------------------------------

    /**
     * Closes the services asynchronously.
     *
     * @return a future completing when closing has finished
     */
    CompletableFuture<Void> closeAsync();

    /**
     * Closes the services and cleans up all HA data.
     *
     * @return a future completing when closing and cleanup have finished
     */
    CompletableFuture<Void> closeAndCleanupAllData();
}
Usage Examples:
// Create HA services from configuration.
// ioExecutor, rpcSystem, fatalErrorHandler, jobId, resourceManager, gatewayRetriever,
// maxNumberOfCheckpointsToRetain and sharedStateRegistryFactory are assumed to be provided by
// the surrounding code.
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181,zk3:2181");
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "my-flink-cluster");
HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
        config,
        ioExecutor,
        AddressResolution.TRY_ADDRESS_RESOLUTION,
        rpcSystem,
        fatalErrorHandler);

// Get ResourceManager leader election service. The contender is handed the service so that it
// can confirm leadership once leadership has been granted to it.
LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();
rmLeaderElection.start(new ResourceManagerLeaderContender(resourceManager, rmLeaderElection));

// Get JobManager leader retrieval for monitoring
LeaderRetrievalService jmLeaderRetrieval = haServices.getJobManagerLeaderRetriever(jobId);
jmLeaderRetrieval.start(new JobManagerLeaderListener(gatewayRetriever));

// Get checkpoint recovery factory
CheckpointRecoveryFactory checkpointRecovery = haServices.getCheckpointRecoveryFactory();
CompletedCheckpointStore checkpointStore = checkpointRecovery.createRecoveredCompletedCheckpointStore(
        jobId,
        maxNumberOfCheckpointsToRetain,
        sharedStateRegistryFactory,
        ioExecutor);

// On shutdown: stop the services that were started, then release the HA services.
jmLeaderRetrieval.stop();
rmLeaderElection.stop();
haServices.closeAsync().join();
Service for conducting leader elections among multiple candidates, ensuring single leadership.
/**
 * Elects a single leader among a group of {@link LeaderContender}s.
 *
 * <p>The service has to be started before it is used. {@link #start} takes the contender as its
 * argument; when there are several contenders, each of them calls {@link #start} with its own
 * contender object.
 */
public interface LeaderElectionService {

    /**
     * Starts the election service for the given contender.
     *
     * @param contender the contender taking part in the election
     */
    void start(LeaderContender contender);

    /**
     * Confirms leadership; invoked by the current leader.
     *
     * @param leaderSessionID session ID of the leadership being confirmed
     */
    void confirmLeadership(UUID leaderSessionID);

    /**
     * Tells whether the contender holds leadership.
     *
     * @param leaderSessionId leader session ID to check
     * @return {@code true} if the contender has leadership
     */
    boolean hasLeadership(UUID leaderSessionId);

    /** Stops the election service. */
    void stop();
}
/**
 * Interface for leader contenders which participate in leader election.
 */
public interface LeaderContender {

    /**
     * Grants leadership to this contender.
     *
     * @param leaderSessionID session ID identifying the granted leadership
     */
    void grantLeadership(UUID leaderSessionID);

    /** Revokes leadership from this contender. */
    void revokeLeadership();

    /**
     * Returns a description of this contender.
     *
     * <p>NOTE(review): the method name suggests a human-readable description (e.g. for logging)
     * rather than the leader address; confirm whether any caller relies on it being an address.
     */
    String getDescription();

    /**
     * Handles errors reported to this contender during leader election.
     *
     * @param exception the reported error
     */
    void handleError(Exception exception);
}
Service for retrieving information about current leaders and being notified of leadership changes.
/**
 * Retrieves the current leader and informs a {@link LeaderRetrievalListener} whenever leadership
 * changes.
 *
 * <p>A leader retrieval service can be started only once.
 */
public interface LeaderRetrievalService {

    /**
     * Starts retrieval and registers the listener to be notified.
     *
     * @param listener receiver of leader notifications
     */
    void start(LeaderRetrievalListener listener);

    /** Stops retrieval. */
    void stop();
}
/**
 * Callback interface for leader retrieval: receives the current leader and any subsequent
 * leader changes.
 */
public interface LeaderRetrievalListener {

    /**
     * Called with the address and session ID of a new leader.
     *
     * <p>NOTE(review): listeners may receive {@code null} values when no leader is known; confirm
     * against the retrieval service implementation.
     *
     * @param leaderAddress address of the leader
     * @param leaderSessionID session ID of the leadership
     */
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);

    /**
     * Called when leader retrieval fails.
     *
     * @param exception the retrieval error
     */
    void handleError(Exception exception);
}
Usage Examples:
// Implement leader contender
public class ResourceManagerLeaderContender implements LeaderContender {
    private final ResourceManager resourceManager;
    // The election service this contender was started with; needed to confirm leadership.
    private final LeaderElectionService leaderElectionService;
    // Session ID of the leadership currently held, or null after revocation.
    private UUID currentLeaderSessionId;

    /**
     * Creates a contender acting on behalf of the given ResourceManager.
     *
     * @param resourceManager the ResourceManager that takes over or loses leadership
     * @param leaderElectionService the service this contender is started with; used to confirm
     *     granted leadership
     */
    public ResourceManagerLeaderContender(
            ResourceManager resourceManager, LeaderElectionService leaderElectionService) {
        this.resourceManager = resourceManager;
        this.leaderElectionService = leaderElectionService;
    }

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        currentLeaderSessionId = leaderSessionID;
        resourceManager.becomeLeader(leaderSessionID);
        // Confirm leadership after handing it to the ResourceManager
        leaderElectionService.confirmLeadership(leaderSessionID);
    }

    @Override
    public void revokeLeadership() {
        currentLeaderSessionId = null;
        resourceManager.loseLeadership();
    }

    @Override
    public String getDescription() {
        // This example describes the contender by the ResourceManager's address.
        return resourceManager.getAddress();
    }

    @Override
    public void handleError(Exception exception) {
        resourceManager.handleFatalError(exception);
    }
}
// Implement leader retrieval listener.
// logger, connectToJobManager and disconnectFromJobManager are assumed to be provided by the
// surrounding code.
public class JobManagerLeaderListener implements LeaderRetrievalListener {
    private final JobMasterGatewayRetriever gatewayRetriever;

    /**
     * Creates a listener that connects to each newly announced JobManager leader.
     *
     * @param gatewayRetriever resolves a JobMaster gateway from a leader address
     */
    public JobManagerLeaderListener(JobMasterGatewayRetriever gatewayRetriever) {
        this.gatewayRetriever = gatewayRetriever;
    }

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        if (leaderAddress != null && leaderSessionID != null) {
            // Connect to new JobManager leader
            JobMasterGateway gateway = gatewayRetriever.getGateway(leaderAddress);
            connectToJobManager(gateway, leaderSessionID);
        } else {
            // Missing address or session ID: treat as leader lost
            disconnectFromJobManager();
        }
    }

    @Override
    public void handleError(Exception exception) {
        // Handle retrieval error (this example only logs it)
        logger.error("Error in leader retrieval", exception);
    }
}
ZooKeeper-based implementation providing distributed coordination and persistence.
/**
 * {@link HighAvailabilityServices} backed by ZooKeeper, accessed through a Curator client.
 */
public class ZooKeeperHaServices implements HighAvailabilityServices {

    /**
     * Factory for the ZooKeeper HA services.
     *
     * @param client Curator client used to talk to ZooKeeper
     * @param configuration Flink configuration
     * @param executor executor handed to the services (NOTE(review): usage not shown here)
     * @return the created services
     */
    public static ZooKeeperHaServices create(
            CuratorFramework client,
            Configuration configuration,
            Executor executor);

    /** Returns the Curator client these services use. */
    public CuratorFramework getClient();

    /** Returns the path of the cluster configuration store. */
    public String getClusterConfigurationStorePath();

    /**
     * Returns the leader path of a component.
     *
     * @param componentName the component whose leader path is requested
     */
    public String getLeaderPath(String componentName);

    /** Builds a ZooKeeper-backed election service for the given leader path. */
    protected LeaderElectionService createLeaderElectionService(String leaderPath);

    /** Builds a ZooKeeper-backed retrieval service for the given leader path. */
    protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath);
}
/**
 * {@link LeaderElectionService} implemented on top of ZooKeeper.
 */
public class ZooKeeperLeaderElectionService implements LeaderElectionService {

    /**
     * Creates the service.
     *
     * @param client Curator client connected to ZooKeeper
     * @param latchPath ZooKeeper path of the leader latch
     * @param leaderPath ZooKeeper path under which leader information is kept
     */
    public ZooKeeperLeaderElectionService(
            CuratorFramework client,
            String latchPath,
            String leaderPath);

    // --- lifecycle --------------------------------------------------------------------------

    /** Begins taking part in the election for the given contender. */
    public void start(LeaderContender contender);

    /** Leaves the election. */
    public void stop();

    // --- leadership -------------------------------------------------------------------------

    /** Confirms leadership for the given session. */
    public void confirmLeadership(UUID leaderSessionID);

    /** Tells whether leadership is held for the given session. */
    public boolean hasLeadership(UUID leaderSessionId);

    // --- internals --------------------------------------------------------------------------

    /** Returns the Curator leader latch driving the election. */
    protected LeaderLatch getLeaderLatch();

    /** Stores leader information for the given session in ZooKeeper. */
    protected void writeLeaderInformation(UUID leaderSessionID);
}
/**
 * {@link LeaderRetrievalService} implemented on top of ZooKeeper.
 */
public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService {

    /**
     * Creates the service.
     *
     * @param client Curator client connected to ZooKeeper
     * @param retrievalPath ZooKeeper path from which the leader is read
     */
    public ZooKeeperLeaderRetrievalService(
            CuratorFramework client,
            String retrievalPath);

    /** Begins retrieval, reporting to the given listener. */
    public void start(LeaderRetrievalListener listener);

    /** Ends retrieval. */
    public void stop();

    /** Reacts to a change of the ZooKeeper connection state. */
    protected void handleConnectionStateChanged(ConnectionState newState);

    /** Reacts to a change of the leader node. */
    protected void handleLeaderChange();
}
Simple embedded HA implementation for testing and single-node deployments.
/**
 * {@link HighAvailabilityServices} for setups that do not need high availability.
 *
 * <p>Suitable for testing and for clusters where high availability is not required.
 */
public class EmbeddedHaServices implements HighAvailabilityServices {

    /**
     * Creates the embedded services.
     *
     * @param executor executor handed to the services
     */
    public EmbeddedHaServices(Executor executor);

    // --- leader election --------------------------------------------------------------------

    /** Returns the ResourceManager election service. */
    public LeaderElectionService getResourceManagerLeaderElectionService();

    /** Returns the Dispatcher election service. */
    public LeaderElectionService getDispatcherLeaderElectionService();

    /** Returns the election service for the given job's JobManager. */
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    // --- persistence ------------------------------------------------------------------------

    /** Returns the checkpoint recovery factory. */
    public CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /** Returns the job graph store. */
    public JobGraphStore getJobGraphStore();

    /** Returns the job result store. */
    public JobResultStore getJobResultStore();

    /** Returns the running jobs registry. */
    public RunningJobsRegistry getRunningJobsRegistry();

    /** Creates the blob store service. */
    public BlobStoreService createBlobStore();

    // --- shutdown ---------------------------------------------------------------------------

    /** Closes the services asynchronously. */
    public CompletableFuture<Void> closeAsync();
}
/**
 * Embedded leader election service that immediately grants leadership.
 */
public class EmbeddedLeaderElectionService implements LeaderElectionService {

    /**
     * Starts the service for the given contender, which is granted leadership immediately.
     *
     * @param contender the contender to elect
     */
    public void start(LeaderContender contender);

    /** Stops the service. */
    public void stop();

    /**
     * Confirms leadership for the given session.
     *
     * <p>This method returns nothing. NOTE(review): in the embedded case it presumably only
     * acknowledges the confirmation — confirm.
     *
     * @param leaderSessionID session ID of the leadership being confirmed
     */
    public void confirmLeadership(UUID leaderSessionID);

    /**
     * Tells whether leadership is held for the given session.
     *
     * <p>NOTE(review): described as always {@code true} for the embedded service; confirm that this
     * also holds for session IDs that were never granted.
     *
     * @param leaderSessionId leader session ID to check
     * @return whether leadership is held
     */
    public boolean hasLeadership(UUID leaderSessionId);
}
Persistent storage for job graphs enabling recovery after failures.
/**
 * Highly-available persistence of job graphs, so they can be recovered later.
 */
public interface JobGraphStore {

    // --- lifecycle --------------------------------------------------------------------------

    /**
     * Starts the store.
     *
     * @param jobGraphListener receiver of added/removed notifications
     */
    void start(JobGraphListener jobGraphListener);

    /** Stops the store. */
    void stop();

    // --- writing ----------------------------------------------------------------------------

    /** Persists the given job graph. */
    void putJobGraph(JobGraph jobGraph);

    /** Deletes the job graph of the given job. */
    void removeJobGraph(JobID jobId);

    /** Releases the locks this store holds on the given job's graph. */
    void releaseJobGraph(JobID jobId);

    // --- reading ----------------------------------------------------------------------------

    /** Returns the IDs of all stored jobs. */
    Collection<JobID> getJobIds();

    /** Returns the stored job graphs for recovery. */
    Collection<JobGraph> recoverJobGraphs();
}
/**
 * Receives notifications from a {@link JobGraphStore} about added and removed job graphs.
 */
public interface JobGraphListener {

    /**
     * Invoked once a job graph has been added.
     *
     * @param jobId ID of the added job
     */
    void onAddedJobGraph(JobID jobId);

    /**
     * Invoked once a job graph has been removed.
     *
     * @param jobId ID of the removed job
     */
    void onRemovedJobGraph(JobID jobId);
}
Usage Examples:
// Configure high availability: ZooKeeper-backed HA with HDFS storage
Configuration haConfig = new Configuration();
haConfig.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
haConfig.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181");
haConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "production-cluster");
haConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "hdfs://cluster/flink-ha");
// For tests, the non-HA (embedded) mode can be selected instead:
// haConfig.setString(HighAvailabilityOptions.HA_MODE, "NONE");

// Build the HA services from that configuration
HighAvailabilityServices services = HighAvailabilityServicesUtils.createHighAvailabilityServices(
        haConfig,
        ioExecutor,
        AddressResolution.TRY_ADDRESS_RESOLUTION,
        rpcSystem,
        fatalErrorHandler);

// Persist job graphs through the HA job graph store, printing store events
JobGraphStore graphStore = services.getJobGraphStore();
JobGraphListener printingListener = new JobGraphListener() {
    @Override
    public void onAddedJobGraph(JobID addedId) {
        System.out.println("Job graph added: " + addedId);
    }

    @Override
    public void onRemovedJobGraph(JobID removedId) {
        System.out.println("Job graph removed: " + removedId);
    }
};
graphStore.start(printingListener);
graphStore.putJobGraph(jobGraph);

// After a restart, the stored job graphs can be recovered
for (JobGraph recovered : graphStore.recoverJobGraphs()) {
    System.out.println("Recovered job: " + recovered.getJobID());
}
// High availability modes
/**
 * High availability modes, each carrying the string value given in its declaration.
 */
public enum HighAvailabilityMode {
    /** No high availability. */
    NONE("NONE"),
    /** ZooKeeper-based high availability. */
    ZOOKEEPER("zookeeper"),
    /** Kubernetes-based high availability. */
    KUBERNETES("kubernetes");

    private final String value;

    /**
     * Binds a constant to its string value. This is required because every constant passes a
     * {@code String} argument.
     */
    HighAvailabilityMode(String value) {
        this.value = value;
    }

    /** Returns this mode's string value (presumably the {@code value} field; body not shown). */
    public String getValue();

    /**
     * Resolves the mode from a configuration.
     *
     * <p>NOTE(review): presumably reads {@code HighAvailabilityOptions.HA_MODE}; confirm how
     * missing or unknown values are handled.
     */
    public static HighAvailabilityMode fromConfig(Configuration config);
}
// Leadership session identifiers
/**
 * API listing of {@code java.util.UUID}, the type of the leader session IDs used by the leader
 * election and retrieval interfaces.
 *
 * <p>NOTE(review): this mirrors the JDK class and is not a project class. Declaring a class named
 * {@code UUID} in a project package would shadow {@code java.util.UUID}. The JDK class is also
 * {@code final}.
 */
public class UUID implements Serializable, Comparable<UUID> {
    /** Returns a type 4 (pseudo randomly generated) UUID. */
    public static UUID randomUUID();

    /** Creates a UUID from its standard string representation. */
    public static UUID fromString(String name);

    /** Returns the most significant 64 bits of this UUID. */
    public long getMostSignificantBits();

    /** Returns the least significant 64 bits of this UUID. */
    public long getLeastSignificantBits();

    /** Compares this UUID with another; required by {@code Comparable<UUID>}. */
    public int compareTo(UUID val);

    /** Value equality, consistent with {@link #hashCode()}. */
    public boolean equals(Object obj);

    /** Hash code consistent with {@link #equals(Object)}. */
    public int hashCode();

    /** Returns the standard string representation. */
    public String toString();
}
// Running jobs registry
/**
 * Registry recording the {@link JobSchedulingStatus} of jobs.
 */
public interface RunningJobsRegistry {

    // --- status transitions -----------------------------------------------------------------

    /** Records the given job as running. */
    void setJobRunning(JobID jobID);

    /** Records the given job as finished. */
    void setJobFinished(JobID jobID);

    /** Removes the given job from the registry. */
    void clearJob(JobID jobID);

    // --- queries ----------------------------------------------------------------------------

    /** Returns the scheduling status recorded for the given job. */
    JobSchedulingStatus getJobSchedulingStatus(JobID jobID);

    /** Returns the IDs of the jobs currently running. */
    Set<JobID> getRunningJobIds();
}
/**
 * Scheduling status of a job, as reported by {@code RunningJobsRegistry#getJobSchedulingStatus}.
 */
public enum JobSchedulingStatus {
    /** Presumably: not (yet) marked running. */
    PENDING,
    /** Presumably: marked running via {@code setJobRunning}. */
    RUNNING,
    /** Presumably: marked finished via {@code setJobFinished}. */
    DONE;
}
// Checkpoint recovery components
/**
 * Produces the per-job components needed to recover checkpoints.
 */
public interface CheckpointRecoveryFactory {

    /**
     * Returns the checkpoint ID counter for a job.
     *
     * @param jobId the job the counter belongs to
     */
    CheckpointIDCounter createCheckpointIDCounter(JobID jobId);

    /**
     * Returns a recovered store of completed checkpoints for a job.
     *
     * @param jobId the job whose checkpoints are stored
     * @param maxNumberOfCheckpointsToRetain upper bound on retained completed checkpoints
     * @param sharedStateRegistryFactory factory for the shared state registry
     * @param ioExecutor executor handed to the store
     */
    CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
            JobID jobId,
            int maxNumberOfCheckpointsToRetain,
            SharedStateRegistryFactory sharedStateRegistryFactory,
            Executor ioExecutor);
}
// Configuration options
/**
 * Configuration options for high availability.
 */
public class HighAvailabilityOptions {

    // --- general ----------------------------------------------------------------------------

    /** HA mode, e.g. {@code "zookeeper"} or {@code "NONE"}. */
    public static final ConfigOption<String> HA_MODE;

    /** Cluster ID, e.g. {@code "my-flink-cluster"}. */
    public static final ConfigOption<String> HA_CLUSTER_ID;

    /** Storage path for HA data, e.g. {@code "hdfs://cluster/flink-ha"}. */
    public static final ConfigOption<String> HA_STORAGE_PATH;

    // --- ZooKeeper --------------------------------------------------------------------------

    /** ZooKeeper quorum as comma-separated {@code host:port} pairs. */
    public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM;

    /** ZooKeeper root path. */
    public static final ConfigOption<String> HA_ZOOKEEPER_ROOT;

    /** ZooKeeper namespace. NOTE(review): relation to HA_CLUSTER_ID not shown — confirm. */
    public static final ConfigOption<String> HA_ZOOKEEPER_NAMESPACE;

    /** ZooKeeper session timeout. NOTE(review): unit not shown — confirm. */
    public static final ConfigOption<Integer> HA_ZOOKEEPER_SESSION_TIMEOUT;

    /** ZooKeeper connection timeout. NOTE(review): unit not shown — confirm. */
    public static final ConfigOption<Integer> HA_ZOOKEEPER_CONNECTION_TIMEOUT;

    /** Wait time between ZooKeeper retries. NOTE(review): unit not shown — confirm. */
    public static final ConfigOption<Integer> HA_ZOOKEEPER_RETRY_WAIT;

    /** Maximum number of ZooKeeper retry attempts. */
    public static final ConfigOption<Integer> HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS;
}
// Blob store service for distributed file storage
/**
 * Blob store for distributed file storage.
 *
 * <p>The boolean results presumably report whether the operation succeeded — NOTE(review):
 * confirm against implementations.
 */
public interface BlobStoreService extends Closeable {

    /**
     * Puts a local file into the store.
     *
     * @param localFile the file to upload
     * @param blobKey key to store it under
     */
    boolean put(File localFile, BlobKey blobKey);

    /**
     * Gets a blob from the store into a local file.
     *
     * @param blobKey key of the blob
     * @param localFile destination file
     */
    boolean get(BlobKey blobKey, File localFile);

    /**
     * Deletes a blob from the store.
     *
     * @param blobKey key of the blob to delete
     */
    boolean delete(BlobKey blobKey);

    /**
     * Deletes all blobs of a job.
     *
     * @param jobId the job whose blobs are deleted
     */
    boolean deleteAll(JobID jobId);

    /**
     * Closes the blob store.
     *
     * <p>Overrides {@link Closeable#close()} without its {@code throws IOException} clause.
     */
    @Override
    void close();
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime