Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.
—
The High Availability Services provide cluster coordination and fault tolerance infrastructure for Flink clusters. These services enable leader election, distributed storage coordination, and recovery mechanisms that ensure cluster resilience and continuous operation in the face of node failures.
The primary interface that provides access to all high availability services required by a Flink cluster.
/**
 * Central access point to all high availability services required by a Flink cluster:
 * leader election and retrieval, checkpoint recovery, job graph / job result persistence,
 * the running-jobs registry, and BLOB storage.
 */
public interface HighAvailabilityServices extends AutoCloseable {
/** Returns the service used to look up the current ResourceManager leader. */
LeaderRetrievalService getResourceManagerLeaderRetriever();
/** Returns the service used to look up the current Dispatcher leader. */
LeaderRetrievalService getDispatcherLeaderRetriever();
/** Returns the leader lookup service for the JobManager of the given job. */
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
/** Returns the service used to look up the current web monitor leader. */
LeaderRetrievalService getWebMonitorLeaderRetriever();
/** Returns the election service a ResourceManager uses to contend for leadership. */
LeaderElectionService getResourceManagerLeaderElectionService();
/** Returns the election service a Dispatcher uses to contend for leadership. */
LeaderElectionService getDispatcherLeaderElectionService();
/** Returns the election service for the JobManager of the given job. */
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
/** Returns the election service for the web monitor. */
LeaderElectionService getWebMonitorLeaderElectionService();
/** Factory for services that persist and recover checkpoint metadata. */
CheckpointRecoveryFactory getCheckpointRecoveryFactory();
/** Persistent store for submitted job graphs, used for job recovery. */
JobGraphStore getJobGraphStore();
/** Persistent store for job execution results. */
JobResultStore getJobResultStore();
/** Registry tracking the scheduling status of jobs in the cluster. */
RunningJobsRegistry getRunningJobsRegistry();
/** Creates a BLOB store for distributing large binary artifacts (e.g. JAR files). */
BlobStore createBlobStore() throws IOException;
/** Closes the services; see also {@code closeAndCleanupAllData()} which additionally deletes persisted data. */
@Override
void close() throws Exception;
/** Closes the services AND deletes all persisted HA data; intended for terminal shutdown. */
void closeAndCleanupAllData() throws Exception;
}Service for participating in leader election processes within the cluster.
/**
 * Service through which a component contends for leadership within the cluster.
 * Grants and revocations are delivered through the registered {@link LeaderContender}.
 */
public interface LeaderElectionService {
/** Starts the election process on behalf of the given contender. */
void start(LeaderContender contender) throws Exception;
/** Stops participating in the election. */
void stop() throws Exception;
/** Confirms that the contender accepted leadership for the given session, publishing its address. */
void confirmLeadership(UUID leaderSessionID, String leaderAddress);
/** Returns whether leadership is currently held under the given session id. */
boolean hasLeadership(UUID leaderSessionId);
}Interface implemented by components that want to participate in leader election.
/**
 * Implemented by components that take part in leader election; the callbacks are
 * invoked by the {@link LeaderElectionService}.
 */
public interface LeaderContender {
/** Called when this contender is granted leadership under the given session id. */
void grantLeadership(UUID leaderSessionID);
/** Called when previously granted leadership is revoked. */
void revokeLeadership();
/** Returns the address under which this contender is reachable. */
String getAddress();
/** Called when the election service encounters an error. */
void handleError(Exception exception);
}Service for retrieving current leader information and receiving leadership change notifications.
/**
 * Service that looks up the current leader of a component and notifies a
 * registered listener about leadership changes.
 */
public interface LeaderRetrievalService {
/** Starts retrieval; the listener is notified of the current and all subsequent leaders. */
void start(LeaderRetrievalListener listener) throws Exception;
/** Stops retrieval and notifications. */
void stop() throws Exception;
}Listener interface for receiving leader change notifications.
/**
 * Listener notified about leader changes by a {@link LeaderRetrievalService}.
 */
public interface LeaderRetrievalListener {
/** Called with the new leader's address and session id (address may be null when no leader is available). */
void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
/** Called when the retrieval service encounters an error. */
void handleError(Exception exception);
}Factory for creating checkpoint recovery services that handle checkpoint metadata persistence.
/**
 * Factory for per-job checkpoint recovery services.
 */
public interface CheckpointRecoveryFactory {
/**
 * Creates the store that persists completed checkpoints for the given job;
 * maxNumberOfCheckpointsToRetain bounds how many completed checkpoints are kept.
 */
CompletedCheckpointStore createCompletedCheckpointStore(
JobID jobId,
int maxNumberOfCheckpointsToRetain,
ClassLoader userClassLoader
) throws Exception;
/** Creates the counter that issues checkpoint IDs for the given job. */
CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception;
}Persistent storage for job graphs to enable job recovery after failures.
/**
 * Persistent store for submitted job graphs, enabling job recovery after failures.
 */
public interface JobGraphStore {
/** Persists the given job graph. */
void putJobGraph(StoredJobGraph jobGraph) throws Exception;
/** Reads back the persisted job graph for the given job id. */
StoredJobGraph recoverJobGraph(JobID jobId) throws Exception;
/** Removes the persisted job graph for the given job id. */
void removeJobGraph(JobID jobId) throws Exception;
/** Returns the ids of all job graphs currently in the store. */
Collection<JobID> getJobIds() throws Exception;
/** Starts the store; the listener is informed about added and removed job graphs. */
void start(JobGraphListener jobGraphListener) throws Exception;
/** Stops the store. */
void stop() throws Exception;
}Storage for persisting job execution results and status information.
/**
 * Store for job execution results. A result is first recorded as "dirty" and is
 * marked "clean" once cleanup for the job has completed (per the method names below).
 */
public interface JobResultStore {
/** Records a new, not-yet-cleaned-up ("dirty") job result. */
void createDirtyResult(JobResultEntry jobResultEntry) throws IOException;
/** Marks the job's result as clean; NoSuchElementException if no entry exists for the job. */
void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException;
/** Returns whether any result entry (dirty or clean) exists for the job. */
boolean hasJobResultEntry(JobID jobId) throws IOException;
/** Returns whether a dirty result entry exists for the job. */
boolean hasDirtyJobResultEntry(JobID jobId) throws IOException;
/** Returns whether a clean result entry exists for the job. */
boolean hasCleanJobResultEntry(JobID jobId) throws IOException;
/** Returns all dirty (not yet cleaned up) results. */
Set<JobResult> getDirtyResults() throws IOException;
/** Returns all clean results. */
Set<JobResult> getCleanResults() throws IOException;
}Registry for tracking which jobs are currently running in the cluster.
/**
 * Registry tracking the scheduling status of jobs in the cluster.
 */
public interface RunningJobsRegistry {
/** Marks the job as running. */
void setJobRunning(JobID jobID) throws IOException;
/** Marks the job as finished. */
void setJobFinished(JobID jobID) throws IOException;
/** Returns the current scheduling status of the job. */
JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException;
/** Scheduling states a job moves through: PENDING, RUNNING, DONE. */
enum JobSchedulingStatus {
PENDING,
RUNNING,
DONE
}
}Distributed storage service for binary large objects (BLOBs) like JAR files and large state.
/**
 * Distributed storage for binary large objects (BLOBs) such as JAR files and large state.
 * Boolean return values report the outcome of the operation (semantics per implementation).
 */
public interface BlobStore extends Closeable {
/** Copies the local file into the store under (jobId, blobKey). */
boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;
/** Copies the blob identified by (jobId, blobKey) into the given local file. */
boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;
/** Deletes a single blob. */
boolean delete(JobID jobId, BlobKey blobKey);
/** Deletes all blobs belonging to the given job. */
boolean deleteAll(JobID jobId);
/** Closes the store and removes all stored data. */
void closeAndCleanupAllData() throws IOException;
}Utility class providing factory methods and helper functions for HA services.
/**
 * Static factory and helper methods for creating and configuring HA services.
 */
public class HighAvailabilityServicesUtils {
/** Creates HA services from the configuration, or embedded (non-HA) services if none are configured. */
public static HighAvailabilityServices createAvailableOrEmbeddedServices(
Configuration config,
Executor executor
) throws Exception;
/** Creates HA services according to the configured high-availability mode. */
public static HighAvailabilityServices createHighAvailabilityServices(
Configuration configuration,
Executor executor,
AddressResolution addressResolution
) throws Exception;
/** Reads the JobManager address from the configuration. */
public static String getJobManagerAddress(Configuration config) throws Exception;
/** Creates a leader retrieval service for the named component. */
public static LeaderRetrievalService createLeaderRetrievalService(
Configuration config,
String serviceName
) throws Exception;
/** Creates a leader election service for the named component. */
public static LeaderElectionService createLeaderElectionService(
Configuration config,
String serviceName
) throws Exception;
/** Writes the JobManager address and port into the configuration. */
public static void setJobManagerAddress(Configuration config, String address, int port);
}Configuration options for setting up high availability services.
/**
 * Configuration options for high availability services.
 * Timeout and wait values below are in milliseconds.
 */
public class HighAvailabilityOptions {
/** HA mode; "NONE" (default) disables HA, "zookeeper" is shown in the usage example below. */
public static final ConfigOption<String> HA_MODE =
key("high-availability").defaultValue("NONE");
/** Cluster id distinguishing this Flink cluster within the HA backend (default "default"). */
public static final ConfigOption<String> HA_CLUSTER_ID =
key("high-availability.cluster-id").defaultValue("default");
/** File system path where HA data is stored; no default, must be set for HA. */
public static final ConfigOption<String> HA_STORAGE_PATH =
key("high-availability.storageDir").noDefaultValue();
/** ZooKeeper quorum connection string; no default. */
public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
key("high-availability.zookeeper.quorum").noDefaultValue();
/** ZooKeeper client session timeout in ms (default 60000). */
public static final ConfigOption<Integer> HA_ZOOKEEPER_SESSION_TIMEOUT =
key("high-availability.zookeeper.client.session-timeout").defaultValue(60000);
/** ZooKeeper client connection timeout in ms (default 15000). */
public static final ConfigOption<Integer> HA_ZOOKEEPER_CONNECTION_TIMEOUT =
key("high-availability.zookeeper.client.connection-timeout").defaultValue(15000);
/** Wait between ZooKeeper connection retries in ms (default 5000). */
public static final ConfigOption<Integer> HA_ZOOKEEPER_RETRY_WAIT =
key("high-availability.zookeeper.client.retry-wait").defaultValue(5000);
/** Maximum number of ZooKeeper connection retry attempts (default 3). */
public static final ConfigOption<Integer> HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS =
key("high-availability.zookeeper.client.max-retry-attempts").defaultValue(3);
/** Root ZooKeeper path under which Flink HA data is placed (default "/flink"). */
public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
key("high-availability.zookeeper.path.root").defaultValue("/flink");
}Base exception for high availability service failures.
/**
 * Base exception type for failures in the high availability services.
 */
public class HighAvailabilityServicesException extends FlinkException {
public HighAvailabilityServicesException(String message);
public HighAvailabilityServicesException(String message, Throwable cause);
}Exception thrown during leader election process failures.
/**
 * Exception thrown when the leader election process fails.
 */
public class LeaderElectionException extends FlinkException {
public LeaderElectionException(String message);
public LeaderElectionException(String message, Throwable cause);
}import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
// Configure ZooKeeper-based high availability
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "localhost:2181");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "file:///ha-storage");
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "my-flink-cluster");
// Create HA services
HighAvailabilityServices haServices = HighAvailabilityServicesUtils
    .createHighAvailabilityServices(config, executor, AddressResolution.TRY_ADDRESS_RESOLUTION);
try {
    // Use HA services for cluster coordination
    LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();
    LeaderRetrievalService rmLeaderRetrieval = haServices.getResourceManagerLeaderRetriever();
    // Set up leader election for the Resource Manager. The contender needs the
    // address it will publish once leadership is confirmed (the example class
    // below has no no-arg constructor).
    ResourceManagerLeaderContender rmContender =
        new ResourceManagerLeaderContender("akka.tcp://flink@localhost:6123/user/resourcemanager");
    rmLeaderElection.start(rmContender);
    // Set up leader retrieval for clients
    ResourceManagerLeaderListener rmListener = new ResourceManagerLeaderListener();
    rmLeaderRetrieval.start(rmListener);
} finally {
    // close() keeps persisted HA data so the cluster can recover later
    haServices.close();
}import org.apache.flink.runtime.highavailability.LeaderContender;
import java.util.UUID;
/**
 * Example {@link LeaderContender} for a Resource Manager.
 *
 * <p>Tracks leadership state, confirms granted leadership with the election
 * service, and starts/stops leader-only services on grant/revoke.
 */
public class ResourceManagerLeaderContender implements LeaderContender {
    private volatile boolean isLeader = false;
    private volatile UUID currentLeaderSessionId;
    private final String address;
    // Election service used to confirm accepted leadership. The original code
    // referenced this field without ever declaring it; it must be wired in via
    // the two-arg constructor or the setter before leadership can be confirmed.
    private volatile LeaderElectionService leaderElectionService;

    public ResourceManagerLeaderContender(String address) {
        this.address = address;
    }

    /** Convenience constructor that also wires in the election service. */
    public ResourceManagerLeaderContender(String address, LeaderElectionService leaderElectionService) {
        this.address = address;
        this.leaderElectionService = leaderElectionService;
    }

    /** Injects the election service when the one-arg constructor was used. */
    public void setLeaderElectionService(LeaderElectionService leaderElectionService) {
        this.leaderElectionService = leaderElectionService;
    }

    /** Records granted leadership, confirms it, and starts leader-only services. */
    @Override
    public void grantLeadership(UUID leaderSessionID) {
        synchronized (this) {
            if (!isLeader) {
                System.out.println("Granted leadership with session ID: " + leaderSessionID);
                this.currentLeaderSessionId = leaderSessionID;
                this.isLeader = true;
                // Confirm leadership and start serving as leader
                confirmLeadership(leaderSessionID);
                startLeaderServices();
            }
        }
    }

    /** Clears leadership state and stops leader-only services. */
    @Override
    public void revokeLeadership() {
        synchronized (this) {
            if (isLeader) {
                System.out.println("Leadership revoked");
                this.isLeader = false;
                this.currentLeaderSessionId = null;
                // Stop leader services
                stopLeaderServices();
            }
        }
    }

    @Override
    public String getAddress() {
        return address;
    }

    /** Treats an election error as loss of leadership; a new election may follow. */
    @Override
    public void handleError(Exception exception) {
        System.err.println("Leader election error: " + exception.getMessage());
        // Handle leadership errors - may need to restart election
        revokeLeadership();
    }

    private void confirmLeadership(UUID leaderSessionID) {
        // Confirm leadership with the election service, if one was wired in.
        LeaderElectionService service = this.leaderElectionService;
        if (service != null) {
            service.confirmLeadership(leaderSessionID, address);
        }
    }

    private void startLeaderServices() {
        // Initialize services that only the leader should run
        System.out.println("Starting Resource Manager leader services");
    }

    private void stopLeaderServices() {
        // Clean up leader-only services
        System.out.println("Stopping Resource Manager leader services");
    }

    /** Returns whether this contender currently believes it is the leader. */
    public boolean hasLeadership() {
        return isLeader;
    }

    /** Returns the session id of the current leadership, or null when not leading. */
    public UUID getCurrentLeaderSessionId() {
        return currentLeaderSessionId;
    }
}import org.apache.flink.runtime.highavailability.LeaderRetrievalListener;
import java.util.UUID;
/**
 * Example {@link LeaderRetrievalListener} that tracks the current Resource
 * Manager leader and (re)connects whenever the leader changes.
 */
public class ResourceManagerLeaderListener implements LeaderRetrievalListener {
    private volatile String currentLeaderAddress;
    private volatile UUID currentLeaderSessionId;

    /** Reacts to a leader change; no-op when address and session are unchanged. */
    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        synchronized (this) {
            final boolean unchanged =
                sameValue(currentLeaderAddress, leaderAddress)
                    && sameValue(currentLeaderSessionId, leaderSessionID);
            if (unchanged) {
                return;
            }
            System.out.println("New Resource Manager leader: " + leaderAddress +
                " (session: " + leaderSessionID + ")");
            // Update connection to new leader
            updateLeaderConnection(leaderAddress, leaderSessionID);
            currentLeaderAddress = leaderAddress;
            currentLeaderSessionId = leaderSessionID;
        }
    }

    /** On retrieval errors, forget the current leader and drop the connection. */
    @Override
    public void handleError(Exception exception) {
        System.err.println("Leader retrieval error: " + exception.getMessage());
        // Clear current leader information
        synchronized (this) {
            currentLeaderAddress = null;
            currentLeaderSessionId = null;
            disconnectFromLeader();
        }
    }

    private void updateLeaderConnection(String leaderAddress, UUID leaderSessionId) {
        // A null address means there is currently no leader to connect to.
        if (leaderAddress == null) {
            System.out.println("No Resource Manager leader available");
            disconnectFromLeader();
            return;
        }
        System.out.println("Connecting to Resource Manager at: " + leaderAddress);
        // ... connect to leader
    }

    private void disconnectFromLeader() {
        // Clean up connections to previous leader
        System.out.println("Disconnecting from Resource Manager leader");
    }

    // Null-safe equality check (equivalent to Objects.equals, which this
    // snippet does not import).
    private static boolean sameValue(Object a, Object b) {
        return a == null ? b == null : a.equals(b);
    }

    /** Returns the last known leader address, or null when unknown. */
    public String getCurrentLeaderAddress() {
        return currentLeaderAddress;
    }

    /** Returns the last known leader session id, or null when unknown. */
    public UUID getCurrentLeaderSessionId() {
        return currentLeaderSessionId;
    }
}import org.apache.flink.runtime.highavailability.JobGraphStore;
import org.apache.flink.api.common.JobID;
/**
 * Example file-system-backed {@link JobGraphStore} that persists one serialized
 * job graph per file ("&lt;jobId&gt;.job") under a storage directory.
 */
public class FileSystemJobGraphStore implements JobGraphStore {
    private final Path storageDirectory;
    private volatile JobGraphListener listener;

    public FileSystemJobGraphStore(Path storageDirectory) {
        this.storageDirectory = storageDirectory;
    }

    /**
     * Persists the job graph. The graph is serialized into a temporary file and
     * then moved into place, so a crash mid-write cannot leave a truncated job
     * file behind (the original wrote directly to the target file).
     */
    @Override
    public void putJobGraph(StoredJobGraph jobGraph) throws Exception {
        JobID jobId = jobGraph.getJobId();
        Path jobFile = storageDirectory.resolve(jobId.toString() + ".job");
        Path tempFile = storageDirectory.resolve(jobId.toString() + ".job.tmp");
        // Serialize into the temp file first.
        try (ObjectOutputStream oos = new ObjectOutputStream(
                Files.newOutputStream(tempFile))) {
            oos.writeObject(jobGraph);
        }
        // Publish atomically; fall back to a plain replace on file systems
        // that do not support atomic moves.
        try {
            Files.move(tempFile, jobFile,
                java.nio.file.StandardCopyOption.REPLACE_EXISTING,
                java.nio.file.StandardCopyOption.ATOMIC_MOVE);
        } catch (java.nio.file.AtomicMoveNotSupportedException e) {
            Files.move(tempFile, jobFile, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
        }
        System.out.println("Stored job graph for: " + jobId);
        // Notify listener
        if (listener != null) {
            listener.onAddedJobGraph(jobId);
        }
    }

    /**
     * Recovers the persisted job graph for the given job.
     *
     * <p>SECURITY NOTE: this uses Java native deserialization; only read files
     * that were written by this store into a trusted storage directory.
     */
    @Override
    public StoredJobGraph recoverJobGraph(JobID jobId) throws Exception {
        Path jobFile = storageDirectory.resolve(jobId.toString() + ".job");
        if (!Files.exists(jobFile)) {
            throw new Exception("Job graph not found: " + jobId);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                Files.newInputStream(jobFile))) {
            return (StoredJobGraph) ois.readObject();
        }
    }

    /**
     * Removes the persisted job graph. The listener is only notified when a
     * file was actually deleted (the original notified unconditionally).
     */
    @Override
    public void removeJobGraph(JobID jobId) throws Exception {
        Path jobFile = storageDirectory.resolve(jobId.toString() + ".job");
        if (Files.deleteIfExists(jobFile)) {
            System.out.println("Removed job graph for: " + jobId);
            if (listener != null) {
                listener.onRemovedJobGraph(jobId);
            }
        }
    }

    /** Lists the ids of all stored job graphs; files with unparsable names are skipped. */
    @Override
    public Collection<JobID> getJobIds() throws Exception {
        if (!Files.exists(storageDirectory)) {
            return Collections.emptyList();
        }
        List<JobID> jobIds = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(
                storageDirectory, "*.job")) {
            for (Path path : stream) {
                String filename = path.getFileName().toString();
                String jobIdStr = filename.substring(0, filename.lastIndexOf(".job"));
                try {
                    jobIds.add(JobID.fromHexString(jobIdStr));
                } catch (IllegalArgumentException e) {
                    // Stray file that merely ends in ".job" — skip it instead of
                    // failing the whole listing.
                }
            }
        }
        return jobIds;
    }

    /** Registers the listener and ensures the storage directory exists. */
    @Override
    public void start(JobGraphListener jobGraphListener) throws Exception {
        this.listener = jobGraphListener;
        // Ensure storage directory exists
        Files.createDirectories(storageDirectory);
        System.out.println("Started FileSystem job graph store at: " + storageDirectory);
    }

    /** Unregisters the listener. */
    @Override
    public void stop() throws Exception {
        this.listener = null;
        System.out.println("Stopped FileSystem job graph store");
    }
}import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
/**
 * Example {@link CheckpointRecoveryFactory} that keeps checkpoint metadata on a
 * file system, one sub-directory per job.
 */
public class FileSystemCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
    private final Path checkpointDirectory;
    private final Configuration configuration;

    public FileSystemCheckpointRecoveryFactory(Path checkpointDirectory, Configuration configuration) {
        this.checkpointDirectory = checkpointDirectory;
        this.configuration = configuration;
    }

    /** Builds a file-backed completed-checkpoint store rooted in the job's sub-directory. */
    @Override
    public CompletedCheckpointStore createCompletedCheckpointStore(
            JobID jobId,
            int maxNumberOfCheckpointsToRetain,
            ClassLoader userClassLoader) throws Exception {
        final Path perJobDirectory = checkpointDirectory.resolve(jobId.toString());
        return new FileSystemCompletedCheckpointStore(
            perJobDirectory,
            maxNumberOfCheckpointsToRetain,
            userClassLoader);
    }

    /** Builds a file-backed checkpoint ID counter stored at &lt;job-dir&gt;/counter. */
    @Override
    public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception {
        final Path counterPath = checkpointDirectory
            .resolve(jobId.toString())
            .resolve("counter");
        return new FileSystemCheckpointIDCounter(counterPath);
    }
}public class ClusterManager {
private HighAvailabilityServices haServices;
private final Configuration configuration;

/** Creates a cluster manager; the HA services are created lazily in start(). */
public ClusterManager(Configuration configuration) {
    this.configuration = configuration;
}

/** Creates the HA services and starts all cluster components with HA support. */
public void start() throws Exception {
    // Initialize HA services
    haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
        configuration,
        ForkJoinPool.commonPool(),
        AddressResolution.TRY_ADDRESS_RESOLUTION
    );
    // Start cluster components with HA support
    startResourceManager();
    startDispatcher();
    startWebMonitor();
}

/** Stops all components, then shuts the HA services down. */
public void stop() throws Exception {
    try {
        // Stop cluster components first
        stopWebMonitor();
        stopDispatcher();
        stopResourceManager();
    } finally {
        // NOTE(review): closeAndCleanupAllData() deletes all persisted HA state,
        // which is only appropriate for a terminal shutdown; use close() instead
        // if this cluster should remain recoverable — confirm intended semantics.
        if (haServices != null) {
            haServices.closeAndCleanupAllData();
        }
    }
}

private void startResourceManager() throws Exception {
    LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();
    // The contender requires the address it will publish once leadership is
    // confirmed (the original called a non-existent no-arg constructor).
    ResourceManagerLeaderContender contender =
        new ResourceManagerLeaderContender("akka.tcp://flink@localhost:6123/user/resourcemanager");
    rmLeaderElection.start(contender);
}
// ... other component management methods
}public class HARobustComponent implements LeaderContender, LeaderRetrievalListener {
// Single-threaded scheduler driving the retry backoff. The original declared
// this 'final' with no initializer and no constructor, which does not compile.
private final ScheduledExecutorService scheduler =
    java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
// NOTE(review): nothing in this snippet ever sets isRunning to true — presumably
// a start() method elsewhere does; confirm against the full component.
private final AtomicBoolean isRunning = new AtomicBoolean(false);
// Consecutive failed restart attempts; drives the exponential backoff.
// (The original referenced an undeclared 'retryCount'.)
private final java.util.concurrent.atomic.AtomicInteger retryCount =
    new java.util.concurrent.atomic.AtomicInteger();

/** Handles an HA failure by scheduling a restart attempt with exponential backoff. */
@Override
public void handleError(Exception exception) {
    System.err.println("HA error occurred: " + exception.getMessage());
    // Implement exponential backoff retry
    scheduler.schedule(() -> {
        if (isRunning.get()) {
            try {
                restartHAServices();
            } catch (Exception e) {
                System.err.println("Failed to restart HA services: " + e.getMessage());
                handleError(e); // Recursive retry with backoff
            }
        }
    }, calculateBackoffDelay(), TimeUnit.MILLISECONDS);
}

private void restartHAServices() throws Exception {
    // Restart HA service connections
    System.out.println("Restarting HA services");
    // ... restart logic
    retryCount.set(0); // a successful restart resets the backoff
}

/** Returns the next retry delay: 1s, 2s, 4s, ... capped at 30s. */
private long calculateBackoffDelay() {
    int attempt = retryCount.getAndIncrement();
    return Math.min(1000 * (long) Math.pow(2, attempt), 30000);
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10