# High Availability Storage

Persistent storage interfaces for maintaining cluster state and worker information across framework restarts and failures. The HA system provides fault tolerance, state recovery, and consistent cluster management in production environments.

## Capabilities

### Worker Store Interface

Core interface for persistent storage of Mesos worker state and framework information, supporting both standalone and distributed storage implementations.

```java { .api }
/**
 * Persistent store for Mesos worker state and framework information
 * Provides fault tolerance and state recovery for high availability deployments
 */
public interface MesosWorkerStore {
    /**
     * Initialize the worker store and establish connections
     * Must be called before any other operations
     * @throws Exception if the worker store cannot be started
     */
    void start() throws Exception;

    /**
     * Stop the worker store and cleanup resources
     * @param cleanup - Whether to perform cleanup operations (remove stored data)
     * @throws Exception if the worker store cannot be stopped properly
     */
    void stop(boolean cleanup) throws Exception;

    /**
     * Get the stored Mesos framework ID for framework re-registration
     * @return Optional framework ID, empty if none stored
     * @throws Exception if the framework ID cannot be retrieved
     */
    Option<Protos.FrameworkID> getFrameworkID() throws Exception;

    /**
     * Store the Mesos framework ID for persistent framework identity
     * @param frameworkID - Framework ID to store, or empty to clear
     * @throws Exception if the framework ID cannot be stored
     */
    void setFrameworkID(Option<Protos.FrameworkID> frameworkID) throws Exception;

    /**
     * Recover all stored worker information after restart
     * Used during framework recovery to restore cluster state
     * @return List of all stored workers with their current state
     * @throws Exception if worker information cannot be recovered
     */
    List<Worker> recoverWorkers() throws Exception;

    /**
     * Generate a new unique task ID for worker identification
     * Ensures task ID uniqueness across framework restarts
     * @return New unique Mesos task ID
     * @throws Exception if a new task ID cannot be generated
     */
    Protos.TaskID newTaskID() throws Exception;

    /**
     * Store worker information persistently
     * Updates existing worker if task ID already exists
     * @param worker - Worker information to store
     * @throws Exception if the worker cannot be stored
     */
    void putWorker(Worker worker) throws Exception;

    /**
     * Remove worker from persistent storage
     * @param taskID - Task ID of worker to remove
     * @return true if worker was found and removed, false otherwise
     * @throws Exception if the worker cannot be removed
     */
    boolean removeWorker(Protos.TaskID taskID) throws Exception;
}
```
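
The contract for `newTaskID()` is that IDs stay unique across framework restarts. One way to get that property, sketched here with plain Java types, is to derive each ID from a counter that lives in the persistent store rather than in process memory. The `CounterStore` interface, the `InMemoryCounterStore` stand-in, and the `taskmanager-` prefix are illustrative assumptions, not part of the interface documented above.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical abstraction over a counter kept in the persistent store. */
interface CounterStore {
    /** Atomically increments the persisted counter and returns the new value. */
    long incrementAndGet();
}

/** In-memory stand-in for illustration; a real backend would persist the value. */
final class InMemoryCounterStore implements CounterStore {
    private final AtomicLong counter;

    InMemoryCounterStore(long lastPersistedValue) {
        this.counter = new AtomicLong(lastPersistedValue);
    }

    @Override
    public long incrementAndGet() {
        return counter.incrementAndGet();
    }
}

/** Builds task ID strings from the persisted counter (the prefix is an assumption). */
final class TaskIdGenerator {
    private final CounterStore store;

    TaskIdGenerator(CounterStore store) {
        this.store = store;
    }

    String newTaskId() {
        return String.format("taskmanager-%05d", store.incrementAndGet());
    }
}
```

Because a restarted process would resume from the last persisted counter value, it cannot reissue an ID that was handed out before the restart.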

### Worker State Management

Nested classes within MesosWorkerStore for representing stored worker information and lifecycle states.

```java { .api }
/**
 * Stored worker representation with state and launch information
 * Contains all information needed to recover worker after framework restart
 */
public static class Worker implements Serializable {
    /**
     * Create worker entry for storage
     * @param taskId - Unique Mesos task ID
     * @param launchableWorker - Worker launch specification
     * @param state - Current worker lifecycle state
     */
    public Worker(Protos.TaskID taskId,
                  LaunchableMesosWorker launchableWorker,
                  WorkerState state);

    /**
     * Get the Mesos task ID for this worker
     * @return Unique task identifier
     */
    public Protos.TaskID taskID();

    /**
     * Get the launchable worker specification
     * @return Worker launch configuration and requirements
     */
    public LaunchableMesosWorker launchableMesosWorker();

    /**
     * Get the current worker lifecycle state
     * @return Current state in worker lifecycle
     */
    public WorkerState state();

    /**
     * Create a new worker with updated state
     * @param newState - New lifecycle state
     * @return New Worker instance with updated state
     */
    public Worker withState(WorkerState newState);
}

/**
 * Worker lifecycle states for state machine management
 */
public enum WorkerState {
    /** Worker created but not yet launched */
    New,
    /** Worker successfully launched on Mesos slave */
    Launched,
    /** Worker released and no longer active */
    Released
}
```

**Worker State Management Example:**

```java
import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore.Worker;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore.WorkerState;
import org.apache.mesos.Protos;

// Store new worker
MesosWorkerStore store = /* ... */;
Protos.TaskID taskId = store.newTaskID();
LaunchableMesosWorker launchableWorker = /* ... */;

Worker newWorker = new Worker(taskId, launchableWorker, WorkerState.New);
store.putWorker(newWorker);

// Update worker state after successful launch.
// withState returns a new instance; putWorker overwrites the entry with the same task ID.
Worker launchedWorker = newWorker.withState(WorkerState.Launched);
store.putWorker(launchedWorker);

// Remove worker when no longer needed
store.removeWorker(taskId);
```
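
The three lifecycle states form a small state machine, and it can help to reject transitions that make no sense (for example `Released` back to `New`) before writing to the store. The following is a minimal sketch under stated assumptions: `WorkerLifecycle` is a hypothetical helper, it uses a local copy of the enum so that it compiles on its own, and the set of allowed transitions is an assumption rather than something the store enforces.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Sketch of a transition check for the New -> Launched -> Released lifecycle. */
final class WorkerLifecycle {
    /** Local copy of the three documented states, so the sketch compiles on its own. */
    enum State { New, Launched, Released }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        // Assumption: a worker may be released before it was ever launched.
        ALLOWED.put(State.New, EnumSet.of(State.Launched, State.Released));
        ALLOWED.put(State.Launched, EnumSet.of(State.Released));
        ALLOWED.put(State.Released, EnumSet.noneOf(State.class));
    }

    private WorkerLifecycle() {}

    /** Returns true if moving from one state to the other is allowed. */
    static boolean canTransition(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }

    /** Returns the target state, or throws if the transition is not allowed. */
    static State transition(State from, State to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("Illegal transition: " + from + " -> " + to);
        }
        return to;
    }
}
```

A caller would run `transition(current, next)` before `withState(next)` and `putWorker(...)`, so an invalid update fails before anything is persisted.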

### Standalone Worker Store

In-memory implementation of MesosWorkerStore for single-node deployments and development environments. Stored state is lost when the process restarts, so this implementation provides no recovery after a failure.

```java { .api }
/**
 * In-memory implementation of MesosWorkerStore for standalone deployments
 * Data is not persisted across process restarts - suitable for development only
 */
public class StandaloneMesosWorkerStore implements MesosWorkerStore {
    /**
     * Create standalone worker store with configuration
     * @param config - Flink configuration (unused in standalone mode)
     */
    public StandaloneMesosWorkerStore(Configuration config);

    // Implements all MesosWorkerStore interface methods
    // Data stored in memory only - lost on restart
}
```
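
To make the in-memory behavior concrete, here is a minimal sketch of the put, remove, and recover semantics using only JDK types. It is not the actual `StandaloneMesosWorkerStore` code: `InMemoryWorkerStore` is a hypothetical class, and plain strings stand in for Mesos task IDs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of an in-memory worker store keyed by task ID (plain strings here). */
final class InMemoryWorkerStore<W> {
    private final Map<String, W> workers = new LinkedHashMap<>();

    /** Inserts the worker, or overwrites the entry if the task ID already exists. */
    synchronized void putWorker(String taskId, W worker) {
        workers.put(taskId, worker);
    }

    /** Returns true if an entry was found and removed, false otherwise. */
    synchronized boolean removeWorker(String taskId) {
        return workers.remove(taskId) != null;
    }

    /** Returns a snapshot copy, so callers cannot mutate the store's contents. */
    synchronized List<W> recoverWorkers() {
        return new ArrayList<>(workers.values());
    }
}
```

Everything lives in a single map on the heap, which is why nothing survives a process restart.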

### ZooKeeper Worker Store

Distributed implementation of MesosWorkerStore using Apache ZooKeeper for persistent, fault-tolerant storage in production environments.

```java { .api }
/**
 * ZooKeeper-based implementation of MesosWorkerStore for high availability
 * Provides persistent storage with automatic failover and consistency guarantees
 */
public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
    /**
     * Create ZooKeeper-based worker store
     * @param curatorFramework - ZooKeeper client framework
     * @param configuration - Flink configuration with ZK settings
     */
    public ZooKeeperMesosWorkerStore(CuratorFramework curatorFramework,
                                     Configuration configuration);

    // Implements all MesosWorkerStore interface methods
    // Data persisted in ZooKeeper with automatic replication
}
```

**ZooKeeper Configuration Example:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Configure ZooKeeper connection
Configuration config = new Configuration();
config.setString("high-availability", "zookeeper");
config.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
config.setString("high-availability.zookeeper.path.root", "/flink");
config.setString("high-availability.cluster-id", "production-cluster-1");

// Create ZooKeeper client: 1000 ms base sleep between retries, at most 3 retries
CuratorFramework curator = CuratorFrameworkFactory.newClient(
    "zk1:2181,zk2:2181,zk3:2181",
    new ExponentialBackoffRetry(1000, 3)
);
curator.start();

// Create HA worker store
ZooKeeperMesosWorkerStore store = new ZooKeeperMesosWorkerStore(curator, config);
store.start();
```

## High Availability Patterns

### Framework Recovery

Complete framework state recovery after master restart or failover:

```java
import java.util.List;

import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.mesos.Protos;
import scala.Option;

// Framework recovery procedure.
// reregisterFramework, registerNewFramework, retryWorkerLaunch, reconcileWorkerState,
// and cleanupReleasedWorker are application-specific helpers (not shown).
public void recoverFrameworkState(MesosWorkerStore store) throws Exception {
    // Recover framework ID for re-registration
    Option<Protos.FrameworkID> frameworkId = store.getFrameworkID();

    if (frameworkId.isDefined()) {
        // Re-register with existing framework ID
        reregisterFramework(frameworkId.get());
    } else {
        // Fresh registration - first time startup
        registerNewFramework();
    }

    // Recover all workers and their states
    List<MesosWorkerStore.Worker> workers = store.recoverWorkers();

    for (MesosWorkerStore.Worker worker : workers) {
        switch (worker.state()) {
            case New:
                // Worker was created but never launched - retry launch
                retryWorkerLaunch(worker);
                break;
            case Launched:
                // Worker was launched - verify status and reconcile
                reconcileWorkerState(worker);
                break;
            case Released:
                // Worker was released - clean up if needed
                cleanupReleasedWorker(worker);
                break;
        }
    }
}
```

### State Synchronization

Consistent state management across framework components:

```java
import java.util.List;

import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore.WorkerState;
import org.apache.mesos.Protos;

// Keeps the stored worker state in step with each lifecycle transition.
// launchWorkerOnMesos, findWorkerById, and cleanupWorkerResources are
// application-specific helpers (not shown).
public class WorkerLifecycleManager {
    private final MesosWorkerStore store;

    public WorkerLifecycleManager(MesosWorkerStore store) {
        this.store = store;
    }

    public void launchWorker(LaunchableMesosWorker launchableWorker) throws Exception {
        // Create task ID and store initial state
        Protos.TaskID taskId = store.newTaskID();
        MesosWorkerStore.Worker worker = new MesosWorkerStore.Worker(
            taskId, launchableWorker, WorkerState.New
        );
        store.putWorker(worker);

        try {
            // Attempt to launch worker
            launchWorkerOnMesos(launchableWorker);

            // Update state to launched on success
            store.putWorker(worker.withState(WorkerState.Launched));

        } catch (Exception e) {
            // Remove worker on launch failure
            store.removeWorker(taskId);
            throw e;
        }
    }

    public void releaseWorker(Protos.TaskID taskId) throws Exception {
        // Get current worker state
        List<MesosWorkerStore.Worker> workers = store.recoverWorkers();
        MesosWorkerStore.Worker worker = findWorkerById(workers, taskId);

        if (worker != null) {
            // Update state to released
            store.putWorker(worker.withState(WorkerState.Released));

            // Perform cleanup after state update
            cleanupWorkerResources(worker);

            // Remove from store after successful cleanup
            store.removeWorker(taskId);
        }
    }
}
```

### Backup and Restore

Data backup strategies for disaster recovery:

```java
// Backup framework state.
// writeBackupToStorage and readBackupFromStorage are application-specific helpers (not shown).
public void backupFrameworkState(MesosWorkerStore store, String backupLocation) throws Exception {
    // Get all persistent state
    Option<Protos.FrameworkID> frameworkId = store.getFrameworkID();
    List<MesosWorkerStore.Worker> workers = store.recoverWorkers();

    // Create backup data structure
    FrameworkBackup backup = new FrameworkBackup(frameworkId, workers, System.currentTimeMillis());

    // Serialize and store backup
    writeBackupToStorage(backup, backupLocation);
}

// Restore framework state from backup
public void restoreFrameworkState(MesosWorkerStore store, String backupLocation) throws Exception {
    // Load backup data
    FrameworkBackup backup = readBackupFromStorage(backupLocation);

    // Restore framework ID
    if (backup.getFrameworkId().isDefined()) {
        store.setFrameworkID(backup.getFrameworkId());
    }

    // Restore worker states
    for (MesosWorkerStore.Worker worker : backup.getWorkers()) {
        store.putWorker(worker);
    }
}
```
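
The example above leaves `writeBackupToStorage` and `readBackupFromStorage` undefined. Since the backup type is `Serializable`, one plausible approach is plain Java serialization to a file. The sketch below assumes exactly that: `BackupSnapshot` and `BackupFiles` are hypothetical names, and strings stand in for the Mesos and worker types so the block is self-contained.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the backup payload; strings replace the Mesos types. */
final class BackupSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;

    final String frameworkId; // null when no framework ID was stored
    final ArrayList<String> workerTaskIds;
    final long timestampMillis;

    BackupSnapshot(String frameworkId, List<String> workerTaskIds, long timestampMillis) {
        this.frameworkId = frameworkId;
        this.workerTaskIds = new ArrayList<>(workerTaskIds);
        this.timestampMillis = timestampMillis;
    }
}

final class BackupFiles {
    private BackupFiles() {}

    /** Serializes the snapshot to the given file, replacing any existing content. */
    static void write(BackupSnapshot snapshot, Path target) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(target))) {
            out.writeObject(snapshot);
        }
    }

    /** Reads a snapshot previously written by write(). */
    static BackupSnapshot read(Path source) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(source))) {
            return (BackupSnapshot) in.readObject();
        }
    }
}
```

Java deserialization should only be applied to files from a trusted location, and the serialized form ties a backup to the class versions that produced it.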

## Error Handling and Recovery

### Connection Failures

Robust handling of storage backend connection failures:

- **Automatic retry**: Exponential backoff for transient failures
- **Circuit breaker**: Prevent cascade failures during outages
- **Graceful degradation**: Continue operation with reduced functionality
- **Health monitoring**: Continuous monitoring of storage backend health
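
The automatic-retry item can be sketched as a small helper that wraps any store call. This is an illustrative implementation, not code from the framework: `RetryWithBackoff` is a hypothetical name, and the doubling factor and the choice to rethrow the last failure are assumptions. For ZooKeeper clients, Curator's `ExponentialBackoffRetry` policy already covers retries at the connection level.

```java
import java.util.concurrent.Callable;

/** Sketch of retry with exponential backoff for transient storage failures. */
final class RetryWithBackoff {
    private RetryWithBackoff() {}

    /**
     * Runs the operation up to maxAttempts times, sleeping baseDelayMillis,
     * then twice that, then four times that, and so on between attempts.
     * Rethrows the last failure once the attempts are used up.
     */
    static <T> T call(Callable<T> operation, int maxAttempts, long baseDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = baseDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e; // attempts exhausted: surface the last failure
                }
                Thread.sleep(delay);
                delay *= 2; // double the wait before the next attempt
            }
        }
    }
}
```

A call such as `RetryWithBackoff.call(() -> store.recoverWorkers(), 5, 200)` would then tolerate a few transient failures before giving up.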

### Data Consistency

Ensuring consistent state across distributed components:

- **Atomic operations**: All-or-nothing state updates
- **Conflict resolution**: Handling concurrent updates from multiple instances
- **Version control**: Optimistic concurrency control for state updates
- **Consistency checks**: Periodic validation of stored state integrity
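
Optimistic concurrency control means a writer states which version of an entry it read, and the write is rejected if the entry has changed since. The sketch below shows the idea in memory with a hypothetical `VersionedCell` class. ZooKeeper offers the same pattern natively, since a znode carries a version that a conditional `setData` can check.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of optimistic concurrency control using a version number per entry. */
final class VersionedCell<T> {
    /** Immutable pairing of a value with the version at which it was written. */
    static final class Versioned<T> {
        final T value;
        final int version;

        Versioned(T value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    private final AtomicReference<Versioned<T>> current;

    VersionedCell(T initial) {
        this.current = new AtomicReference<>(new Versioned<>(initial, 0));
    }

    Versioned<T> read() {
        return current.get();
    }

    /**
     * Writes newValue only if the entry is still at expectedVersion.
     * Returns false when another writer got there first; the caller should
     * re-read and decide whether to retry.
     */
    boolean writeIfVersion(int expectedVersion, T newValue) {
        Versioned<T> seen = current.get();
        if (seen.version != expectedVersion) {
            return false;
        }
        return current.compareAndSet(seen, new Versioned<>(newValue, expectedVersion + 1));
    }
}
```

Two writers that both read version 0 cannot both succeed: the second write sees version 1 and is rejected, which is how conflicting updates from multiple instances get detected.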

### Split-Brain Prevention

Protection against split-brain scenarios in distributed deployments:

- **Leader election**: Single active resource manager instance
- **Fencing mechanisms**: Prevent zombie processes from corrupting state
- **Quorum requirements**: Majority consensus for critical operations
- **Timeout handling**: Appropriate timeouts for distributed operations
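
A common way to implement fencing is to attach a monotonically increasing epoch (sometimes called a fencing token) to every write and have the store reject writes that carry an older epoch than one it has already seen. The sketch below illustrates that idea only. `FencedStore` is a hypothetical class, and it assumes that each leader election hands the new leader a larger epoch than the previous one.

```java
/** Sketch of fencing: the store rejects writes from an outdated leader epoch. */
final class FencedStore {
    private long highestEpochSeen = 0;
    private String value;

    /**
     * Accepts the write only if the writer's epoch is at least as new as any
     * epoch seen so far. Epochs are assumed to increase with each election.
     */
    synchronized boolean write(long writerEpoch, String newValue) {
        if (writerEpoch < highestEpochSeen) {
            return false; // stale leader: reject to avoid corrupting state
        }
        highestEpochSeen = writerEpoch;
        value = newValue;
        return true;
    }

    synchronized String read() {
        return value;
    }
}
```

A former leader that wakes up after a long pause still holds its old epoch, so its late writes are refused once the new leader has written.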

## Performance Optimization

### Batch Operations

Applying many state updates in one pass reads the stored workers once instead of once per update:

```java
// Apply many worker state updates in one pass.
// Assumes a MesosWorkerStore field named `store`, as in WorkerLifecycleManager,
// and an application-specific findWorkerById helper (not shown).
public void updateWorkerStates(Map<Protos.TaskID, WorkerState> stateUpdates) throws Exception {
    // Read the stored workers once for the whole batch
    List<MesosWorkerStore.Worker> workers = store.recoverWorkers();

    // Each update is still written with its own putWorker call
    for (Map.Entry<Protos.TaskID, WorkerState> entry : stateUpdates.entrySet()) {
        MesosWorkerStore.Worker worker = findWorkerById(workers, entry.getKey());
        if (worker != null) {
            store.putWorker(worker.withState(entry.getValue()));
        }
    }
}
```
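
The batch example relies on a `findWorkerById` helper that this page does not define. If that helper scans the list, every update costs a full pass over the workers. A minimal sketch of an alternative, assuming nothing beyond JDK collections, is to index the recovered list once and look workers up by key. `WorkerIndex` and `indexBy` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Sketch: index a recovered list once so each lookup is a map access. */
final class WorkerIndex {
    private WorkerIndex() {}

    /** Builds a map from key to item; later duplicates overwrite earlier ones. */
    static <K, W> Map<K, W> indexBy(List<W> workers, Function<W, K> keyOf) {
        Map<K, W> index = new HashMap<>();
        for (W worker : workers) {
            index.put(keyOf.apply(worker), worker);
        }
        return index;
    }
}
```

With the documented types this would be called as `WorkerIndex.indexBy(workers, MesosWorkerStore.Worker::taskID)`, after which `index.get(taskId)` replaces the per-update search.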

### Connection Pooling

Optimized connections to storage backends:

- **Connection reuse**: Pool connections for ZooKeeper operations
- **Session management**: Persistent sessions with automatic renewal
- **Connection monitoring**: Health checking and automatic reconnection
- **Resource cleanup**: Proper cleanup of connections and sessions

## Monitoring and Observability

### Metrics and Monitoring

Key metrics for HA storage monitoring:

- **Storage latency**: Response times for read/write operations
- **Connection health**: Status of storage backend connections
- **State consistency**: Validation of stored state integrity
- **Recovery metrics**: Time to recover from failures
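
Storage latency is the easiest of these metrics to collect, because every store call can be wrapped in a timer. The following sketch records count, mean, and maximum latency with JDK atomics. `StorageLatency` is a hypothetical helper, and in a real deployment the numbers would more likely be reported through the metrics system already in use.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of a latency recorder for storage operations. */
final class StorageLatency {
    private final AtomicLong operationCount = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicLong maxNanos = new AtomicLong();

    /** Runs the operation and records how long it took, even if it throws. */
    <T> T timed(Callable<T> operation) throws Exception {
        long start = System.nanoTime();
        try {
            return operation.call();
        } finally {
            long elapsed = System.nanoTime() - start;
            operationCount.incrementAndGet();
            totalNanos.addAndGet(elapsed);
            maxNanos.accumulateAndGet(elapsed, Math::max);
        }
    }

    long count() {
        return operationCount.get();
    }

    long maxNanos() {
        return maxNanos.get();
    }

    /** Mean latency in nanoseconds, or 0 when nothing has been recorded. */
    long meanNanos() {
        long n = operationCount.get();
        return n == 0 ? 0 : totalNanos.get() / n;
    }
}
```

Wrapping calls as `latency.timed(() -> store.recoverWorkers())` keeps the measurement out of the store implementation itself, and failed calls are timed too, which matters when timeouts dominate the latency.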

### Alerting

Critical alerts for HA storage systems:

- **Storage backend failures**: Connection losses or timeouts
- **State corruption**: Inconsistent or invalid stored state
- **Recovery failures**: Problems during framework recovery
- **Resource exhaustion**: Storage space or connection limits

## Deprecation Notice

All of the Mesos high availability storage classes described here are deprecated as of Flink 1.13. Migration paths:

- **Kubernetes**: Use Kubernetes ConfigMaps/Secrets for state storage
- **YARN**: Use YARN's resource manager state store
- **Standalone**: Use Flink's built-in HA storage mechanisms

## Types

```java { .api }
/**
 * Framework backup data structure
 */
public class FrameworkBackup implements Serializable {
    public Option<Protos.FrameworkID> getFrameworkId();
    public List<MesosWorkerStore.Worker> getWorkers();
    public long getTimestamp();
    public String getVersion();
}

/**
 * Storage configuration for HA deployments
 */
public class HAStorageConfiguration {
    public String getStorageType(); // "standalone" or "zookeeper"
    public String getZooKeeperQuorum();
    public String getStoragePath();
    public int getConnectionTimeout();
    public int getSessionTimeout();
}

/**
 * Worker recovery information
 */
public class WorkerRecoveryInfo {
    public Protos.TaskID getTaskId();
    public WorkerState getLastKnownState();
    public long getLastUpdateTime();
    public boolean requiresReconciliation();
}
```