0
# High Availability and Coordination
1
2
This section covers the leader election, service discovery, and coordination services that enable fault-tolerant distributed operation. Flink's high availability system provides cluster resilience and automatic recovery from failures.
3
4
## Capabilities
5
6
### HighAvailabilityServices
7
8
Central service providing high availability components for leader election, service discovery, and coordination.
9
10
```java { .api }
11
/**
12
* The HighAvailabilityServices provide access to all services needed for a highly-available
13
* setup. In particular, they provide access to highly-available variants of the following services:
14
* - ResourceManager leader election and leader retrieval
15
* - Dispatcher leader election and leader retrieval
16
* - JobManager leader election and leader retrieval
17
* - Checkpointing metadata persistence
18
*/
19
public interface HighAvailabilityServices extends AutoCloseableAsync {
20
/** Get ResourceManager leader election service */
21
LeaderElectionService getResourceManagerLeaderElectionService();
22
23
/** Get Dispatcher leader election service */
24
LeaderElectionService getDispatcherLeaderElectionService();
25
26
/** Get JobManager leader election service for specific job */
27
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
28
29
/** Get ResourceManager leader retrieval service */
30
LeaderRetrievalService getResourceManagerLeaderRetriever();
31
32
/** Get Dispatcher leader retrieval service */
33
LeaderRetrievalService getDispatcherLeaderRetriever();
34
35
/** Get JobManager leader retrieval service for specific job */
36
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
37
38
/** Get JobManager leader retrieval service with fallback */
39
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);
40
41
/** Get web monitor leader retrieval service */
42
LeaderRetrievalService getWebMonitorLeaderRetriever();
43
44
/** Get cluster rest endpoint leader retrieval service */
45
LeaderRetrievalService getClusterRestEndpointLeaderRetriever();
46
47
/** Get checkpoint recovery factory */
48
CheckpointRecoveryFactory getCheckpointRecoveryFactory();
49
50
/** Get job graph store */
51
JobGraphStore getJobGraphStore();
52
53
/** Get job result store */
54
JobResultStore getJobResultStore();
55
56
/** Get running jobs registry */
57
RunningJobsRegistry getRunningJobsRegistry();
58
59
/** Get blob store service */
60
BlobStoreService createBlobStore();
61
62
/** Close services asynchronously */
63
CompletableFuture<Void> closeAsync();
64
65
/** Close and cleanup all HA data */
66
CompletableFuture<Void> closeAndCleanupAllData();
67
}
68
```
69
70
**Usage Examples:**
71
72
```java
73
// Create HA services from configuration
74
Configuration config = new Configuration();
75
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
76
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181,zk3:2181");
77
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "my-flink-cluster");
78
79
HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
80
config,
81
ioExecutor,
82
AddressResolution.TRY_ADDRESS_RESOLUTION,
83
rpcSystem,
84
fatalErrorHandler
85
);
86
87
// Get ResourceManager leader election service
88
LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();
89
rmLeaderElection.start(new ResourceManagerLeaderContender());
90
91
// Get JobManager leader retrieval for monitoring
92
LeaderRetrievalService jmLeaderRetrieval = haServices.getJobManagerLeaderRetriever(jobId);
93
jmLeaderRetrieval.start(new JobManagerLeaderListener());
94
95
// Get checkpoint recovery factory
96
CheckpointRecoveryFactory checkpointRecovery = haServices.getCheckpointRecoveryFactory();
97
CompletedCheckpointStore checkpointStore = checkpointRecovery.createRecoveredCompletedCheckpointStore(
98
jobId,
99
maxNumberOfCheckpointsToRetain,
100
sharedStateRegistryFactory,
101
ioExecutor
102
);
103
```
104
105
### LeaderElectionService
106
107
Service for conducting leader elections among multiple contenders, ensuring that only one contender holds leadership at a time.
108
109
```java { .api }
110
/**
111
* Interface for a service that allows a leader to be elected among a group of contenders.
112
* Prior to using this service, it has to be started by calling the start method.
113
* The start method takes the contender as an argument. If there are multiple contenders,
114
* then each one has to call the start method with its own contender object.
115
*/
116
public interface LeaderElectionService {
117
/** Start the leader election service with a contender */
118
void start(LeaderContender contender);
119
120
/** Stop the leader election service */
121
void stop();
122
123
/** Confirm leadership by the current leader */
124
void confirmLeadership(UUID leaderSessionID);
125
126
/** Check if contender has leadership */
127
boolean hasLeadership(UUID leaderSessionId);
128
}
129
130
/**
131
* Interface for leader contenders which participate in leader election.
132
*/
133
public interface LeaderContender {
134
/** Grant leadership to this contender */
135
void grantLeadership(UUID leaderSessionID);
136
137
/** Revoke leadership from this contender */
138
void revokeLeadership();
139
140
/** Get leader address when requested */
141
String getDescription();
142
143
/** Handle errors during leadership */
144
void handleError(Exception exception);
145
}
146
```
147
148
### LeaderRetrievalService
149
150
Service for retrieving information about current leaders and being notified of leadership changes.
151
152
```java { .api }
153
/**
154
* Service which retrieves the current leader and notifies a listener about leadership changes.
155
* The leader retrieval service can only be started once.
156
*/
157
public interface LeaderRetrievalService {
158
/** Start the leader retrieval service with a listener */
159
void start(LeaderRetrievalListener listener);
160
161
/** Stop the leader retrieval service */
162
void stop();
163
}
164
165
/**
166
* Listener interface for leader retrieval. The listener is notified
167
* about new leaders and leader changes.
168
*/
169
public interface LeaderRetrievalListener {
170
/** Notify about new leader */
171
void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
172
173
/** Handle retrieval errors */
174
void handleError(Exception exception);
175
}
176
```
177
178
**Usage Examples:**
179
180
```java
181
// Implement leader contender
182
public class ResourceManagerLeaderContender implements LeaderContender {
183
private final ResourceManager resourceManager;
184
private UUID currentLeaderSessionId;
185
186
@Override
187
public void grantLeadership(UUID leaderSessionID) {
188
currentLeaderSessionId = leaderSessionID;
189
resourceManager.becomeLeader(leaderSessionID);
190
// Confirm leadership
191
leaderElectionService.confirmLeadership(leaderSessionID);
192
}
193
194
@Override
195
public void revokeLeadership() {
196
currentLeaderSessionId = null;
197
resourceManager.loseLeadership();
198
}
199
200
@Override
201
public String getDescription() {
202
return resourceManager.getAddress();
203
}
204
205
@Override
206
public void handleError(Exception exception) {
207
resourceManager.handleFatalError(exception);
208
}
209
}
210
211
// Implement leader retrieval listener
212
public class JobManagerLeaderListener implements LeaderRetrievalListener {
213
private final JobMasterGatewayRetriever gatewayRetriever;
214
215
@Override
216
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
217
if (leaderAddress != null && leaderSessionID != null) {
218
JobMasterGateway gateway = gatewayRetriever.getGateway(leaderAddress);
219
// Connect to new JobManager leader
220
connectToJobManager(gateway, leaderSessionID);
221
} else {
222
// Leader lost
223
disconnectFromJobManager();
224
}
225
}
226
227
@Override
228
public void handleError(Exception exception) {
229
// Handle retrieval error
230
logger.error("Error in leader retrieval", exception);
231
}
232
}
233
```
234
235
### ZooKeeper High Availability
236
237
ZooKeeper-based implementation providing distributed coordination and persistence.
238
239
```java { .api }
240
/**
241
* ZooKeeper based implementation of HighAvailabilityServices.
242
*/
243
public class ZooKeeperHaServices implements HighAvailabilityServices {
244
/** Create ZooKeeper HA services */
245
public static ZooKeeperHaServices create(
246
CuratorFramework client,
247
Configuration configuration,
248
Executor executor
249
);
250
251
/** Get ZooKeeper client */
252
public CuratorFramework getClient();
253
254
/** Get cluster configuration store path */
255
public String getClusterConfigurationStorePath();
256
257
/** Get leader path for component */
258
public String getLeaderPath(String componentName);
259
260
/** Create ZooKeeper leader election service */
261
protected LeaderElectionService createLeaderElectionService(String leaderPath);
262
263
/** Create ZooKeeper leader retrieval service */
264
protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath);
265
}
266
267
/**
268
* ZooKeeper based leader election service implementation.
269
*/
270
public class ZooKeeperLeaderElectionService implements LeaderElectionService {
271
/** Create ZooKeeper leader election service */
272
public ZooKeeperLeaderElectionService(
273
CuratorFramework client,
274
String latchPath,
275
String leaderPath
276
);
277
278
/** Start leader election */
279
public void start(LeaderContender contender);
280
281
/** Stop leader election */
282
public void stop();
283
284
/** Confirm leadership */
285
public void confirmLeadership(UUID leaderSessionID);
286
287
/** Check leadership */
288
public boolean hasLeadership(UUID leaderSessionId);
289
290
/** Get leader latch */
291
protected LeaderLatch getLeaderLatch();
292
293
/** Write leader information to ZooKeeper */
294
protected void writeLeaderInformation(UUID leaderSessionID);
295
}
296
297
/**
298
* ZooKeeper based leader retrieval service implementation.
299
*/
300
public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService {
301
/** Create ZooKeeper leader retrieval service */
302
public ZooKeeperLeaderRetrievalService(
303
CuratorFramework client,
304
String retrievalPath
305
);
306
307
/** Start leader retrieval */
308
public void start(LeaderRetrievalListener listener);
309
310
/** Stop leader retrieval */
311
public void stop();
312
313
/** Handle ZooKeeper connection state changes */
314
protected void handleConnectionStateChanged(ConnectionState newState);
315
316
/** Handle leader node changes */
317
protected void handleLeaderChange();
318
}
319
```
320
321
### Embedded High Availability
322
323
Simple embedded HA implementation for testing and single-node deployments.
324
325
```java { .api }
326
/**
327
* An implementation of the HighAvailabilityServices for the non-high-availability case.
328
* This implementation can be used for testing or for cluster setups that do not
329
* require high availability.
330
*/
331
public class EmbeddedHaServices implements HighAvailabilityServices {
332
/** Create embedded HA services */
333
public EmbeddedHaServices(Executor executor);
334
335
/** Get ResourceManager leader election service */
336
public LeaderElectionService getResourceManagerLeaderElectionService();
337
338
/** Get Dispatcher leader election service */
339
public LeaderElectionService getDispatcherLeaderElectionService();
340
341
/** Get JobManager leader election service */
342
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
343
344
/** Get checkpoint recovery factory */
345
public CheckpointRecoveryFactory getCheckpointRecoveryFactory();
346
347
/** Get job graph store */
348
public JobGraphStore getJobGraphStore();
349
350
/** Get job result store */
351
public JobResultStore getJobResultStore();
352
353
/** Get running jobs registry */
354
public RunningJobsRegistry getRunningJobsRegistry();
355
356
/** Create blob store service */
357
public BlobStoreService createBlobStore();
358
359
/** Close services */
360
public CompletableFuture<Void> closeAsync();
361
}
362
363
/**
364
* Embedded leader election service that immediately grants leadership.
365
*/
366
public class EmbeddedLeaderElectionService implements LeaderElectionService {
367
/** Start with automatic leadership grant */
368
public void start(LeaderContender contender);
369
370
/** Stop the service */
371
public void stop();
372
373
/** Confirm leadership (always true for embedded) */
374
public void confirmLeadership(UUID leaderSessionID);
375
376
/** Check leadership (always true for embedded) */
377
public boolean hasLeadership(UUID leaderSessionId);
378
}
379
```
380
381
### Job Graph Store
382
383
Persistent storage for job graphs, enabling recovery after failures.
384
385
```java { .api }
386
/**
387
* JobGraphStore interface for persisting and retrieving job graphs in a highly available manner.
388
*/
389
public interface JobGraphStore {
390
/** Start the job graph store */
391
void start(JobGraphListener jobGraphListener);
392
393
/** Stop the job graph store */
394
void stop();
395
396
/** Put job graph into store */
397
void putJobGraph(JobGraph jobGraph);
398
399
/** Remove job graph from store */
400
void removeJobGraph(JobID jobId);
401
402
/** Release locks for job graph */
403
void releaseJobGraph(JobID jobId);
404
405
/** Get all job IDs */
406
Collection<JobID> getJobIds();
407
408
/** Get stored job graphs */
409
Collection<JobGraph> recoverJobGraphs();
410
}
411
412
/**
413
* Listener for job graph store events.
414
*/
415
public interface JobGraphListener {
416
/** Called when job graph is added */
417
void onAddedJobGraph(JobID jobId);
418
419
/** Called when job graph is removed */
420
void onRemovedJobGraph(JobID jobId);
421
}
422
```
423
424
**Usage Examples:**
425
426
```java
427
// Configure high availability mode
428
Configuration config = new Configuration();
429
430
// ZooKeeper HA setup
431
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
432
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181");
433
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "production-cluster");
434
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "hdfs://cluster/flink-ha");
435
436
// Or embedded HA for testing
437
// config.setString(HighAvailabilityOptions.HA_MODE, "NONE");
438
439
// Create HA services
440
HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
441
config,
442
ioExecutor,
443
AddressResolution.TRY_ADDRESS_RESOLUTION,
444
rpcSystem,
445
fatalErrorHandler
446
);
447
448
// Use job graph store for persistence
449
JobGraphStore jobGraphStore = haServices.getJobGraphStore();
450
jobGraphStore.start(new JobGraphListener() {
451
@Override
452
public void onAddedJobGraph(JobID jobId) {
453
System.out.println("Job graph added: " + jobId);
454
}
455
456
@Override
457
public void onRemovedJobGraph(JobID jobId) {
458
System.out.println("Job graph removed: " + jobId);
459
}
460
});
461
462
// Store job graph
463
jobGraphStore.putJobGraph(jobGraph);
464
465
// Recover job graphs after restart
466
Collection<JobGraph> recoveredJobs = jobGraphStore.recoverJobGraphs();
467
for (JobGraph job : recoveredJobs) {
468
System.out.println("Recovered job: " + job.getJobID());
469
}
470
```
471
472
## Types
473
474
```java { .api }
475
// High availability modes
476
public enum HighAvailabilityMode {
477
NONE("NONE"),
478
ZOOKEEPER("zookeeper"),
479
KUBERNETES("kubernetes");
480
481
private final String value;
482
483
public String getValue();
484
public static HighAvailabilityMode fromConfig(Configuration config);
485
}
486
487
// Leadership session identifiers
488
public class UUID implements Serializable, Comparable<UUID> {
489
public static UUID randomUUID();
490
public static UUID fromString(String name);
491
492
public long getMostSignificantBits();
493
public long getLeastSignificantBits();
494
public String toString();
495
}
496
497
// Running jobs registry
498
public interface RunningJobsRegistry {
499
/** Set job running */
500
void setJobRunning(JobID jobID);
501
502
/** Set job finished */
503
void setJobFinished(JobID jobID);
504
505
/** Get job scheduling status */
506
JobSchedulingStatus getJobSchedulingStatus(JobID jobID);
507
508
/** Get running job IDs */
509
Set<JobID> getRunningJobIds();
510
511
/** Clear job from registry */
512
void clearJob(JobID jobID);
513
}
514
515
public enum JobSchedulingStatus {
516
PENDING,
517
RUNNING,
518
DONE
519
}
520
521
// Checkpoint recovery components
522
public interface CheckpointRecoveryFactory {
523
/** Create completed checkpoint store */
524
CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
525
JobID jobId,
526
int maxNumberOfCheckpointsToRetain,
527
SharedStateRegistryFactory sharedStateRegistryFactory,
528
Executor ioExecutor
529
);
530
531
/** Create checkpoint ID counter */
532
CheckpointIDCounter createCheckpointIDCounter(JobID jobId);
533
}
534
535
// Configuration options
536
public class HighAvailabilityOptions {
537
public static final ConfigOption<String> HA_MODE;
538
public static final ConfigOption<String> HA_CLUSTER_ID;
539
public static final ConfigOption<String> HA_STORAGE_PATH;
540
public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM;
541
public static final ConfigOption<String> HA_ZOOKEEPER_ROOT;
542
public static final ConfigOption<Integer> HA_ZOOKEEPER_SESSION_TIMEOUT;
543
public static final ConfigOption<Integer> HA_ZOOKEEPER_CONNECTION_TIMEOUT;
544
public static final ConfigOption<Integer> HA_ZOOKEEPER_RETRY_WAIT;
545
public static final ConfigOption<Integer> HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS;
546
public static final ConfigOption<String> HA_ZOOKEEPER_NAMESPACE;
547
}
548
549
// Blob store service for distributed file storage
550
public interface BlobStoreService extends Closeable {
551
/** Put blob in store */
552
boolean put(File localFile, BlobKey blobKey);
553
554
/** Get blob from store */
555
boolean get(BlobKey blobKey, File localFile);
556
557
/** Delete blob from store */
558
boolean delete(BlobKey blobKey);
559
560
/** Delete all blobs for job */
561
boolean deleteAll(JobID jobId);
562
563
/** Close the blob store */
564
void close();
565
}
566
```