CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime-2-10

Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.

Pending
Overview
Eval results
Files

docs/high-availability.md

High Availability Services

The High Availability Services provide cluster coordination and fault tolerance infrastructure for Flink clusters. These services enable leader election, distributed storage coordination, and recovery mechanisms that ensure cluster resilience and continuous operation in the face of node failures.

Core Services Interface

HighAvailabilityServices

The primary interface that provides access to all high availability services required by a Flink cluster.

/**
 * Gives a Flink cluster access to its high availability services: leader election and leader
 * retrieval for every cluster component, plus the persistent stores used for recovery.
 *
 * <p>Two shutdown paths exist: {@link #close()}, and {@link #closeAndCleanupAllData()}, which as
 * its name states additionally cleans up all data held by these services.
 */
public interface HighAvailabilityServices extends AutoCloseable {

    // --- Resource manager ---------------------------------------------------------------

    /** Returns the service used to discover the current resource manager leader. */
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /** Returns the service through which the resource manager takes part in leader election. */
    LeaderElectionService getResourceManagerLeaderElectionService();

    // --- Dispatcher -----------------------------------------------------------------------

    /** Returns the service used to discover the current dispatcher leader. */
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /** Returns the service through which the dispatcher takes part in leader election. */
    LeaderElectionService getDispatcherLeaderElectionService();

    // --- Job managers (one election per job) --------------------------------------------

    /** Returns the service used to discover the job manager leader of {@code jobID}. */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /** Returns the leader election service for the job manager of {@code jobID}. */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    // --- Web monitor ----------------------------------------------------------------------

    /** Returns the service used to discover the current web monitor leader. */
    LeaderRetrievalService getWebMonitorLeaderRetriever();

    /** Returns the service through which the web monitor takes part in leader election. */
    LeaderElectionService getWebMonitorLeaderElectionService();

    // --- Recovery stores and registries ---------------------------------------------------

    /** Returns the factory for per-job completed-checkpoint stores and checkpoint ID counters. */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /** Returns the store that persists job graphs. */
    JobGraphStore getJobGraphStore();

    /** Returns the store that persists job results. */
    JobResultStore getJobResultStore();

    /** Returns the registry of per-job scheduling status. */
    RunningJobsRegistry getRunningJobsRegistry();

    /** Creates a BLOB store; may fail with an {@link IOException}. */
    BlobStore createBlobStore() throws IOException;

    // --- Lifecycle ------------------------------------------------------------------------

    /** Closes the services. */
    @Override
    void close() throws Exception;

    /** Closes the services and cleans up all of their data. */
    void closeAndCleanupAllData() throws Exception;
}

Leader Election Services

LeaderElectionService

Service for participating in leader election processes within the cluster.

/**
 * Lets a {@link LeaderContender} take part in the leader election for one component.
 */
public interface LeaderElectionService {

    /** Registers {@code contender} with this service and begins participating in the election. */
    void start(LeaderContender contender) throws Exception;

    /**
     * Confirms leadership for the session {@code leaderSessionID}, associating it with
     * {@code leaderAddress}.
     */
    void confirmLeadership(UUID leaderSessionID, String leaderAddress);

    /** Returns whether this service holds leadership for the session {@code leaderSessionId}. */
    boolean hasLeadership(UUID leaderSessionId);

    /** Stops participating in the election. */
    void stop() throws Exception;
}

LeaderContender

Interface implemented by components that want to participate in leader election.

/**
 * Callback interface for a component that competes for leadership.
 */
public interface LeaderContender {

    /** Returns the address this contender advertises when it is the leader. */
    String getAddress();

    /** Called when this contender becomes leader for the session {@code leaderSessionID}. */
    void grantLeadership(UUID leaderSessionID);

    /** Called when this contender loses leadership. */
    void revokeLeadership();

    /** Called when the election machinery reports an error. */
    void handleError(Exception exception);
}

LeaderRetrievalService

Service for retrieving current leader information and receiving leadership change notifications.

/**
 * Reports the current leader of a component to a {@link LeaderRetrievalListener}.
 */
public interface LeaderRetrievalService {

    /** Begins delivering leader notifications to {@code listener}. */
    void start(LeaderRetrievalListener listener) throws Exception;

    /** Stops delivering leader notifications. */
    void stop() throws Exception;
}

LeaderRetrievalListener

Listener interface for receiving leader change notifications.

/**
 * Receives notifications about the current leader from a leader retrieval service.
 */
public interface LeaderRetrievalListener {

    /** Called with the address and session ID of the (new) leader. */
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);

    /** Called when leader retrieval reports an error. */
    void handleError(Exception exception);
}

Storage and Persistence Services

CheckpointRecoveryFactory

Factory for the per-job checkpoint recovery components: a store for completed checkpoints and a checkpoint ID counter.

/**
 * Creates the per-job components that checkpoint recovery relies on.
 */
public interface CheckpointRecoveryFactory {

    /** Creates the checkpoint ID counter for {@code jobId}. */
    CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception;

    /**
     * Creates the completed-checkpoint store for {@code jobId}.
     *
     * @param jobId job whose checkpoints the store holds
     * @param maxNumberOfCheckpointsToRetain upper bound on retained completed checkpoints
     * @param userClassLoader class loader of the job's user code
     */
    CompletedCheckpointStore createCompletedCheckpointStore(
        JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
        throws Exception;
}

JobGraphStore

Persistent storage for job graphs to enable job recovery after failures.

/**
 * Persists job graphs so that jobs can be recovered after a failure.
 */
public interface JobGraphStore {

    // Lifecycle

    /** Starts the store; {@code jobGraphListener} receives change notifications. */
    void start(JobGraphListener jobGraphListener) throws Exception;

    /** Stops the store. */
    void stop() throws Exception;

    // Operations

    /** Persists {@code jobGraph}. */
    void putJobGraph(StoredJobGraph jobGraph) throws Exception;

    /** Reads back the persisted job graph of {@code jobId}. */
    StoredJobGraph recoverJobGraph(JobID jobId) throws Exception;

    /** Removes the persisted job graph of {@code jobId}. */
    void removeJobGraph(JobID jobId) throws Exception;

    /** Returns the IDs of all persisted job graphs. */
    Collection<JobID> getJobIds() throws Exception;
}

JobResultStore

Storage for persisted job results. Each entry is first created as "dirty" (createDirtyResult) and later marked "clean" (markResultAsClean); both sets can be queried.

/**
 * Persists job results, each of which is tracked as either dirty or clean.
 */
public interface JobResultStore {

    // Writes

    /** Persists {@code jobResultEntry} as a dirty entry. */
    void createDirtyResult(JobResultEntry jobResultEntry) throws IOException;

    /**
     * Marks the entry of {@code jobId} as clean.
     *
     * @throws NoSuchElementException if there is no entry to mark
     */
    void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException;

    // Existence checks

    /** Returns whether any entry (dirty or clean) exists for {@code jobId}. */
    boolean hasJobResultEntry(JobID jobId) throws IOException;

    /** Returns whether a dirty entry exists for {@code jobId}. */
    boolean hasDirtyJobResultEntry(JobID jobId) throws IOException;

    /** Returns whether a clean entry exists for {@code jobId}. */
    boolean hasCleanJobResultEntry(JobID jobId) throws IOException;

    // Bulk reads

    /** Returns the results of all dirty entries. */
    Set<JobResult> getDirtyResults() throws IOException;

    /** Returns the results of all clean entries. */
    Set<JobResult> getCleanResults() throws IOException;
}

Registry Services

RunningJobsRegistry

Registry that records each job's scheduling status (PENDING, RUNNING, or DONE).

/**
 * Records the scheduling status of each job; see {@link JobSchedulingStatus}.
 */
public interface RunningJobsRegistry {

    /** Scheduling status values this registry can report for a job. */
    enum JobSchedulingStatus {
        PENDING,
        RUNNING,
        DONE
    }

    /** Returns the recorded scheduling status of {@code jobID}. */
    JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException;

    /** Records {@code jobID} as running. */
    void setJobRunning(JobID jobID) throws IOException;

    /** Records {@code jobID} as finished. */
    void setJobFinished(JobID jobID) throws IOException;
}

BlobStore

Distributed storage service for binary large objects (BLOBs), such as user JAR files and other large artifacts that must be shared across the cluster.

/**
 * Stores BLOBs addressed by a job ID and a {@link BlobKey}, transferring them to and from local
 * files.
 */
public interface BlobStore extends Closeable {

    /** Copies {@code localFile} into the store under ({@code jobId}, {@code blobKey}). */
    boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;

    /** Copies the BLOB ({@code jobId}, {@code blobKey}) from the store into {@code localFile}. */
    boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;

    /** Deletes one BLOB. */
    boolean delete(JobID jobId, BlobKey blobKey);

    /** Deletes every BLOB belonging to {@code jobId}. */
    boolean deleteAll(JobID jobId);

    /** Closes the store and cleans up all of its data. */
    void closeAndCleanupAllData() throws IOException;
}

Utility Classes

HighAvailabilityServicesUtils

Utility class providing factory methods and helper functions for HA services.

/**
 * Static helpers for creating high availability services and the leader election/retrieval
 * services they hand out. (Signature listing only; bodies are omitted.)
 */
public class HighAvailabilityServicesUtils {

    // --- Creating the HA services --------------------------------------------------------

    /** Creates HA services from {@code configuration}, resolving addresses as requested. */
    public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration, Executor executor, AddressResolution addressResolution)
        throws Exception;

    /** Creates the configured HA services, or embedded ones as the fallback its name implies. */
    public static HighAvailabilityServices createAvailableOrEmbeddedServices(
        Configuration config, Executor executor) throws Exception;

    // --- Individual leader services -----------------------------------------------------

    /** Creates a leader election service named {@code serviceName}. */
    public static LeaderElectionService createLeaderElectionService(
        Configuration config, String serviceName) throws Exception;

    /** Creates a leader retrieval service named {@code serviceName}. */
    public static LeaderRetrievalService createLeaderRetrievalService(
        Configuration config, String serviceName) throws Exception;

    // --- JobManager address -------------------------------------------------------------

    /** Writes the JobManager {@code address} and {@code port} into {@code config}. */
    public static void setJobManagerAddress(Configuration config, String address, int port);

    /** Reads the JobManager address from {@code config}. */
    public static String getJobManagerAddress(Configuration config) throws Exception;
}

Configuration Options

High Availability Configuration Keys

Configuration options for setting up high availability services.

/**
 * Configuration options for setting up Flink's high availability services.
 *
 * <p>Holder of constants only; it cannot be instantiated.
 */
public class HighAvailabilityOptions {

    /** HA mode: {@code "NONE"} by default, or e.g. {@code "zookeeper"}. */
    public static final ConfigOption<String> HA_MODE =
        key("high-availability").defaultValue("NONE");

    /**
     * ID that separates this cluster's HA data from other clusters using the same HA storage.
     * Flink's default is the path-style value {@code "/default"}.
     */
    public static final ConfigOption<String> HA_CLUSTER_ID =
        key("high-availability.cluster-id").defaultValue("/default");

    /** Storage directory for HA metadata; has no default and must be configured. */
    public static final ConfigOption<String> HA_STORAGE_PATH =
        key("high-availability.storageDir").noDefaultValue();

    /** ZooKeeper quorum to connect to; has no default. */
    public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
        key("high-availability.zookeeper.quorum").noDefaultValue();

    /** ZooKeeper client session timeout. */
    public static final ConfigOption<Integer> HA_ZOOKEEPER_SESSION_TIMEOUT =
        key("high-availability.zookeeper.client.session-timeout").defaultValue(60000);

    /** ZooKeeper client connection timeout. */
    public static final ConfigOption<Integer> HA_ZOOKEEPER_CONNECTION_TIMEOUT =
        key("high-availability.zookeeper.client.connection-timeout").defaultValue(15000);

    /** Wait between ZooKeeper client retries. */
    public static final ConfigOption<Integer> HA_ZOOKEEPER_RETRY_WAIT =
        key("high-availability.zookeeper.client.retry-wait").defaultValue(5000);

    /** Maximum number of ZooKeeper client retry attempts. */
    public static final ConfigOption<Integer> HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS =
        key("high-availability.zookeeper.client.max-retry-attempts").defaultValue(3);

    /** Root ZooKeeper node under which Flink stores its HA data. */
    public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
        key("high-availability.zookeeper.path.root").defaultValue("/flink");

    /** Not instantiable: this class only holds option constants. */
    private HighAvailabilityOptions() {}
}

Exception Handling

HighAvailabilityServicesException

General exception for failures of the high availability services. (LeaderElectionException below extends FlinkException directly, not this class.)

public class HighAvailabilityServicesException extends FlinkException {
    public HighAvailabilityServicesException(String message);
    public HighAvailabilityServicesException(String message, Throwable cause);
}

LeaderElectionException

Exception thrown during leader election process failures.

public class LeaderElectionException extends FlinkException {
    public LeaderElectionException(String message);
    public LeaderElectionException(String message, Throwable cause);
}

Usage Examples

Setting Up High Availability with ZooKeeper

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// NOTE: AddressResolution must also be imported; its package differs between Flink versions.

// Configure ZooKeeper-based high availability.
// In a real deployment the storage path must be reachable by every JobManager (e.g. HDFS/S3).
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "localhost:2181");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "file:///ha-storage");
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "my-flink-cluster");

// Placeholder: the address this process advertises while it is the Resource Manager leader.
String rmAddress = "localhost:6123";

// Executor handed to the HA services; owned (and shut down) by this example.
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
    HighAvailabilityServices haServices = HighAvailabilityServicesUtils
        .createHighAvailabilityServices(config, executor, AddressResolution.TRY_ADDRESS_RESOLUTION);
    try {
        LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();
        LeaderRetrievalService rmLeaderRetrieval = haServices.getResourceManagerLeaderRetriever();

        // Take part in the Resource Manager election; the contender confirms leadership
        // through the election service it is given.
        ResourceManagerLeaderContender rmContender =
            new ResourceManagerLeaderContender(rmAddress, rmLeaderElection);
        rmLeaderElection.start(rmContender);
        try {
            // Follow the current Resource Manager leader (as a client would).
            ResourceManagerLeaderListener rmListener = new ResourceManagerLeaderListener();
            rmLeaderRetrieval.start(rmListener);
            try {
                // ... run while the services are active
            } finally {
                rmLeaderRetrieval.stop();
            }
        } finally {
            rmLeaderElection.stop();
        }
    } finally {
        // close() keeps persisted HA data so another process can recover the cluster.
        haServices.close();
    }
} finally {
    executor.shutdown();
}

Implementing a Leader Contender

import org.apache.flink.runtime.highavailability.LeaderContender;
import java.util.UUID;

public class ResourceManagerLeaderContender implements LeaderContender {
    private volatile boolean isLeader = false;
    private volatile UUID currentLeaderSessionId;
    private final String address;
    
    public ResourceManagerLeaderContender(String address) {
        this.address = address;
    }
    
    @Override
    public void grantLeadership(UUID leaderSessionID) {
        synchronized (this) {
            if (!isLeader) {
                System.out.println("Granted leadership with session ID: " + leaderSessionID);
                this.currentLeaderSessionId = leaderSessionID;
                this.isLeader = true;
                
                // Confirm leadership and start serving as leader
                confirmLeadership(leaderSessionID);
                startLeaderServices();
            }
        }
    }
    
    @Override
    public void revokeLeadership() {
        synchronized (this) {
            if (isLeader) {
                System.out.println("Leadership revoked");
                this.isLeader = false;
                this.currentLeaderSessionId = null;
                
                // Stop leader services
                stopLeaderServices();
            }
        }
    }
    
    @Override
    public String getAddress() {
        return address;
    }
    
    @Override
    public void handleError(Exception exception) {
        System.err.println("Leader election error: " + exception.getMessage());
        // Handle leadership errors - may need to restart election
        revokeLeadership();
    }
    
    private void confirmLeadership(UUID leaderSessionID) {
        // Confirm leadership with the election service
        leaderElectionService.confirmLeadership(leaderSessionID, address);
    }
    
    private void startLeaderServices() {
        // Initialize services that only the leader should run
        System.out.println("Starting Resource Manager leader services");
    }
    
    private void stopLeaderServices() {
        // Clean up leader-only services
        System.out.println("Stopping Resource Manager leader services");
    }
    
    public boolean hasLeadership() {
        return isLeader;
    }
    
    public UUID getCurrentLeaderSessionId() {
        return currentLeaderSessionId;
    }
}

Implementing a Leader Retrieval Listener

import org.apache.flink.runtime.highavailability.LeaderRetrievalListener;
import java.util.UUID;

public class ResourceManagerLeaderListener implements LeaderRetrievalListener {
    private volatile String currentLeaderAddress;
    private volatile UUID currentLeaderSessionId;
    
    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        synchronized (this) {
            if (!Objects.equals(currentLeaderAddress, leaderAddress) || 
                !Objects.equals(currentLeaderSessionId, leaderSessionID)) {
                
                System.out.println("New Resource Manager leader: " + leaderAddress + 
                                 " (session: " + leaderSessionID + ")");
                
                // Update connection to new leader
                updateLeaderConnection(leaderAddress, leaderSessionID);
                
                this.currentLeaderAddress = leaderAddress;
                this.currentLeaderSessionId = leaderSessionID;
            }
        }
    }
    
    @Override
    public void handleError(Exception exception) {
        System.err.println("Leader retrieval error: " + exception.getMessage());
        
        // Clear current leader information
        synchronized (this) {
            this.currentLeaderAddress = null;
            this.currentLeaderSessionId = null;
            disconnectFromLeader();
        }
    }
    
    private void updateLeaderConnection(String leaderAddress, UUID leaderSessionId) {
        // Establish connection to the new leader
        if (leaderAddress != null) {
            System.out.println("Connecting to Resource Manager at: " + leaderAddress);
            // ... connect to leader
        } else {
            System.out.println("No Resource Manager leader available");
            disconnectFromLeader();
        }
    }
    
    private void disconnectFromLeader() {
        // Clean up connections to previous leader
        System.out.println("Disconnecting from Resource Manager leader");
    }
    
    public String getCurrentLeaderAddress() {
        return currentLeaderAddress;
    }
    
    public UUID getCurrentLeaderSessionId() {
        return currentLeaderSessionId;
    }
}

Job Graph Store Implementation

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.JobGraphStore;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;

/**
 * Example {@link JobGraphStore} that keeps one Java-serialized file per job,
 * {@code <jobId>.job}, in a local directory.
 *
 * <p>Each graph is first written to a temporary sibling file and then moved into place. A crash
 * mid-write therefore cannot leave a truncated {@code .job} file for recovery to fail on.
 *
 * <p>SECURITY: recovery uses Java deserialization, so the storage directory must be writable
 * only by trusted processes.
 */
public class FileSystemJobGraphStore implements JobGraphStore {
    private static final String JOB_FILE_SUFFIX = ".job";
    private static final String TEMP_FILE_SUFFIX = ".tmp";

    private final Path storageDirectory;
    private volatile JobGraphListener listener;

    public FileSystemJobGraphStore(Path storageDirectory) {
        this.storageDirectory = Objects.requireNonNull(storageDirectory, "storageDirectory");
    }

    /** Serializes {@code jobGraph} to its job file, replacing any previous version. */
    @Override
    public void putJobGraph(StoredJobGraph jobGraph) throws Exception {
        JobID jobId = jobGraph.getJobId();
        Path jobFile = jobFile(jobId);
        // "<id>.job.tmp" does not match the "*.job" pattern used by getJobIds().
        Path tempFile = jobFile.resolveSibling(jobFile.getFileName() + TEMP_FILE_SUFFIX);

        try {
            try (ObjectOutputStream oos = new ObjectOutputStream(Files.newOutputStream(tempFile))) {
                oos.writeObject(jobGraph);
            }
            moveIntoPlace(tempFile, jobFile);
        } catch (Exception e) {
            try {
                Files.deleteIfExists(tempFile);
            } catch (IOException cleanupError) {
                e.addSuppressed(cleanupError);
            }
            throw e;
        }

        System.out.println("Stored job graph for: " + jobId);

        // Read the volatile field once: stop() may clear it concurrently.
        JobGraphListener currentListener = listener;
        if (currentListener != null) {
            currentListener.onAddedJobGraph(jobId);
        }
    }

    /**
     * Deserializes the stored job graph of {@code jobId}.
     *
     * @throws NoSuchElementException if no job graph is stored for {@code jobId}
     */
    @Override
    public StoredJobGraph recoverJobGraph(JobID jobId) throws Exception {
        Path jobFile = jobFile(jobId);
        // Opening directly (instead of exists-then-open) avoids a check-then-act race.
        try (ObjectInputStream ois = new ObjectInputStream(Files.newInputStream(jobFile))) {
            return (StoredJobGraph) ois.readObject();
        } catch (NoSuchFileException e) {
            NoSuchElementException notFound =
                new NoSuchElementException("Job graph not found: " + jobId);
            notFound.initCause(e);
            throw notFound;
        }
    }

    /** Deletes the job file of {@code jobId}, if present. */
    @Override
    public void removeJobGraph(JobID jobId) throws Exception {
        Files.deleteIfExists(jobFile(jobId));

        System.out.println("Removed job graph for: " + jobId);

        JobGraphListener currentListener = listener;
        if (currentListener != null) {
            currentListener.onRemovedJobGraph(jobId);
        }
    }

    /**
     * Lists the job IDs with a stored job graph. A {@code *.job} file whose name is not a valid
     * job ID is reported and skipped rather than aborting the listing.
     */
    @Override
    public Collection<JobID> getJobIds() throws Exception {
        if (!Files.exists(storageDirectory)) {
            return Collections.emptyList();
        }

        List<JobID> jobIds = new ArrayList<>();
        try (DirectoryStream<Path> stream =
                Files.newDirectoryStream(storageDirectory, "*" + JOB_FILE_SUFFIX)) {
            for (Path path : stream) {
                String filename = path.getFileName().toString();
                String jobIdStr = filename.substring(0, filename.length() - JOB_FILE_SUFFIX.length());
                try {
                    jobIds.add(JobID.fromHexString(jobIdStr));
                } catch (IllegalArgumentException e) {
                    System.err.println("Ignoring unrecognized file in job graph store: " + path);
                }
            }
        }

        return jobIds;
    }

    /** Registers {@code jobGraphListener} and makes sure the storage directory exists. */
    @Override
    public void start(JobGraphListener jobGraphListener) throws Exception {
        this.listener = jobGraphListener;

        Files.createDirectories(storageDirectory);

        System.out.println("Started FileSystem job graph store at: " + storageDirectory);
    }

    /** Unregisters the listener; stored files are kept. */
    @Override
    public void stop() throws Exception {
        this.listener = null;
        System.out.println("Stopped FileSystem job graph store");
    }

    /** Returns the file that holds the job graph of {@code jobId}. */
    private Path jobFile(JobID jobId) {
        return storageDirectory.resolve(jobId.toString() + JOB_FILE_SUFFIX);
    }

    /**
     * Moves {@code source} over {@code target}. It tries an atomic move first and falls back to
     * a plain replacing move when the file system does not support atomic moves.
     */
    private static void moveIntoPlace(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
        } catch (AtomicMoveNotSupportedException e) {
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}

Checkpoint Recovery Factory

import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;

public class FileSystemCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
    private final Path checkpointDirectory;
    private final Configuration configuration;
    
    public FileSystemCheckpointRecoveryFactory(Path checkpointDirectory, Configuration configuration) {
        this.checkpointDirectory = checkpointDirectory;
        this.configuration = configuration;
    }
    
    @Override
    public CompletedCheckpointStore createCompletedCheckpointStore(
            JobID jobId, 
            int maxNumberOfCheckpointsToRetain,
            ClassLoader userClassLoader) throws Exception {
        
        Path jobCheckpointDir = checkpointDirectory.resolve(jobId.toString());
        
        return new FileSystemCompletedCheckpointStore(
            jobCheckpointDir,
            maxNumberOfCheckpointsToRetain,
            userClassLoader
        );
    }
    
    @Override
    public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception {
        Path counterFile = checkpointDirectory.resolve(jobId.toString()).resolve("counter");
        
        return new FileSystemCheckpointIDCounter(counterFile);
    }
}

Common Patterns

HA Services Lifecycle Management

public class ClusterManager {
    private HighAvailabilityServices haServices;
    private final Configuration configuration;
    
    public ClusterManager(Configuration configuration) {
        this.configuration = configuration;
    }
    
    public void start() throws Exception {
        // Initialize HA services
        haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
            configuration, 
            ForkJoinPool.commonPool(),
            AddressResolution.TRY_ADDRESS_RESOLUTION
        );
        
        // Start cluster components with HA support
        startResourceManager();
        startDispatcher();
        startWebMonitor();
    }
    
    public void stop() throws Exception {
        try {
            // Stop cluster components first
            stopWebMonitor();
            stopDispatcher();
            stopResourceManager();
        } finally {
            // Always cleanup HA services
            if (haServices != null) {
                haServices.closeAndCleanupAllData();
            }
        }
    }
    
    private void startResourceManager() throws Exception {
        LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();
        ResourceManagerLeaderContender contender = new ResourceManagerLeaderContender();
        rmLeaderElection.start(contender);
    }
    
    // ... other component management methods
}

Robust Error Handling

public class HARobustComponent implements LeaderContender, LeaderRetrievalListener {
    private final ScheduledExecutorService scheduler;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    
    @Override
    public void handleError(Exception exception) {
        System.err.println("HA error occurred: " + exception.getMessage());
        
        // Implement exponential backoff retry
        scheduler.schedule(() -> {
            if (isRunning.get()) {
                try {
                    restartHAServices();
                } catch (Exception e) {
                    System.err.println("Failed to restart HA services: " + e.getMessage());
                    handleError(e); // Recursive retry with backoff
                }
            }
        }, calculateBackoffDelay(), TimeUnit.MILLISECONDS);
    }
    
    private void restartHAServices() throws Exception {
        // Restart HA service connections
        System.out.println("Restarting HA services");
        // ... restart logic
    }
    
    private long calculateBackoffDelay() {
        // Implement exponential backoff
        return Math.min(1000 * (long) Math.pow(2, retryCount), 30000);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10

docs

data-exchange.md

execution-graph.md

high-availability.md

index.md

job-management.md

message-passing.md

metrics.md

mini-cluster.md

rpc-framework.md

state-management.md

task-execution.md

tile.json