0
# High Availability Services
1
2
The High Availability Services provide cluster coordination and fault tolerance infrastructure for Flink clusters. These services enable leader election, distributed storage coordination, and recovery mechanisms that ensure cluster resilience and continuous operation in the face of node failures.
3
4
## Core Services Interface
5
6
### HighAvailabilityServices
7
8
The primary interface that provides access to all high availability services required by a Flink cluster.
9
10
```java { .api }
11
public interface HighAvailabilityServices extends AutoCloseable {
12
LeaderRetrievalService getResourceManagerLeaderRetriever();
13
LeaderRetrievalService getDispatcherLeaderRetriever();
14
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
15
LeaderRetrievalService getWebMonitorLeaderRetriever();
16
17
LeaderElectionService getResourceManagerLeaderElectionService();
18
LeaderElectionService getDispatcherLeaderElectionService();
19
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
20
LeaderElectionService getWebMonitorLeaderElectionService();
21
22
CheckpointRecoveryFactory getCheckpointRecoveryFactory();
23
JobGraphStore getJobGraphStore();
24
JobResultStore getJobResultStore();
25
26
RunningJobsRegistry getRunningJobsRegistry();
27
BlobStore createBlobStore() throws IOException;
28
29
@Override
30
void close() throws Exception;
31
32
void closeAndCleanupAllData() throws Exception;
33
}
34
```
35
36
## Leader Election Services
37
38
### LeaderElectionService
39
40
Service for participating in leader election processes within the cluster.
41
42
```java { .api }
43
public interface LeaderElectionService {
44
void start(LeaderContender contender) throws Exception;
45
void stop() throws Exception;
46
47
void confirmLeadership(UUID leaderSessionID, String leaderAddress);
48
boolean hasLeadership(UUID leaderSessionId);
49
}
50
```
51
52
### LeaderContender
53
54
Interface implemented by components that want to participate in leader election.
55
56
```java { .api }
57
/**
 * Callbacks for a component that competes for leadership through a
 * {@link LeaderElectionService}.
 */
public interface LeaderContender {

    /**
     * Invoked when this contender is elected leader.
     *
     * @param leaderSessionID session identifying this period of leadership
     */
    void grantLeadership(UUID leaderSessionID);

    /** Invoked when previously granted leadership is taken away. */
    void revokeLeadership();

    /**
     * Returns the address under which this contender can be reached.
     *
     * @return the contender's address
     */
    String getAddress();

    /**
     * Reports an error that occurred in the leader election process.
     *
     * @param exception the reported error
     */
    void handleError(Exception exception);
}
63
```
64
65
### LeaderRetrievalService
66
67
Service for retrieving current leader information and receiving leadership change notifications.
68
69
```java { .api }
70
public interface LeaderRetrievalService {
71
void start(LeaderRetrievalListener listener) throws Exception;
72
void stop() throws Exception;
73
}
74
```
75
76
### LeaderRetrievalListener
77
78
Listener interface for receiving leader change notifications.
79
80
```java { .api }
81
/**
 * Receives notifications from a {@link LeaderRetrievalService} when the leader changes.
 */
public interface LeaderRetrievalListener {

    /**
     * Called with the address and session of the current leader.
     *
     * @param leaderAddress address of the leader; presumably {@code null} when no leader is
     *     currently known (TODO confirm against the retrieval service implementation)
     * @param leaderSessionID session of the leader
     */
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);

    /**
     * Reports an error that occurred while retrieving the leader.
     *
     * @param exception the reported error
     */
    void handleError(Exception exception);
}
85
```
86
87
## Storage and Persistence Services
88
89
### CheckpointRecoveryFactory
90
91
Factory for the per-job services that persist checkpoint metadata so it can be recovered: a store of completed checkpoints and a checkpoint ID counter.
92
93
```java { .api }
94
public interface CheckpointRecoveryFactory {
95
CompletedCheckpointStore createCompletedCheckpointStore(
96
JobID jobId,
97
int maxNumberOfCheckpointsToRetain,
98
ClassLoader userClassLoader
99
) throws Exception;
100
101
CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception;
102
}
103
```
104
105
### JobGraphStore
106
107
Persistent storage for job graphs to enable job recovery after failures.
108
109
```java { .api }
110
public interface JobGraphStore {
111
void putJobGraph(StoredJobGraph jobGraph) throws Exception;
112
StoredJobGraph recoverJobGraph(JobID jobId) throws Exception;
113
void removeJobGraph(JobID jobId) throws Exception;
114
115
Collection<JobID> getJobIds() throws Exception;
116
void start(JobGraphListener jobGraphListener) throws Exception;
117
void stop() throws Exception;
118
}
119
```
120
121
### JobResultStore
122
123
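Storage for persisting job execution results. Each entry is first written as *dirty* (`createDirtyResult`) and later marked *clean* (`markResultAsClean`); the query methods report entries in either state.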
124
125
```java { .api }
126
public interface JobResultStore {
127
void createDirtyResult(JobResultEntry jobResultEntry) throws IOException;
128
void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException;
129
130
boolean hasJobResultEntry(JobID jobId) throws IOException;
131
boolean hasDirtyJobResultEntry(JobID jobId) throws IOException;
132
boolean hasCleanJobResultEntry(JobID jobId) throws IOException;
133
134
Set<JobResult> getDirtyResults() throws IOException;
135
Set<JobResult> getCleanResults() throws IOException;
136
}
137
```
138
139
## Registry Services
140
141
### RunningJobsRegistry
142
143
Registry that tracks the scheduling status of each job in the cluster: pending, running, or done.
144
145
```java { .api }
146
public interface RunningJobsRegistry {
147
void setJobRunning(JobID jobID) throws IOException;
148
void setJobFinished(JobID jobID) throws IOException;
149
150
JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException;
151
152
enum JobSchedulingStatus {
153
PENDING,
154
RUNNING,
155
DONE
156
}
157
}
158
```
159
160
### BlobStore
161
162
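Distributed storage service for binary large objects (BLOBs), such as user JAR files and other large artifacts distributed to cluster components. Operator state is not kept here; it goes through checkpointing.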
163
164
```java { .api }
165
public interface BlobStore extends Closeable {
166
boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;
167
boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;
168
boolean delete(JobID jobId, BlobKey blobKey);
169
boolean deleteAll(JobID jobId);
170
171
void closeAndCleanupAllData() throws IOException;
172
}
173
```
174
175
## Utility Classes
176
177
### HighAvailabilityServicesUtils
178
179
Utility class providing factory methods and helper functions for HA services.
180
181
```java { .api }
182
public class HighAvailabilityServicesUtils {
183
public static HighAvailabilityServices createAvailableOrEmbeddedServices(
184
Configuration config,
185
Executor executor
186
) throws Exception;
187
188
public static HighAvailabilityServices createHighAvailabilityServices(
189
Configuration configuration,
190
Executor executor,
191
AddressResolution addressResolution
192
) throws Exception;
193
194
public static String getJobManagerAddress(Configuration config) throws Exception;
195
196
public static LeaderRetrievalService createLeaderRetrievalService(
197
Configuration config,
198
String serviceName
199
) throws Exception;
200
201
public static LeaderElectionService createLeaderElectionService(
202
Configuration config,
203
String serviceName
204
) throws Exception;
205
206
public static void setJobManagerAddress(Configuration config, String address, int port);
207
}
208
```
209
210
## Configuration Options
211
212
### High Availability Configuration Keys
213
214
Configuration options for setting up high availability services.
215
216
```java { .api }
217
public class HighAvailabilityOptions {
218
public static final ConfigOption<String> HA_MODE =
219
key("high-availability").defaultValue("NONE");
220
221
public static final ConfigOption<String> HA_CLUSTER_ID =
222
key("high-availability.cluster-id").defaultValue("default");
223
224
public static final ConfigOption<String> HA_STORAGE_PATH =
225
key("high-availability.storageDir").noDefaultValue();
226
227
public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
228
key("high-availability.zookeeper.quorum").noDefaultValue();
229
230
public static final ConfigOption<Integer> HA_ZOOKEEPER_SESSION_TIMEOUT =
231
key("high-availability.zookeeper.client.session-timeout").defaultValue(60000);
232
233
public static final ConfigOption<Integer> HA_ZOOKEEPER_CONNECTION_TIMEOUT =
234
key("high-availability.zookeeper.client.connection-timeout").defaultValue(15000);
235
236
public static final ConfigOption<Integer> HA_ZOOKEEPER_RETRY_WAIT =
237
key("high-availability.zookeeper.client.retry-wait").defaultValue(5000);
238
239
public static final ConfigOption<Integer> HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS =
240
key("high-availability.zookeeper.client.max-retry-attempts").defaultValue(3);
241
242
public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
243
key("high-availability.zookeeper.path.root").defaultValue("/flink");
244
}
245
```
246
247
## Exception Handling
248
249
### HighAvailabilityServicesException
250
251
Base exception for high availability service failures.
252
253
```java { .api }
254
public class HighAvailabilityServicesException extends FlinkException {
255
public HighAvailabilityServicesException(String message);
256
public HighAvailabilityServicesException(String message, Throwable cause);
257
}
258
```
259
260
### LeaderElectionException
261
262
Exception thrown during leader election process failures.
263
264
```java { .api }
265
public class LeaderElectionException extends FlinkException {
266
public LeaderElectionException(String message);
267
public LeaderElectionException(String message, Throwable cause);
268
}
269
```
270
271
## Usage Examples
272
273
### Setting Up High Availability with ZooKeeper
274
275
```java
276
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
277
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
278
import org.apache.flink.configuration.Configuration;
279
import org.apache.flink.configuration.HighAvailabilityOptions;
280
281
// Configure ZooKeeper-based high availability
282
Configuration config = new Configuration();
283
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
284
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "localhost:2181");
285
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "file:///ha-storage");
286
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "my-flink-cluster");
287
288
// Create HA services
289
HighAvailabilityServices haServices = HighAvailabilityServicesUtils
290
.createHighAvailabilityServices(config, executor, AddressResolution.TRY_ADDRESS_RESOLUTION);
291
292
try {
293
// Use HA services for cluster coordination
294
LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();
295
LeaderRetrievalService rmLeaderRetrieval = haServices.getResourceManagerLeaderRetriever();
296
297
// Set up leader election for Resource Manager
298
ResourceManagerLeaderContender rmContender = new ResourceManagerLeaderContender();
299
rmLeaderElection.start(rmContender);
300
301
// Set up leader retrieval for clients
302
ResourceManagerLeaderListener rmListener = new ResourceManagerLeaderListener();
303
rmLeaderRetrieval.start(rmListener);
304
305
} finally {
306
haServices.close();
307
}
308
```
309
310
### Implementing a Leader Contender
311
312
```java
313
import org.apache.flink.runtime.highavailability.LeaderContender;
314
import java.util.UUID;
315
316
public class ResourceManagerLeaderContender implements LeaderContender {
317
private volatile boolean isLeader = false;
318
private volatile UUID currentLeaderSessionId;
319
private final String address;
320
321
public ResourceManagerLeaderContender(String address) {
322
this.address = address;
323
}
324
325
@Override
326
public void grantLeadership(UUID leaderSessionID) {
327
synchronized (this) {
328
if (!isLeader) {
329
System.out.println("Granted leadership with session ID: " + leaderSessionID);
330
this.currentLeaderSessionId = leaderSessionID;
331
this.isLeader = true;
332
333
// Confirm leadership and start serving as leader
334
confirmLeadership(leaderSessionID);
335
startLeaderServices();
336
}
337
}
338
}
339
340
@Override
341
public void revokeLeadership() {
342
synchronized (this) {
343
if (isLeader) {
344
System.out.println("Leadership revoked");
345
this.isLeader = false;
346
this.currentLeaderSessionId = null;
347
348
// Stop leader services
349
stopLeaderServices();
350
}
351
}
352
}
353
354
@Override
355
public String getAddress() {
356
return address;
357
}
358
359
@Override
360
public void handleError(Exception exception) {
361
System.err.println("Leader election error: " + exception.getMessage());
362
// Handle leadership errors - may need to restart election
363
revokeLeadership();
364
}
365
366
private void confirmLeadership(UUID leaderSessionID) {
367
// Confirm leadership with the election service
368
leaderElectionService.confirmLeadership(leaderSessionID, address);
369
}
370
371
private void startLeaderServices() {
372
// Initialize services that only the leader should run
373
System.out.println("Starting Resource Manager leader services");
374
}
375
376
private void stopLeaderServices() {
377
// Clean up leader-only services
378
System.out.println("Stopping Resource Manager leader services");
379
}
380
381
public boolean hasLeadership() {
382
return isLeader;
383
}
384
385
public UUID getCurrentLeaderSessionId() {
386
return currentLeaderSessionId;
387
}
388
}
389
```
390
391
### Implementing a Leader Retrieval Listener
392
393
```java
394
import org.apache.flink.runtime.highavailability.LeaderRetrievalListener;
395
import java.util.UUID;
396
397
public class ResourceManagerLeaderListener implements LeaderRetrievalListener {
398
private volatile String currentLeaderAddress;
399
private volatile UUID currentLeaderSessionId;
400
401
@Override
402
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
403
synchronized (this) {
404
if (!Objects.equals(currentLeaderAddress, leaderAddress) ||
405
!Objects.equals(currentLeaderSessionId, leaderSessionID)) {
406
407
System.out.println("New Resource Manager leader: " + leaderAddress +
408
" (session: " + leaderSessionID + ")");
409
410
// Update connection to new leader
411
updateLeaderConnection(leaderAddress, leaderSessionID);
412
413
this.currentLeaderAddress = leaderAddress;
414
this.currentLeaderSessionId = leaderSessionID;
415
}
416
}
417
}
418
419
@Override
420
public void handleError(Exception exception) {
421
System.err.println("Leader retrieval error: " + exception.getMessage());
422
423
// Clear current leader information
424
synchronized (this) {
425
this.currentLeaderAddress = null;
426
this.currentLeaderSessionId = null;
427
disconnectFromLeader();
428
}
429
}
430
431
private void updateLeaderConnection(String leaderAddress, UUID leaderSessionId) {
432
// Establish connection to the new leader
433
if (leaderAddress != null) {
434
System.out.println("Connecting to Resource Manager at: " + leaderAddress);
435
// ... connect to leader
436
} else {
437
System.out.println("No Resource Manager leader available");
438
disconnectFromLeader();
439
}
440
}
441
442
private void disconnectFromLeader() {
443
// Clean up connections to previous leader
444
System.out.println("Disconnecting from Resource Manager leader");
445
}
446
447
public String getCurrentLeaderAddress() {
448
return currentLeaderAddress;
449
}
450
451
public UUID getCurrentLeaderSessionId() {
452
return currentLeaderSessionId;
453
}
454
}
455
```
456
457
### Job Graph Store Implementation
458
459
```java
460
import org.apache.flink.runtime.highavailability.JobGraphStore;
461
import org.apache.flink.api.common.JobID;
462
463
public class FileSystemJobGraphStore implements JobGraphStore {
464
private final Path storageDirectory;
465
private volatile JobGraphListener listener;
466
467
public FileSystemJobGraphStore(Path storageDirectory) {
468
this.storageDirectory = storageDirectory;
469
}
470
471
@Override
472
public void putJobGraph(StoredJobGraph jobGraph) throws Exception {
473
JobID jobId = jobGraph.getJobId();
474
Path jobFile = storageDirectory.resolve(jobId.toString() + ".job");
475
476
// Serialize and store job graph
477
try (ObjectOutputStream oos = new ObjectOutputStream(
478
Files.newOutputStream(jobFile))) {
479
oos.writeObject(jobGraph);
480
}
481
482
System.out.println("Stored job graph for: " + jobId);
483
484
// Notify listener
485
if (listener != null) {
486
listener.onAddedJobGraph(jobId);
487
}
488
}
489
490
@Override
491
public StoredJobGraph recoverJobGraph(JobID jobId) throws Exception {
492
Path jobFile = storageDirectory.resolve(jobId.toString() + ".job");
493
494
if (!Files.exists(jobFile)) {
495
throw new Exception("Job graph not found: " + jobId);
496
}
497
498
// Deserialize job graph
499
try (ObjectInputStream ois = new ObjectInputStream(
500
Files.newInputStream(jobFile))) {
501
return (StoredJobGraph) ois.readObject();
502
}
503
}
504
505
@Override
506
public void removeJobGraph(JobID jobId) throws Exception {
507
Path jobFile = storageDirectory.resolve(jobId.toString() + ".job");
508
Files.deleteIfExists(jobFile);
509
510
System.out.println("Removed job graph for: " + jobId);
511
512
// Notify listener
513
if (listener != null) {
514
listener.onRemovedJobGraph(jobId);
515
}
516
}
517
518
@Override
519
public Collection<JobID> getJobIds() throws Exception {
520
if (!Files.exists(storageDirectory)) {
521
return Collections.emptyList();
522
}
523
524
List<JobID> jobIds = new ArrayList<>();
525
try (DirectoryStream<Path> stream = Files.newDirectoryStream(
526
storageDirectory, "*.job")) {
527
528
for (Path path : stream) {
529
String filename = path.getFileName().toString();
530
String jobIdStr = filename.substring(0, filename.lastIndexOf(".job"));
531
jobIds.add(JobID.fromHexString(jobIdStr));
532
}
533
}
534
535
return jobIds;
536
}
537
538
@Override
539
public void start(JobGraphListener jobGraphListener) throws Exception {
540
this.listener = jobGraphListener;
541
542
// Ensure storage directory exists
543
Files.createDirectories(storageDirectory);
544
545
System.out.println("Started FileSystem job graph store at: " + storageDirectory);
546
}
547
548
@Override
549
public void stop() throws Exception {
550
this.listener = null;
551
System.out.println("Stopped FileSystem job graph store");
552
}
553
}
554
```
555
556
### Checkpoint Recovery Factory
557
558
```java
559
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
560
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
561
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
562
563
public class FileSystemCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
564
private final Path checkpointDirectory;
565
private final Configuration configuration;
566
567
public FileSystemCheckpointRecoveryFactory(Path checkpointDirectory, Configuration configuration) {
568
this.checkpointDirectory = checkpointDirectory;
569
this.configuration = configuration;
570
}
571
572
@Override
573
public CompletedCheckpointStore createCompletedCheckpointStore(
574
JobID jobId,
575
int maxNumberOfCheckpointsToRetain,
576
ClassLoader userClassLoader) throws Exception {
577
578
Path jobCheckpointDir = checkpointDirectory.resolve(jobId.toString());
579
580
return new FileSystemCompletedCheckpointStore(
581
jobCheckpointDir,
582
maxNumberOfCheckpointsToRetain,
583
userClassLoader
584
);
585
}
586
587
@Override
588
public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception {
589
Path counterFile = checkpointDirectory.resolve(jobId.toString()).resolve("counter");
590
591
return new FileSystemCheckpointIDCounter(counterFile);
592
}
593
}
594
```
595
596
## Common Patterns
597
598
### HA Services Lifecycle Management
599
600
```java
601
public class ClusterManager {
602
private HighAvailabilityServices haServices;
603
private final Configuration configuration;
604
605
public ClusterManager(Configuration configuration) {
606
this.configuration = configuration;
607
}
608
609
public void start() throws Exception {
610
// Initialize HA services
611
haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
612
configuration,
613
ForkJoinPool.commonPool(),
614
AddressResolution.TRY_ADDRESS_RESOLUTION
615
);
616
617
// Start cluster components with HA support
618
startResourceManager();
619
startDispatcher();
620
startWebMonitor();
621
}
622
623
public void stop() throws Exception {
624
try {
625
// Stop cluster components first
626
stopWebMonitor();
627
stopDispatcher();
628
stopResourceManager();
629
} finally {
630
// Always cleanup HA services
631
if (haServices != null) {
632
haServices.closeAndCleanupAllData();
633
}
634
}
635
}
636
637
private void startResourceManager() throws Exception {
638
LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();
639
ResourceManagerLeaderContender contender = new ResourceManagerLeaderContender();
640
rmLeaderElection.start(contender);
641
}
642
643
// ... other component management methods
644
}
645
```
646
647
### Robust Error Handling
648
649
```java
650
public class HARobustComponent implements LeaderContender, LeaderRetrievalListener {
651
private final ScheduledExecutorService scheduler;
652
private final AtomicBoolean isRunning = new AtomicBoolean(false);
653
654
@Override
655
public void handleError(Exception exception) {
656
System.err.println("HA error occurred: " + exception.getMessage());
657
658
// Implement exponential backoff retry
659
scheduler.schedule(() -> {
660
if (isRunning.get()) {
661
try {
662
restartHAServices();
663
} catch (Exception e) {
664
System.err.println("Failed to restart HA services: " + e.getMessage());
665
handleError(e); // Recursive retry with backoff
666
}
667
}
668
}, calculateBackoffDelay(), TimeUnit.MILLISECONDS);
669
}
670
671
private void restartHAServices() throws Exception {
672
// Restart HA service connections
673
System.out.println("Restarting HA services");
674
// ... restart logic
675
}
676
677
private long calculateBackoffDelay() {
678
// Implement exponential backoff
679
return Math.min(1000 * (long) Math.pow(2, retryCount), 30000);
680
}
681
}
682
```