Persistent storage interfaces for maintaining cluster state and worker information across framework restarts and failures. The HA system provides fault tolerance, state recovery, and consistent cluster management in production environments.
Core interface for persistent storage of Mesos worker state and framework information, supporting both standalone and distributed storage implementations.
/**
* Persistent store for Mesos worker state and framework information
* Provides fault tolerance and state recovery for high availability deployments
*/
public interface MesosWorkerStore {
    /**
     * Initialize the worker store and establish connections
     * Must be called before any other operations
     * @throws Exception if the worker store cannot be started
     */
    void start() throws Exception;

    /**
     * Stop the worker store and clean up resources
     * @param cleanup - Whether to perform cleanup operations (remove stored data)
     * @throws Exception if the worker store cannot be stopped properly
     */
    void stop(boolean cleanup) throws Exception;

    /**
     * Get the stored Mesos framework ID for framework re-registration
     * @return Optional framework ID, empty if none stored
     * @throws Exception if the framework ID cannot be retrieved
     */
    Option<Protos.FrameworkID> getFrameworkID() throws Exception;

    /**
     * Store the Mesos framework ID for persistent framework identity
     * @param frameworkID - Framework ID to store, or empty to clear
     * @throws Exception if the framework ID cannot be stored
     */
    void setFrameworkID(Option<Protos.FrameworkID> frameworkID) throws Exception;

    /**
     * Recover all stored worker information after restart
     * Used during framework recovery to restore cluster state
     * @return List of all stored workers with their current state
     * @throws Exception if worker information cannot be recovered
     */
    List<Worker> recoverWorkers() throws Exception;

    /**
     * Generate a new unique task ID for worker identification
     * Ensures task ID uniqueness across framework restarts
     * @return New unique Mesos task ID
     * @throws Exception if a new task ID cannot be generated
     */
    Protos.TaskID newTaskID() throws Exception;

    /**
     * Store worker information persistently
     * Updates existing worker if task ID already exists
     * @param worker - Worker information to store
     * @throws Exception if the worker cannot be stored
     */
    void putWorker(Worker worker) throws Exception;

    /**
     * Remove worker from persistent storage
     * @param taskID - Task ID of worker to remove
     * @return true if worker was found and removed, false otherwise
     * @throws Exception if the worker cannot be removed
     */
    boolean removeWorker(Protos.TaskID taskID) throws Exception;
}

Nested classes within MesosWorkerStore for representing stored worker information and lifecycle states.
/**
* Stored worker representation with state and launch information
* Contains all information needed to recover worker after framework restart
*/
public static class Worker implements Serializable {
    /**
     * Create worker entry for storage
     * @param taskId - Unique Mesos task ID
     * @param launchableWorker - Worker launch specification
     * @param state - Current worker lifecycle state
     */
    public Worker(Protos.TaskID taskId,
                  LaunchableMesosWorker launchableWorker,
                  WorkerState state);

    /**
     * Get the Mesos task ID for this worker
     * @return Unique task identifier
     */
    public Protos.TaskID taskID();

    /**
     * Get the launchable worker specification
     * @return Worker launch configuration and requirements
     */
    public LaunchableMesosWorker launchableMesosWorker();

    /**
     * Get the current worker lifecycle state
     * @return Current state in worker lifecycle
     */
    public WorkerState state();

    /**
     * Create a new worker with updated state
     * @param newState - New lifecycle state
     * @return New Worker instance with updated state
     */
    public Worker withState(WorkerState newState);
}
/**
* Worker lifecycle states for state machine management
*/
public enum WorkerState {
    /** Worker created but not yet launched */
    New,
    /** Worker successfully launched on Mesos slave */
    Launched,
    /** Worker released and no longer active */
    Released
}

Worker State Management Example:
import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore.Worker;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore.WorkerState;
import org.apache.mesos.Protos;
// Store new worker
MesosWorkerStore store = /* ... */;
Protos.TaskID taskId = store.newTaskID();
LaunchableMesosWorker launchableWorker = /* ... */;
Worker newWorker = new Worker(taskId, launchableWorker, WorkerState.New);
store.putWorker(newWorker);
// Update worker state after successful launch
Worker launchedWorker = newWorker.withState(WorkerState.Launched);
store.putWorker(launchedWorker);
// Remove worker when no longer needed
store.removeWorker(taskId);

In-memory implementation of MesosWorkerStore suitable for single-node deployments and development environments.
/**
* In-memory implementation of MesosWorkerStore for standalone deployments
* Data is not persisted across process restarts - suitable for development only
*/
public class StandaloneMesosWorkerStore implements MesosWorkerStore {
    /**
     * Create standalone worker store with configuration
     * @param config - Flink configuration (unused in standalone mode)
     */
    public StandaloneMesosWorkerStore(Configuration config);

    // Implements all MesosWorkerStore interface methods
    // Data stored in memory only - lost on restart
}

Distributed implementation of MesosWorkerStore using Apache ZooKeeper for persistent, fault-tolerant storage in production environments.
/**
* ZooKeeper-based implementation of MesosWorkerStore for high availability
* Provides persistent storage with automatic failover and consistency guarantees
*/
public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
    /**
     * Create ZooKeeper-based worker store
     * @param curatorFramework - ZooKeeper client framework
     * @param configuration - Flink configuration with ZK settings
     */
    public ZooKeeperMesosWorkerStore(CuratorFramework curatorFramework,
                                     Configuration configuration);

    // Implements all MesosWorkerStore interface methods
    // Data persisted in ZooKeeper with automatic replication
}

ZooKeeper Configuration Example:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
// Configure ZooKeeper connection
Configuration config = new Configuration();
config.setString("high-availability", "zookeeper");
config.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
config.setString("high-availability.zookeeper.path.root", "/flink");
config.setString("high-availability.cluster-id", "production-cluster-1");
// Create ZooKeeper client
CuratorFramework curator = CuratorFrameworkFactory.newClient(
    "zk1:2181,zk2:2181,zk3:2181",
    new ExponentialBackoffRetry(1000, 3)
);
curator.start();
// Create HA worker store
ZooKeeperMesosWorkerStore store = new ZooKeeperMesosWorkerStore(curator, config);
store.start();

Complete framework state recovery after master restart or failover:
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.mesos.Protos;
import scala.Option;

import java.util.List;

// Framework recovery procedure
public void recoverFrameworkState(MesosWorkerStore store) throws Exception {
    // Recover framework ID for re-registration
    Option<Protos.FrameworkID> frameworkId = store.getFrameworkID();
    if (frameworkId.isDefined()) {
        // Re-register with existing framework ID
        reregisterFramework(frameworkId.get());
    } else {
        // Fresh registration - first time startup
        registerNewFramework();
    }

    // Recover all workers and their states
    List<MesosWorkerStore.Worker> workers = store.recoverWorkers();
    for (MesosWorkerStore.Worker worker : workers) {
        switch (worker.state()) {
            case New:
                // Worker was created but never launched - retry launch
                retryWorkerLaunch(worker);
                break;
            case Launched:
                // Worker was launched - verify status and reconcile
                reconcileWorkerState(worker);
                break;
            case Released:
                // Worker was released - clean up if needed
                cleanupReleasedWorker(worker);
                break;
        }
    }
}

Consistent state management across framework components:
// Synchronized worker lifecycle management
public class WorkerLifecycleManager {

    private final MesosWorkerStore store;

    public WorkerLifecycleManager(MesosWorkerStore store) {
        this.store = store;
    }

    public void launchWorker(LaunchableMesosWorker launchableWorker) throws Exception {
        // Create task ID and store initial state
        Protos.TaskID taskId = store.newTaskID();
        MesosWorkerStore.Worker worker = new MesosWorkerStore.Worker(
            taskId, launchableWorker, WorkerState.New);
        store.putWorker(worker);

        try {
            // Attempt to launch worker
            launchWorkerOnMesos(launchableWorker);
            // Update state to launched on success
            store.putWorker(worker.withState(WorkerState.Launched));
        } catch (Exception e) {
            // Remove worker on launch failure
            store.removeWorker(taskId);
            throw e;
        }
    }

    public void releaseWorker(Protos.TaskID taskId) throws Exception {
        // Get current worker state
        List<MesosWorkerStore.Worker> workers = store.recoverWorkers();
        MesosWorkerStore.Worker worker = findWorkerById(workers, taskId);
        if (worker != null) {
            // Update state to released
            store.putWorker(worker.withState(WorkerState.Released));
            // Perform cleanup after state update
            cleanupWorkerResources(worker);
            // Remove from store after successful cleanup
            store.removeWorker(taskId);
        }
    }
}

Data backup strategies for disaster recovery:
// Backup framework state
public void backupFrameworkState(MesosWorkerStore store, String backupLocation) throws Exception {
    // Get all persistent state
    Option<Protos.FrameworkID> frameworkId = store.getFrameworkID();
    List<MesosWorkerStore.Worker> workers = store.recoverWorkers();

    // Create backup data structure
    FrameworkBackup backup = new FrameworkBackup(frameworkId, workers, System.currentTimeMillis());

    // Serialize and store backup
    writeBackupToStorage(backup, backupLocation);
}

// Restore framework state from backup
public void restoreFrameworkState(MesosWorkerStore store, String backupLocation) throws Exception {
    // Load backup data
    FrameworkBackup backup = readBackupFromStorage(backupLocation);

    // Restore framework ID
    if (backup.getFrameworkId().isDefined()) {
        store.setFrameworkID(backup.getFrameworkId());
    }

    // Restore worker states
    for (MesosWorkerStore.Worker worker : backup.getWorkers()) {
        store.putWorker(worker);
    }
}

Robust handling of storage backend connection failures:
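A common pattern is to retry transient backend failures, such as a temporary ZooKeeper connection loss, a bounded number of times before surfacing the error. The following is a minimal sketch; the RetryingStoreAccess wrapper, the retry parameters, and the backoff policy are illustrative assumptions, not part of the Flink API.
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;

// Hypothetical wrapper: bounded retries with linear backoff around store writes
public class RetryingStoreAccess {

    private final MesosWorkerStore store;
    private final int maxAttempts;
    private final long backoffMillis;

    public RetryingStoreAccess(MesosWorkerStore store, int maxAttempts, long backoffMillis) {
        this.store = store;
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    public void putWorkerWithRetry(MesosWorkerStore.Worker worker) throws Exception {
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                store.putWorker(worker);
                return;
            } catch (Exception e) {
                lastFailure = e;
                Thread.sleep(backoffMillis * attempt); // back off before the next attempt
            }
        }
        throw lastFailure; // all attempts failed - escalate instead of retrying forever
    }
}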
Ensuring consistent state across distributed components:
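One way to achieve this is to treat the persistent store as the source of truth and order every update as persist-then-apply, so in-memory views can always be rebuilt from the store after a failover. A minimal sketch follows, assuming a hypothetical ConsistentWorkerView helper that is not part of the Flink code base.
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore.WorkerState;
import org.apache.mesos.Protos;

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: persist first, then update the in-memory view,
// so a recovering leader never reads a store that is behind what other
// components have already acted on.
public class ConsistentWorkerView {

    private final MesosWorkerStore store;
    private final Map<Protos.TaskID, MesosWorkerStore.Worker> workersById = new HashMap<>();

    public ConsistentWorkerView(MesosWorkerStore store) {
        this.store = store;
    }

    public synchronized void applyStateChange(MesosWorkerStore.Worker worker, WorkerState newState) throws Exception {
        MesosWorkerStore.Worker updated = worker.withState(newState);
        store.putWorker(updated);                    // 1. persist the transition
        workersById.put(updated.taskID(), updated);  // 2. only then expose it in memory
    }

    public synchronized void rebuildFromStore() throws Exception {
        // After failover, rebuild the in-memory view from the persisted state
        workersById.clear();
        for (MesosWorkerStore.Worker worker : store.recoverWorkers()) {
            workersById.put(worker.taskID(), worker);
        }
    }
}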
Protection against split-brain scenarios in distributed deployments:
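In ZooKeeper-backed deployments, split-brain protection typically relies on leader election: only the elected framework master is allowed to mutate worker state. The sketch below uses Curator's LeaderLatch recipe; the LeaderGuardedStore wrapper and the latch path are assumptions, not Flink classes.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;

// Hypothetical guard: refuse to modify worker state unless this process
// currently holds leadership, so two concurrent masters cannot both write.
public class LeaderGuardedStore {

    private final MesosWorkerStore store;
    private final LeaderLatch leaderLatch;

    public LeaderGuardedStore(MesosWorkerStore store, CuratorFramework curator, String latchPath) throws Exception {
        this.store = store;
        this.leaderLatch = new LeaderLatch(curator, latchPath);
        this.leaderLatch.start();
    }

    public void putWorker(MesosWorkerStore.Worker worker) throws Exception {
        if (!leaderLatch.hasLeadership()) {
            throw new IllegalStateException("Not the elected leader; refusing to modify worker state");
        }
        store.putWorker(worker);
    }
}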
Efficient handling of bulk state operations:
// Batch worker state updates
public void updateWorkerStates(MesosWorkerStore store, Map<Protos.TaskID, WorkerState> stateUpdates) throws Exception {
    // Read the current workers once, then apply all updates against that snapshot
    List<MesosWorkerStore.Worker> workers = store.recoverWorkers();

    // Batch update all workers
    for (Map.Entry<Protos.TaskID, WorkerState> entry : stateUpdates.entrySet()) {
        MesosWorkerStore.Worker worker = findWorkerById(workers, entry.getKey());
        if (worker != null) {
            store.putWorker(worker.withState(entry.getValue()));
        }
    }
}

Optimized connections to storage backends:
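A common optimization is to create a single, tuned CuratorFramework client once and share it across every component that talks to ZooKeeper, rather than opening a session per component. This is a minimal sketch; the timeout values and the namespace are illustrative assumptions, not Flink defaults.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;

// One shared, tuned client for every ZooKeeper consumer in the process
CuratorFramework sharedCurator = CuratorFrameworkFactory.builder()
    .connectString("zk1:2181,zk2:2181,zk3:2181")
    .sessionTimeoutMs(60000)       // tolerate brief ZooKeeper hiccups without losing the session
    .connectionTimeoutMs(15000)    // fail fast when the quorum is unreachable
    .retryPolicy(new ExponentialBackoffRetry(1000, 5))
    .namespace("flink")            // keep all Flink znodes under one root
    .build();
sharedCurator.start();

// Reuse the same client for the worker store (and any other HA services)
Configuration config = /* Flink configuration with ZK settings */;
ZooKeeperMesosWorkerStore store = new ZooKeeperMesosWorkerStore(sharedCurator, config);
store.start();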
Key metrics for HA storage monitoring:
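Useful signals typically include write volume, write failures, and write latency of the store. The sketch below exposes them through Flink's metrics API; the InstrumentedWorkerStore wrapper and the metric names are illustrative assumptions, not metrics that Flink registers itself.
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;

// Hypothetical wrapper exposing write volume, failures and last write latency
public class InstrumentedWorkerStore {

    private final MesosWorkerStore store;
    private final Counter writes;
    private final Counter writeFailures;
    private volatile long lastWriteLatencyMillis;

    public InstrumentedWorkerStore(MesosWorkerStore store, MetricGroup metrics) {
        this.store = store;
        this.writes = metrics.counter("workerStoreWrites");
        this.writeFailures = metrics.counter("workerStoreWriteFailures");
        metrics.gauge("workerStoreLastWriteLatencyMillis", (Gauge<Long>) () -> lastWriteLatencyMillis);
    }

    public void putWorker(MesosWorkerStore.Worker worker) throws Exception {
        long start = System.currentTimeMillis();
        try {
            store.putWorker(worker);
            writes.inc();
        } catch (Exception e) {
            writeFailures.inc();
            throw e;
        } finally {
            lastWriteLatencyMillis = System.currentTimeMillis() - start;
        }
    }
}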
Critical alerts for HA storage systems:
All high availability storage classes described here are deprecated as of Flink 1.13, together with the rest of the Mesos integration, and the flink-mesos module was removed in Flink 1.14. Migration path: move deployments to one of the remaining resource providers (standalone, YARN, or Kubernetes), whose high availability services are configured through the generic high-availability.* options rather than a Mesos-specific worker store.
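For reference, this is a minimal sketch of the equivalent ZooKeeper-based high availability configuration on a non-Mesos deployment; the concrete hostnames, paths, and cluster ID are examples only.
import org.apache.flink.configuration.Configuration;

// Same high-availability.* options, no Mesos-specific store involved
Configuration haConfig = new Configuration();
haConfig.setString("high-availability", "zookeeper");
haConfig.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
haConfig.setString("high-availability.storageDir", "hdfs:///flink/ha");   // job graphs and checkpoint metadata
haConfig.setString("high-availability.cluster-id", "production-cluster-1");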
/**
* Framework backup data structure
*/
public class FrameworkBackup implements Serializable {
    public Option<Protos.FrameworkID> getFrameworkId();
    public List<MesosWorkerStore.Worker> getWorkers();
    public long getTimestamp();
    public String getVersion();
}
/**
* Storage configuration for HA deployments
*/
public class HAStorageConfiguration {
    public String getStorageType(); // "standalone" or "zookeeper"
    public String getZooKeeperQuorum();
    public String getStoragePath();
    public int getConnectionTimeout();
    public int getSessionTimeout();
}
/**
* Worker recovery information
*/
public class WorkerRecoveryInfo {
    public Protos.TaskID getTaskId();
    public WorkerState getLastKnownState();
    public long getLastUpdateTime();
    public boolean requiresReconciliation();
}