High Availability Storage

Persistent storage interfaces for maintaining cluster state and worker information across framework restarts and failures. The HA system provides fault tolerance, state recovery, and consistent cluster management in production environments.

Capabilities

Worker Store Interface

Core interface for persistent storage of Mesos worker state and framework information, supporting both standalone and distributed storage implementations.

/**
 * Persistent store for Mesos worker state and framework information
 * Provides fault tolerance and state recovery for high availability deployments
 */
public interface MesosWorkerStore {
    /**
     * Initialize the worker store and establish connections
     * Must be called before any other operations
     * @throws Exception if the worker store cannot be started
     */
    void start() throws Exception;
    
    /**
     * Stop the worker store and cleanup resources
     * @param cleanup - Whether to perform cleanup operations (remove stored data)
     * @throws Exception if the worker store cannot be stopped properly
     */
    void stop(boolean cleanup) throws Exception;
    
    /**
     * Get the stored Mesos framework ID for framework re-registration
     * @return Optional framework ID, empty if none stored
     * @throws Exception if the framework ID cannot be retrieved
     */
    Option<Protos.FrameworkID> getFrameworkID() throws Exception;
    
    /**
     * Store the Mesos framework ID for persistent framework identity
     * @param frameworkID - Framework ID to store, or empty to clear
     * @throws Exception if the framework ID cannot be stored
     */
    void setFrameworkID(Option<Protos.FrameworkID> frameworkID) throws Exception;
    
    /**
     * Recover all stored worker information after restart
     * Used during framework recovery to restore cluster state
     * @return List of all stored workers with their current state
     * @throws Exception if worker information cannot be recovered
     */
    List<Worker> recoverWorkers() throws Exception;
    
    /**
     * Generate a new unique task ID for worker identification
     * Ensures task ID uniqueness across framework restarts
     * @return New unique Mesos task ID
     * @throws Exception if a new task ID cannot be generated
     */
    Protos.TaskID newTaskID() throws Exception;
    
    /**
     * Store worker information persistently
     * Updates existing worker if task ID already exists
     * @param worker - Worker information to store
     * @throws Exception if the worker cannot be stored
     */
    void putWorker(Worker worker) throws Exception;
    
    /**
     * Remove worker from persistent storage
     * @param taskID - Task ID of worker to remove
     * @return true if worker was found and removed, false otherwise
     * @throws Exception if the worker cannot be removed
     */
    boolean removeWorker(Protos.TaskID taskID) throws Exception;
}
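The `newTaskID()` contract says IDs stay unique across framework restarts, so a counter held only in process memory is not enough. The minimal sketch below shows one way to meet the contract: the counter lives in the durable backend and is read back by the next process. `DurableBackend`, `CounterTaskIdGenerator`, the `taskCount` key, and the `taskmanager-%05d` ID format are all hypothetical. A `HashMap` stands in for real persistent storage.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a persistent backend (e.g. a ZooKeeper node).
// A HashMap is used so the sketch runs without external services.
class DurableBackend {
    private final Map<String, Long> values = new HashMap<>();

    long get(String key) {
        return values.getOrDefault(key, 0L);
    }

    void put(String key, long value) {
        values.put(key, value);
    }
}

// Hypothetical generator: the counter is read from and written back to the
// backend on every call, so a new instance over the same backend (i.e. after
// a restart) never reissues an earlier ID.
class CounterTaskIdGenerator {
    private static final String COUNTER_KEY = "taskCount"; // assumed key name
    private final DurableBackend backend;

    CounterTaskIdGenerator(DurableBackend backend) {
        this.backend = backend;
    }

    String newTaskId() {
        long next = backend.get(COUNTER_KEY) + 1;
        backend.put(COUNTER_KEY, next); // persist before handing out the ID
        return String.format("taskmanager-%05d", next);
    }
}
```

Constructing a second generator over the same backend simulates a restart, and the new generator continues at the next number. If several processes share one backend, the read-increment-write step must also be atomic, for example through a versioned compare-and-set.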

Worker State Management

Nested classes within MesosWorkerStore for representing stored worker information and lifecycle states.

/**
 * Stored worker representation with state and launch information
 * Contains all information needed to recover worker after framework restart
 */
public static class Worker implements Serializable {
    /**
     * Create worker entry for storage
     * @param taskId - Unique Mesos task ID
     * @param launchableWorker - Worker launch specification
     * @param state - Current worker lifecycle state
     */
    public Worker(Protos.TaskID taskId, 
                  LaunchableMesosWorker launchableWorker, 
                  WorkerState state);
    
    /**
     * Get the Mesos task ID for this worker
     * @return Unique task identifier
     */
    public Protos.TaskID taskID();
    
    /**
     * Get the launchable worker specification
     * @return Worker launch configuration and requirements
     */
    public LaunchableMesosWorker launchableMesosWorker();
    
    /**
     * Get the current worker lifecycle state
     * @return Current state in worker lifecycle
     */
    public WorkerState state();
    
    /**
     * Create a new worker with updated state
     * @param newState - New lifecycle state
     * @return New Worker instance with updated state
     */
    public Worker withState(WorkerState newState);
}

/**
 * Worker lifecycle states for state machine management
 */
public enum WorkerState {
    /** Worker created but not yet launched */
    New,
    /** Worker successfully launched on Mesos slave */
    Launched,
    /** Worker released and no longer active */
    Released
}
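The enum lists the lifecycle states but does not say which transitions are legal. A small guard can reject out-of-order updates before they reach the store. In the sketch below, the transition rules are an assumption inferred from the state descriptions: New may move to Launched or Released, Launched may move to Released, and Released is terminal. These rules are not documented behavior, and `LifecycleState` and `Transitions` are hypothetical names.

```java
// Hypothetical copy of the worker lifecycle with an explicit transition table.
enum LifecycleState {
    New, Launched, Released;

    // Assumed rules: states only move forward, a worker may be released
    // before it was ever launched, and Released is terminal.
    boolean canTransitionTo(LifecycleState next) {
        switch (this) {
            case New:
                return next == Launched || next == Released;
            case Launched:
                return next == Released;
            default:
                return false;
        }
    }
}

final class Transitions {
    private Transitions() {}

    // Returns the target state, or throws if the move is not allowed.
    static LifecycleState advance(LifecycleState current, LifecycleState next) {
        if (!current.canTransitionTo(next)) {
            throw new IllegalStateException(current + " -> " + next + " is not allowed");
        }
        return next;
    }
}
```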

Worker State Management Example:

import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore.Worker;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore.WorkerState;
import org.apache.mesos.Protos;

// Store new worker
MesosWorkerStore store = /* ... */;
Protos.TaskID taskId = store.newTaskID();
LaunchableMesosWorker launchableWorker = /* ... */;

Worker newWorker = new Worker(taskId, launchableWorker, WorkerState.New);
store.putWorker(newWorker);

// Update worker state after successful launch
Worker launchedWorker = newWorker.withState(WorkerState.Launched);
store.putWorker(launchedWorker);

// Remove worker when no longer needed
store.removeWorker(taskId);
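In the example, the launched state is written with a second `putWorker` call because `withState` returns a new `Worker` instead of changing the existing one. The minimal sketch below shows that copy-on-update pattern. `WorkerSnapshot` and `SnapshotState` are hypothetical names, and Strings stand in for `Protos.TaskID` and `LaunchableMesosWorker`.

```java
import java.io.Serializable;
import java.util.Objects;

// Lifecycle states mirroring MesosWorkerStore.WorkerState.
enum SnapshotState { New, Launched, Released }

// Hypothetical immutable worker record. Strings stand in for the Mesos
// TaskID and the launch specification so the sketch has no dependencies.
final class WorkerSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String taskId;
    private final String launchSpec;
    private final SnapshotState state;

    WorkerSnapshot(String taskId, String launchSpec, SnapshotState state) {
        this.taskId = Objects.requireNonNull(taskId);
        this.launchSpec = Objects.requireNonNull(launchSpec);
        this.state = Objects.requireNonNull(state);
    }

    String taskId() { return taskId; }
    String launchSpec() { return launchSpec; }
    SnapshotState state() { return state; }

    // Copy-on-update: all fields are final, so a state change yields a new
    // instance and any reference held elsewhere keeps the old state.
    WorkerSnapshot withState(SnapshotState newState) {
        return new WorkerSnapshot(taskId, launchSpec, newState);
    }
}
```

Because the old instance is unchanged, code that still holds `newWorker` after calling `withState` sees `New`. The store learns about the change only through `putWorker`.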

Standalone Worker Store

In-memory implementation of MesosWorkerStore suitable for single-node deployments and development environments.

/**
 * In-memory implementation of MesosWorkerStore for standalone deployments
 * Data is not persisted across process restarts - suitable for development only
 */
public class StandaloneMesosWorkerStore implements MesosWorkerStore {
    /**
     * Create standalone worker store with configuration
     * @param config - Flink configuration (unused in standalone mode)
     */
    public StandaloneMesosWorkerStore(Configuration config);
    
    // Implements all MesosWorkerStore interface methods
    // Data stored in memory only - lost on restart
}
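The behavior described for the standalone store can be sketched with a `LinkedHashMap`: state lives in memory only, `putWorker` is an upsert keyed by task ID, and `removeWorker` reports whether anything was removed. The class below is a hypothetical illustration with String IDs and a minimal worker type, not the Flink source. Treating `stop(true)` as discarding all state is an assumption based on the description of the `cleanup` parameter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Minimal worker record for the sketch: task ID plus lifecycle state name.
final class MemWorker {
    final String taskId;
    final String state;

    MemWorker(String taskId, String state) {
        this.taskId = taskId;
        this.state = state;
    }
}

// Hypothetical in-memory store: nothing survives the JVM, mirroring the
// "development only" caveat for StandaloneMesosWorkerStore.
final class InMemoryWorkerStore {
    private final Map<String, MemWorker> workers = new LinkedHashMap<>();
    private Optional<String> frameworkId = Optional.empty();
    private long taskCount = 0;

    Optional<String> getFrameworkId() { return frameworkId; }

    void setFrameworkId(Optional<String> id) { frameworkId = id; }

    String newTaskId() { return "taskmanager-" + (++taskCount); }

    // Upsert: a worker with an existing task ID replaces the stored entry.
    void putWorker(MemWorker worker) { workers.put(worker.taskId, worker); }

    boolean removeWorker(String taskId) { return workers.remove(taskId) != null; }

    List<MemWorker> recoverWorkers() { return new ArrayList<>(workers.values()); }

    // Assumed semantics: cleanup=true discards all stored state.
    void stop(boolean cleanup) {
        if (cleanup) {
            workers.clear();
            frameworkId = Optional.empty();
        }
    }
}
```

Under these semantics, the two `putWorker` calls in the Worker State Management Example leave a single entry whose state is `Launched`.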

ZooKeeper Worker Store

Distributed implementation of MesosWorkerStore using Apache ZooKeeper for persistent, fault-tolerant storage in production environments.

/**
 * ZooKeeper-based implementation of MesosWorkerStore for high availability
 * Provides persistent storage with automatic failover and consistency guarantees
 */
public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
    /**
     * Create ZooKeeper-based worker store
     * @param curatorFramework - ZooKeeper client framework
     * @param configuration - Flink configuration with ZK settings
     */
    public ZooKeeperMesosWorkerStore(CuratorFramework curatorFramework, 
                                     Configuration configuration);
    
    // Implements all MesosWorkerStore interface methods
    // Data persisted in ZooKeeper with automatic replication
}
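`Worker` implements `Serializable`, so a store can treat it as an opaque byte payload. The sketch below shows that round trip with standard Java serialization. `StoredWorker` and `WorkerCodec` are hypothetical names. The sketch does not say whether `ZooKeeperMesosWorkerStore` writes these bytes into a ZNode directly or into a file referenced from ZooKeeper; that detail is left open.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical serializable worker record (stand-in for MesosWorkerStore.Worker).
final class StoredWorker implements Serializable {
    private static final long serialVersionUID = 1L;
    final String taskId;
    final String state;

    StoredWorker(String taskId, String state) {
        this.taskId = taskId;
        this.state = state;
    }
}

// Converts workers to and from the byte[] payload a storage backend would hold.
final class WorkerCodec {
    private WorkerCodec() {}

    static byte[] toBytes(StoredWorker worker) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(worker); // closing the stream flushes into buffer
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static StoredWorker fromBytes(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return (StoredWorker) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown class in stored payload", e);
        }
    }
}
```

A fixed `serialVersionUID` matters here because payloads written by one process must stay readable by the next version of the process that recovers them.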

ZooKeeper Configuration Example:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Configure ZooKeeper connection
Configuration config = new Configuration();
config.setString("high-availability", "zookeeper");
config.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
config.setString("high-availability.zookeeper.path.root", "/flink");
config.setString("high-availability.cluster-id", "production-cluster-1");

// Create ZooKeeper client
CuratorFramework curator = CuratorFrameworkFactory.newClient(
    "zk1:2181,zk2:2181,zk3:2181",
    new ExponentialBackoffRetry(1000, 3)
);
curator.start();

// Create HA worker store
ZooKeeperMesosWorkerStore store = new ZooKeeperMesosWorkerStore(curator, config);
store.start();
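The `ExponentialBackoffRetry(1000, 3)` policy retries up to three times with a randomized, growing sleep. As we understand Curator's implementation, the sleep before retry `n` (counting from 0) is `baseSleepTimeMs * max(1, random.nextInt(1 << (n + 1)))`, capped by a configurable maximum. The helper below reproduces only that arithmetic, not Curator code, to show the worst-case delays for this configuration. The cap is not modeled.

```java
// Reproduces the arithmetic (not the randomness) of a
// base * random(1 .. 2^(n+1) - 1) backoff, per our reading of
// Curator's ExponentialBackoffRetry. The max-sleep cap is not modeled.
final class BackoffBounds {
    private BackoffBounds() {}

    // Largest sleep before retry number retryCount (0-based).
    static long maxSleepMs(long baseSleepMs, int retryCount) {
        long maxMultiplier = (1L << (retryCount + 1)) - 1; // nextInt(k) returns at most k - 1
        return baseSleepMs * Math.max(1L, maxMultiplier);
    }

    // Worst-case total time spent sleeping across all retries.
    static long worstCaseTotalMs(long baseSleepMs, int maxRetries) {
        long total = 0;
        for (int n = 0; n < maxRetries; n++) {
            total += maxSleepMs(baseSleepMs, n);
        }
        return total;
    }
}
```

With `(1000, 3)`, the three sleeps are at most 1 s, 3 s, and 7 s. A ZooKeeper outage that outlasts roughly 11 s of sleeping, plus the time spent on the attempts themselves, therefore surfaces as a failure to the caller.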

High Availability Patterns

Framework Recovery

Restoring the complete framework state after the JobManager that hosts the Mesos resource manager restarts or fails over:

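import java.util.List;

import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.mesos.Protos;
import scala.Option;

// Framework recovery procedure; reregisterFramework, registerNewFramework,
// retryWorkerLaunch, reconcileWorkerState, and cleanupReleasedWorker are
// application-specific helpers not shown here
public void recoverFrameworkState(MesosWorkerStore store) throws Exception {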
    // Recover framework ID for re-registration
    Option<Protos.FrameworkID> frameworkId = store.getFrameworkID();
    
    if (frameworkId.isDefined()) {
        // Re-register with existing framework ID
        reregisterFramework(frameworkId.get());
    } else {
        // Fresh registration - first time startup
        registerNewFramework();
    }
    
    // Recover all workers and their states
    List<MesosWorkerStore.Worker> workers = store.recoverWorkers();
    
    for (MesosWorkerStore.Worker worker : workers) {
        switch (worker.state()) {
            case New:
                // Worker was created but never launched - retry launch
                retryWorkerLaunch(worker);
                break;
            case Launched:
                // Worker was launched - verify status and reconcile
                reconcileWorkerState(worker);
                break;
            case Released:
                // Worker was released - clean up if needed
                cleanupReleasedWorker(worker);
                break;
        }
    }
}
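The recovery loop is a dispatch on the stored state. The self-contained sketch below mirrors it by sorting recovered workers into per-action buckets, so the decision logic can be tested without Mesos. `RecoveredWorker`, `RecoveryAction`, and `RecoveryPlanner` are hypothetical names. The mapping from state to action is taken from the comments in the recovery procedure.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

enum StoredState { New, Launched, Released }

enum RecoveryAction { RETRY_LAUNCH, RECONCILE, CLEANUP }

final class RecoveredWorker {
    final String taskId;
    final StoredState state;

    RecoveredWorker(String taskId, StoredState state) {
        this.taskId = taskId;
        this.state = state;
    }
}

final class RecoveryPlanner {
    private RecoveryPlanner() {}

    // Same mapping as the switch in recoverFrameworkState.
    static RecoveryAction actionFor(StoredState state) {
        switch (state) {
            case New:      return RecoveryAction.RETRY_LAUNCH; // never launched
            case Launched: return RecoveryAction.RECONCILE;    // may still be running
            case Released: return RecoveryAction.CLEANUP;      // leftover entry
            default:       throw new IllegalArgumentException("Unknown state " + state);
        }
    }

    // Groups task IDs by the action recovery should take for them.
    static Map<RecoveryAction, List<String>> plan(List<RecoveredWorker> workers) {
        Map<RecoveryAction, List<String>> plan = new EnumMap<>(RecoveryAction.class);
        for (RecoveryAction action : RecoveryAction.values()) {
            plan.put(action, new ArrayList<>());
        }
        for (RecoveredWorker worker : workers) {
            plan.get(actionFor(worker.state)).add(worker.taskId);
        }
        return plan;
    }
}
```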

State Synchronization

Consistent state management across framework components:

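import java.util.List;

import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore.WorkerState;
import org.apache.mesos.Protos;

// Keeps the worker store in step with each worker's lifecycle;
// launchWorkerOnMesos, findWorkerById, and cleanupWorkerResources are
// application-specific helpers not shown here
public class WorkerLifecycleManager {
    private final MesosWorkerStore store;

    public WorkerLifecycleManager(MesosWorkerStore store) {
        this.store = store;
    }
    
    public void launchWorker(LaunchableMesosWorker launchableWorker) throws Exception {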
        // Create task ID and store initial state
        Protos.TaskID taskId = store.newTaskID();
        MesosWorkerStore.Worker worker = new MesosWorkerStore.Worker(
            taskId, launchableWorker, WorkerState.New
        );
        store.putWorker(worker);
        
        try {
            // Attempt to launch worker
            launchWorkerOnMesos(launchableWorker);
            
            // Update state to launched on success
            store.putWorker(worker.withState(WorkerState.Launched));
            
        } catch (Exception e) {
            // Remove worker on launch failure
            store.removeWorker(taskId);
            throw e;
        }
    }
    
    public void releaseWorker(Protos.TaskID taskId) throws Exception {
        // Get current worker state
        List<MesosWorkerStore.Worker> workers = store.recoverWorkers();
        MesosWorkerStore.Worker worker = findWorkerById(workers, taskId);
        
        if (worker != null) {
            // Update state to released
            store.putWorker(worker.withState(WorkerState.Released));
            
            // Perform cleanup after state update
            cleanupWorkerResources(worker);
            
            // Remove from store after successful cleanup
            store.removeWorker(taskId);
        }
    }
}
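`launchWorker` records the worker as `New` before contacting Mesos, and it rolls the entry back if the launch fails outright. The runnable sketch below exercises both the success path and the failure path against a map-backed store. The `Launcher` callback stands in for `launchWorkerOnMesos`, and `LifecycleSketch` stands in for `MesosWorkerStore`. Both names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical launch hook standing in for launchWorkerOnMesos.
interface Launcher {
    void launch(String taskId) throws Exception;
}

final class LifecycleSketch {
    final Map<String, String> store = new HashMap<>(); // taskId -> state name
    private int counter = 0;

    String launchWorker(Launcher launcher) throws Exception {
        String taskId = "task-" + (++counter);
        store.put(taskId, "New");          // 1. persist intent before acting
        try {
            launcher.launch(taskId);       // 2. side effect outside the store
            store.put(taskId, "Launched"); // 3. record success
            return taskId;
        } catch (Exception e) {
            store.remove(taskId);          // roll back on a definite failure
            throw e;
        }
    }
}
```

The sketch does not cover a crash between steps 1 and 3. In that case the entry stays `New`, which recovery treats as a launch to retry. If Mesos had already accepted the task before the crash, the task may in fact be running, so recovery should be prepared to reconcile such entries as well.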

Backup and Restore

Data backup strategies for disaster recovery:

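// Backup framework state; FrameworkBackup is listed under Types, and
// writeBackupToStorage and readBackupFromStorage are application-specific helpers
public void backupFrameworkState(MesosWorkerStore store, String backupLocation) throws Exception {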
    // Get all persistent state
    Option<Protos.FrameworkID> frameworkId = store.getFrameworkID();
    List<MesosWorkerStore.Worker> workers = store.recoverWorkers();
    
    // Create backup data structure
    FrameworkBackup backup = new FrameworkBackup(frameworkId, workers, System.currentTimeMillis());
    
    // Serialize and store backup
    writeBackupToStorage(backup, backupLocation);
}

// Restore framework state from backup
public void restoreFrameworkState(MesosWorkerStore store, String backupLocation) throws Exception {
    // Load backup data
    FrameworkBackup backup = readBackupFromStorage(backupLocation);
    
    // Restore framework ID
    if (backup.getFrameworkId().isDefined()) {
        store.setFrameworkID(backup.getFrameworkId());
    }
    
    // Restore worker states
    for (MesosWorkerStore.Worker worker : backup.getWorkers()) {
        store.putWorker(worker);
    }
}
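`putWorker` updates an existing entry rather than failing, so restoring a backup into a store that already holds entries merges the two, and workers absent from the backup stay in place. The sketch below makes that choice explicit with a `replace` flag. The map-based store and the `RestoreSketch` name are hypothetical. A real restore would also need to reconcile any `Launched` entries with Mesos, because the backup may be older than the cluster's actual state.

```java
import java.util.Map;

final class RestoreSketch {
    private RestoreSketch() {}

    // Applies backup entries (taskId -> state) to the live store.
    // replace=true first drops live entries that are not in the backup.
    static void restore(Map<String, String> store, Map<String, String> backup, boolean replace) {
        if (replace) {
            store.keySet().retainAll(backup.keySet());
        }
        for (Map.Entry<String, String> entry : backup.entrySet()) {
            store.put(entry.getKey(), entry.getValue()); // upsert, like putWorker
        }
    }
}
```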

Error Handling and Recovery

Connection Failures

Robust handling of storage backend connection failures:

  • Automatic retry: Exponential backoff for transient failures
  • Circuit breaker: Prevent cascade failures during outages
  • Graceful degradation: Continue operation with reduced functionality
  • Health monitoring: Continuous monitoring of storage backend health
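The circuit-breaker item can be made concrete as a small state machine. After a run of consecutive failures the breaker opens and rejects calls immediately, then lets a trial call through once a cool-down has passed. This is a generic sketch of the pattern; the Mesos worker stores are not documented to include one. The class name, thresholds, and injectable clock are assumptions chosen to keep the sketch testable.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

final class CircuitBreaker {
    private final int failureThreshold;
    private final long coolDownMs;
    private final LongSupplier clock; // injected so tests can control time

    private int consecutiveFailures = 0;
    private long openedAt = -1; // -1 means the breaker is closed

    CircuitBreaker(int failureThreshold, long coolDownMs, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.coolDownMs = coolDownMs;
        this.clock = clock;
    }

    boolean isOpen() {
        return openedAt >= 0 && clock.getAsLong() - openedAt < coolDownMs;
    }

    // Runs the call unless the breaker is open; failures are counted and rethrown.
    <T> T call(Supplier<T> operation) {
        if (isOpen()) {
            throw new IllegalStateException("circuit open, storage call rejected");
        }
        try {
            T result = operation.get();
            consecutiveFailures = 0; // a success closes the breaker
            openedAt = -1;
            return result;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            if (consecutiveFailures >= failureThreshold) {
                openedAt = clock.getAsLong(); // (re)open and restart the cool-down
            }
            throw e;
        }
    }
}
```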

Data Consistency

Ensuring consistent state across distributed components:

  • Atomic operations: All-or-nothing state updates
  • Conflict resolution: Handling concurrent updates from multiple instances
  • Version control: Optimistic concurrency control for state updates
  • Consistency checks: Periodic validation of stored state integrity
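Under optimistic concurrency control, each update names the version it was based on and is rejected if someone else wrote in between. ZooKeeper exposes the same idea through the expected-version argument of `setData`. A minimal in-memory sketch of the check, with hypothetical class names:

```java
import java.util.HashMap;
import java.util.Map;

final class VersionedValue {
    final String value;
    final int version;

    VersionedValue(String value, int version) {
        this.value = value;
        this.version = version;
    }
}

final class VersionedStore {
    private final Map<String, VersionedValue> entries = new HashMap<>();

    synchronized VersionedValue read(String key) {
        return entries.get(key);
    }

    // Creates the entry at version 0; returns false if it already exists.
    synchronized boolean create(String key, String value) {
        if (entries.containsKey(key)) {
            return false;
        }
        entries.put(key, new VersionedValue(value, 0));
        return true;
    }

    // Compare-and-set: succeeds only if the stored version still matches.
    synchronized boolean update(String key, String newValue, int expectedVersion) {
        VersionedValue current = entries.get(key);
        if (current == null || current.version != expectedVersion) {
            return false; // lost the race: caller should re-read and retry
        }
        entries.put(key, new VersionedValue(newValue, expectedVersion + 1));
        return true;
    }
}
```

When two writers read the same version, only the first update succeeds. The second must re-read the entry and decide again, instead of silently overwriting the first writer's change.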

Split-Brain Prevention

Protection against split-brain scenarios in distributed deployments:

  • Leader election: Single active resource manager instance
  • Fencing mechanisms: Prevent zombie processes from corrupting state
  • Quorum requirements: Majority consensus for critical operations
  • Timeout handling: Appropriate timeouts for distributed operations
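The quorum requirement explains why the ZooKeeper configuration example lists three servers. An ensemble of `n` servers needs a strict majority, `n / 2 + 1`, to make progress, so it tolerates `(n - 1) / 2` failures (integer division), and two partitions can never both hold a majority. A quick check of that arithmetic, using a hypothetical helper class:

```java
final class Quorum {
    private Quorum() {}

    // Smallest strict majority of an ensemble of n servers.
    static int majority(int n) {
        return n / 2 + 1;
    }

    // Servers that can fail while a majority is still reachable.
    static int toleratedFailures(int n) {
        return n - majority(n);
    }

    // Two disjoint groups cannot both reach a majority, which rules out split brain.
    static boolean bothSidesCanWrite(int n, int sideA) {
        int sideB = n - sideA;
        return sideA >= majority(n) && sideB >= majority(n);
    }
}
```

Four servers tolerate no more failures than three, which is why ensembles are usually sized with an odd number of servers.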

Performance Optimization

Batch Operations

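Applying several state updates after a single recoverWorkers() read. Each putWorker call is still a separate write, so the updates are not atomic as a group:

// Batch worker state updates; store is a MesosWorkerStore field and
// findWorkerById is an application-specific helper not shown here
public void updateWorkerStates(Map<Protos.TaskID, WorkerState> stateUpdates) throws Exception {
    List<MesosWorkerStore.Worker> workers = store.recoverWorkers();
    
    // One putWorker call per update; a failure part-way leaves earlier updates applied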
    for (Map.Entry<Protos.TaskID, WorkerState> entry : stateUpdates.entrySet()) {
        MesosWorkerStore.Worker worker = findWorkerById(workers, entry.getKey());
        if (worker != null) {
            store.putWorker(worker.withState(entry.getValue()));
        }
    }
}
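`findWorkerById` is not defined in this document. If it scans the recovered list, the loop is quadratic in the worst case. Indexing the recovered workers by task ID once keeps the whole update linear. The self-contained sketch below does that; `BatchWorker` and `BatchUpdate` are hypothetical names, and a String task ID and state stand in for the Mesos types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BatchWorker {
    final String taskId;
    final String state;

    BatchWorker(String taskId, String state) {
        this.taskId = taskId;
        this.state = state;
    }
}

final class BatchUpdate {
    private BatchUpdate() {}

    // Returns the workers that should be written back with putWorker.
    // Unknown task IDs in the update map are skipped, as in the loop above.
    static List<BatchWorker> apply(List<BatchWorker> recovered, Map<String, String> updates) {
        Map<String, BatchWorker> byId = new HashMap<>();
        for (BatchWorker worker : recovered) {
            byId.put(worker.taskId, worker); // one pass to build the index
        }
        List<BatchWorker> toWrite = new ArrayList<>();
        for (Map.Entry<String, String> update : updates.entrySet()) {
            BatchWorker current = byId.get(update.getKey()); // O(1) lookup
            if (current != null) {
                toWrite.add(new BatchWorker(current.taskId, update.getValue()));
            }
        }
        return toWrite;
    }
}
```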

Connection Pooling

Managing connections to storage backends:

  • Connection reuse: Share one ZooKeeper client (and its session) across store operations instead of connecting per call
  • Session management: Persistent sessions with automatic renewal
  • Connection monitoring: Health checking and automatic reconnection
  • Resource cleanup: Proper cleanup of connections and sessions

Monitoring and Observability

Metrics and Monitoring

Key metrics for HA storage monitoring:

  • Storage latency: Response times for read/write operations
  • Connection health: Status of storage backend connections
  • State consistency: Validation of stored state integrity
  • Recovery metrics: Time to recover from failures

Alerting

Critical alerts for HA storage systems:

  • Storage backend failures: Connection losses or timeouts
  • State corruption: Inconsistent or invalid stored state
  • Recovery failures: Problems during framework recovery
  • Resource exhaustion: Storage space or connection limits

Deprecation Notice

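The Mesos integration, including all high availability storage classes described here, is deprecated as of Flink 1.13 and was removed in Flink 1.14. Migration paths:

  • Kubernetes: Use Flink's Kubernetes HA services, which keep HA metadata in Kubernetes ConfigMaps
  • YARN: Use ZooKeeper-based HA together with YARN application attempts
  • Standalone: Use Flink's built-in HA services (ZooKeeper or Kubernetes)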

Types

/**
 * Framework backup data structure
 */
public class FrameworkBackup implements Serializable {
    public Option<Protos.FrameworkID> getFrameworkId();
    public List<MesosWorkerStore.Worker> getWorkers();
    public long getTimestamp();
    public String getVersion();
}

/**
 * Storage configuration for HA deployments
 */
public class HAStorageConfiguration {
    public String getStorageType(); // "standalone" or "zookeeper"
    public String getZooKeeperQuorum();
    public String getStoragePath();
    public int getConnectionTimeout();
    public int getSessionTimeout();
}

/**
 * Worker recovery information
 */
public class WorkerRecoveryInfo {
    public Protos.TaskID getTaskId();
    public WorkerState getLastKnownState();
    public long getLastUpdateTime();
    public boolean requiresReconciliation();
}
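`HAStorageConfiguration` exposes getters only. One plausible way to populate it is from the keys used in the ZooKeeper Configuration Example, as in the sketch below. `HAStorageSettings` is a hypothetical stand-in. The two client-timeout keys are Flink's ZooKeeper client options as we understand them. The fallback values (`/flink`, 15000 ms, 60000 ms) and the root-plus-cluster-ID path rule are assumptions; verify them against your Flink version.

```java
import java.util.Map;

// Hypothetical stand-in for HAStorageConfiguration, built from Flink-style keys.
final class HAStorageSettings {
    final String storageType;     // "standalone" or "zookeeper"
    final String zooKeeperQuorum; // null in standalone mode
    final String storagePath;
    final int connectionTimeoutMs;
    final int sessionTimeoutMs;

    private HAStorageSettings(String storageType, String zooKeeperQuorum, String storagePath,
                              int connectionTimeoutMs, int sessionTimeoutMs) {
        this.storageType = storageType;
        this.zooKeeperQuorum = zooKeeperQuorum;
        this.storagePath = storagePath;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    static HAStorageSettings fromConfig(Map<String, String> conf) {
        boolean zk = "zookeeper".equalsIgnoreCase(conf.get("high-availability"));
        String quorum = conf.get("high-availability.zookeeper.quorum");
        if (zk && (quorum == null || quorum.isEmpty())) {
            throw new IllegalArgumentException("ZooKeeper HA requires high-availability.zookeeper.quorum");
        }
        // Assumed rule: the effective path is the root followed by the cluster ID, if set.
        String root = conf.getOrDefault("high-availability.zookeeper.path.root", "/flink");
        String clusterId = conf.get("high-availability.cluster-id");
        String path = clusterId == null ? root : root + "/" + clusterId;
        int connectionTimeout = Integer.parseInt(
            conf.getOrDefault("high-availability.zookeeper.client.connection-timeout", "15000"));
        int sessionTimeout = Integer.parseInt(
            conf.getOrDefault("high-availability.zookeeper.client.session-timeout", "60000"));
        return new HAStorageSettings(zk ? "zookeeper" : "standalone", zk ? quorum : null,
                                     path, connectionTimeout, sessionTimeout);
    }
}
```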