or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

execution-scheduling.md · high-availability.md · index.md · job-graph.md · resource-management.md · state-checkpointing.md · task-execution.md

docs/high-availability.md

# High Availability and Coordination

Leader election, service discovery, and coordination services for fault-tolerant distributed operation. Flink's high-availability (HA) services make the cluster resilient to failures and allow it to recover from them automatically.

## Capabilities

### HighAvailabilityServices

Central service that provides the high-availability components used for leader election, leader retrieval (service discovery), and coordination.

```java { .api }

11

/**

12

* The HighAvailabilityServices provide access to all services needed for a highly-available

13

* setup. In particular, they provide access to highly-available variants of the following services:

14

* - ResourceManager leader election and leader retrieval

15

* - Dispatcher leader election and leader retrieval

16

* - JobManager leader election and leader retrieval

17

* - Checkpointing metadata persistence

18

*/

19

public interface HighAvailabilityServices extends AutoCloseableAsync {

20

/** Get ResourceManager leader election service */

21

LeaderElectionService getResourceManagerLeaderElectionService();

22

23

/** Get Dispatcher leader election service */

24

LeaderElectionService getDispatcherLeaderElectionService();

25

26

/** Get JobManager leader election service for specific job */

27

LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

28

29

/** Get ResourceManager leader retrieval service */

30

LeaderRetrievalService getResourceManagerLeaderRetriever();

31

32

/** Get Dispatcher leader retrieval service */

33

LeaderRetrievalService getDispatcherLeaderRetriever();

34

35

/** Get JobManager leader retrieval service for specific job */

36

LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

37

38

/** Get JobManager leader retrieval service with fallback */

39

LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

40

41

/** Get web monitor leader retrieval service */

42

LeaderRetrievalService getWebMonitorLeaderRetriever();

43

44

/** Get cluster rest endpoint leader retrieval service */

45

LeaderRetrievalService getClusterRestEndpointLeaderRetriever();

46

47

/** Get checkpoint recovery factory */

48

CheckpointRecoveryFactory getCheckpointRecoveryFactory();

49

50

/** Get job graph store */

51

JobGraphStore getJobGraphStore();

52

53

/** Get job result store */

54

JobResultStore getJobResultStore();

55

56

/** Get running jobs registry */

57

RunningJobsRegistry getRunningJobsRegistry();

58

59

/** Get blob store service */

60

BlobStoreService createBlobStore();

61

62

/** Close services asynchronously */

63

CompletableFuture<Void> closeAsync();

64

65

/** Close and cleanup all HA data */

66

CompletableFuture<Void> closeAndCleanupAllData();

67

}

68

```

**Usage Examples:**

```java

73

// Create HA services from configuration

74

Configuration config = new Configuration();

75

config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

76

config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181,zk3:2181");

77

config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "my-flink-cluster");

78

79

HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(

80

config,

81

ioExecutor,

82

AddressResolution.TRY_ADDRESS_RESOLUTION,

83

rpcSystem,

84

fatalErrorHandler

85

);

86

87

// Get ResourceManager leader election service

88

LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();

89

rmLeaderElection.start(new ResourceManagerLeaderContender());

90

91

// Get JobManager leader retrieval for monitoring

92

LeaderRetrievalService jmLeaderRetrieval = haServices.getJobManagerLeaderRetriever(jobId);

93

jmLeaderRetrieval.start(new JobManagerLeaderListener());

94

95

// Get checkpoint recovery factory

96

CheckpointRecoveryFactory checkpointRecovery = haServices.getCheckpointRecoveryFactory();

97

CompletedCheckpointStore checkpointStore = checkpointRecovery.createRecoveredCompletedCheckpointStore(

98

jobId,

99

maxNumberOfCheckpointsToRetain,

100

sharedStateRegistryFactory,

101

ioExecutor

102

);

103

```

### LeaderElectionService

Service for electing a leader among multiple contenders, ensuring that only a single contender holds leadership.

```java { .api }

110

/**

111

* Interface for a service which allows to elect a leader among a group of contenders.

112

* Prior to using this service, it has to be started by calling the start method.

113

* The start method takes the contender as an argument. If there are multiple contenders,

114

* then each one has to call the start method with its own contender object.

115

*/

116

public interface LeaderElectionService {

117

/** Start the leader election service with a contender */

118

void start(LeaderContender contender);

119

120

/** Stop the leader election service */

121

void stop();

122

123

/** Confirm leadership by the current leader */

124

void confirmLeadership(UUID leaderSessionID);

125

126

/** Check if contender has leadership */

127

boolean hasLeadership(UUID leaderSessionId);

128

}

/** Interface for leader contenders which participate in leader election. */
public interface LeaderContender {

    /** Grants leadership to this contender under the given leader session ID. */
    void grantLeadership(UUID leaderSessionID);

    /** Revokes the leadership previously granted to this contender. */
    void revokeLeadership();

    /**
     * Returns a human-readable description of this contender (for example, for log messages).
     * This is not the leader address.
     */
    String getDescription();

    /** Handles an error that occurred in the leader election service. */
    void handleError(Exception exception);
}

```

### LeaderRetrievalService

Service for retrieving the current leader and being notified whenever leadership changes.

```java { .api }

153

/**

154

* Service which retrieves the current leader and notifies a listener about leadership changes.

155

* The leader retrieval service can only be started once.

156

*/

157

public interface LeaderRetrievalService {

158

/** Start the leader retrieval service with a listener */

159

void start(LeaderRetrievalListener listener);

160

161

/** Stop the leader retrieval service */

162

void stop();

163

}

/**
 * Listener interface for leader retrieval. The listener is notified about new leaders and
 * leader changes.
 */
public interface LeaderRetrievalListener {

    /** Notifies the listener of the current leader's address and leader session ID. */
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);

    /** Handles an error that occurred during leader retrieval. */
    void handleError(Exception exception);
}

```

**Usage Examples:**

```java

181

// Implement leader contender

182

public class ResourceManagerLeaderContender implements LeaderContender {

183

private final ResourceManager resourceManager;

184

private UUID currentLeaderSessionId;

185

186

@Override

187

public void grantLeadership(UUID leaderSessionID) {

188

currentLeaderSessionId = leaderSessionID;

189

resourceManager.becomeLeader(leaderSessionID);

190

// Confirm leadership

191

leaderElectionService.confirmLeadership(leaderSessionID);

192

}

193

194

@Override

195

public void revokeLeadership() {

196

currentLeaderSessionId = null;

197

resourceManager.loseLeadership();

198

}

199

200

@Override

201

public String getDescription() {

202

return resourceManager.getAddress();

203

}

204

205

@Override

206

public void handleError(Exception exception) {

207

resourceManager.handleFatalError(exception);

208

}

209

}

210

211

// Implement leader retrieval listener

212

public class JobManagerLeaderListener implements LeaderRetrievalListener {

213

private final JobMasterGatewayRetriever gatewayRetriever;

214

215

@Override

216

public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {

217

if (leaderAddress != null && leaderSessionID != null) {

218

JobMasterGateway gateway = gatewayRetriever.getGateway(leaderAddress);

219

// Connect to new JobManager leader

220

connectToJobManager(gateway, leaderSessionID);

221

} else {

222

// Leader lost

223

disconnectFromJobManager();

224

}

225

}

226

227

@Override

228

public void handleError(Exception exception) {

229

// Handle retrieval error

230

logger.error("Error in leader retrieval", exception);

231

}

232

}

233

```

### ZooKeeper High Availability

ZooKeeper-based implementation of the HA services, providing distributed coordination and persistence.

```java { .api }

240

/**

241

* ZooKeeper based implementation of HighAvailabilityServices.

242

*/

243

public class ZooKeeperHaServices implements HighAvailabilityServices {

244

/** Create ZooKeeper HA services */

245

public static ZooKeeperHaServices create(

246

CuratorFramework client,

247

Configuration configuration,

248

Executor executor

249

);

250

251

/** Get ZooKeeper client */

252

public CuratorFramework getClient();

253

254

/** Get cluster configuration store path */

255

public String getClusterConfigurationStorePath();

256

257

/** Get leader path for component */

258

public String getLeaderPath(String componentName);

259

260

/** Create ZooKeeper leader election service */

261

protected LeaderElectionService createLeaderElectionService(String leaderPath);

262

263

/** Create ZooKeeper leader retrieval service */

264

protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath);

265

}

266

267

/**

268

* ZooKeeper based leader election service implementation.

269

*/

270

public class ZooKeeperLeaderElectionService implements LeaderElectionService {

271

/** Create ZooKeeper leader election service */

272

public ZooKeeperLeaderElectionService(

273

CuratorFramework client,

274

String latchPath,

275

String leaderPath

276

);

277

278

/** Start leader election */

279

public void start(LeaderContender contender);

280

281

/** Stop leader election */

282

public void stop();

283

284

/** Confirm leadership */

285

public void confirmLeadership(UUID leaderSessionID);

286

287

/** Check leadership */

288

public boolean hasLeadership(UUID leaderSessionId);

289

290

/** Get leader latch */

291

protected LeaderLatch getLeaderLatch();

292

293

/** Write leader information to ZooKeeper */

294

protected void writeLeaderInformation(UUID leaderSessionID);

295

}

296

297

/**

298

* ZooKeeper based leader retrieval service implementation.

299

*/

300

public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService {

301

/** Create ZooKeeper leader retrieval service */

302

public ZooKeeperLeaderRetrievalService(

303

CuratorFramework client,

304

String retrievalPath

305

);

306

307

/** Start leader retrieval */

308

public void start(LeaderRetrievalListener listener);

309

310

/** Stop leader retrieval */

311

public void stop();

312

313

/** Handle ZooKeeper connection state changes */

314

protected void handleConnectionStateChanged(ConnectionState newState);

315

316

/** Handle leader node changes */

317

protected void handleLeaderChange();

318

}

319

```

### Embedded High Availability

Simple embedded implementation of the HA services for testing and for single-node deployments that do not require high availability.

```java { .api }

326

/**

327

* An implementation of the HighAvailabilityServices for the non-high-availability case.

328

* This implementation can be used for testing or for cluster setups that do not

329

* require high availability.

330

*/

331

public class EmbeddedHaServices implements HighAvailabilityServices {

332

/** Create embedded HA services */

333

public EmbeddedHaServices(Executor executor);

334

335

/** Get ResourceManager leader election service */

336

public LeaderElectionService getResourceManagerLeaderElectionService();

337

338

/** Get Dispatcher leader election service */

339

public LeaderElectionService getDispatcherLeaderElectionService();

340

341

/** Get JobManager leader election service */

342

public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

343

344

/** Get checkpoint recovery factory */

345

public CheckpointRecoveryFactory getCheckpointRecoveryFactory();

346

347

/** Get job graph store */

348

public JobGraphStore getJobGraphStore();

349

350

/** Get job result store */

351

public JobResultStore getJobResultStore();

352

353

/** Get running jobs registry */

354

public RunningJobsRegistry getRunningJobsRegistry();

355

356

/** Create blob store service */

357

public BlobStoreService createBlobStore();

358

359

/** Close services */

360

public CompletableFuture<Void> closeAsync();

361

}

362

363

/**

364

* Embedded leader election service that immediately grants leadership.

365

*/

366

public class EmbeddedLeaderElectionService implements LeaderElectionService {

367

/** Start with automatic leadership grant */

368

public void start(LeaderContender contender);

369

370

/** Stop the service */

371

public void stop();

372

373

/** Confirm leadership (always true for embedded) */

374

public void confirmLeadership(UUID leaderSessionID);

375

376

/** Check leadership (always true for embedded) */

377

public boolean hasLeadership(UUID leaderSessionId);

378

}

379

```

### Job Graph Store

Persistent storage for job graphs, allowing jobs to be recovered after failures.

```java { .api }

386

/**

387

* JobGraphStore interface for persisting and retrieving job graphs in a highly available manner.

388

*/

389

public interface JobGraphStore {

390

/** Start the job graph store */

391

void start(JobGraphListener jobGraphListener);

392

393

/** Stop the job graph store */

394

void stop();

395

396

/** Put job graph into store */

397

void putJobGraph(JobGraph jobGraph);

398

399

/** Remove job graph from store */

400

void removeJobGraph(JobID jobId);

401

402

/** Release locks for job graph */

403

void releaseJobGraph(JobID jobId);

404

405

/** Get all job IDs */

406

Collection<JobID> getJobIds();

407

408

/** Get stored job graphs */

409

Collection<JobGraph> recoverJobGraphs();

410

}

411

412

/**

413

* Listener for job graph store events.

414

*/

415

public interface JobGraphListener {

416

/** Called when job graph is added */

417

void onAddedJobGraph(JobID jobId);

418

419

/** Called when job graph is removed */

420

void onRemovedJobGraph(JobID jobId);

421

}

422

```

**Usage Examples:**

```java

// Configure high availability mode
Configuration config = new Configuration();

// ZooKeeper HA setup
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181");
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "production-cluster");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "hdfs://cluster/flink-ha");

// Or run without high availability (e.g. for local testing):
// config.setString(HighAvailabilityOptions.HA_MODE, "NONE");

// Create HA services
HighAvailabilityServices haServices =
        HighAvailabilityServicesUtils.createHighAvailabilityServices(
                config,
                ioExecutor,
                AddressResolution.TRY_ADDRESS_RESOLUTION,
                rpcSystem,
                fatalErrorHandler);

// Use the job graph store for persistence
JobGraphStore jobGraphStore = haServices.getJobGraphStore();
jobGraphStore.start(
        new JobGraphListener() {
            @Override
            public void onAddedJobGraph(JobID jobId) {
                System.out.println("Job graph added: " + jobId);
            }

            @Override
            public void onRemovedJobGraph(JobID jobId) {
                System.out.println("Job graph removed: " + jobId);
            }
        });

// Store a job graph
jobGraphStore.putJobGraph(jobGraph);

// Recover job graphs after a restart
Collection<JobGraph> recoveredJobs = jobGraphStore.recoverJobGraphs();
for (JobGraph job : recoveredJobs) {
    System.out.println("Recovered job: " + job.getJobID());
}

// Release the store and the HA services when done
jobGraphStore.stop();
haServices.closeAsync().join();

```

## Types

```java { .api }

475

// High availability modes

476

public enum HighAvailabilityMode {

477

NONE("NONE"),

478

ZOOKEEPER("zookeeper"),

479

KUBERNETES("kubernetes");

480

481

private final String value;

482

483

public String getValue();

484

public static HighAvailabilityMode fromConfig(Configuration config);

485

}

486

487

// Leadership session identifiers

488

public class UUID implements Serializable, Comparable<UUID> {

489

public static UUID randomUUID();

490

public static UUID fromString(String name);

491

492

public long getMostSignificantBits();

493

public long getLeastSignificantBits();

494

public String toString();

495

}

496

497

// Running jobs registry

498

public interface RunningJobsRegistry {

499

/** Set job running */

500

void setJobRunning(JobID jobID);

501

502

/** Set job finished */

503

void setJobFinished(JobID jobID);

504

505

/** Get job scheduling status */

506

JobSchedulingStatus getJobSchedulingStatus(JobID jobID);

507

508

/** Get running job IDs */

509

Set<JobID> getRunningJobIds();

510

511

/** Clear job from registry */

512

void clearJob(JobID jobID);

513

}

/** Scheduling status of a job, as reported by {@code RunningJobsRegistry#getJobSchedulingStatus}. */
public enum JobSchedulingStatus {
    PENDING,
    RUNNING,
    DONE
}

520

521

// Checkpoint recovery components

522

public interface CheckpointRecoveryFactory {

523

/** Create completed checkpoint store */

524

CompletedCheckpointStore createRecoveredCompletedCheckpointStore(

525

JobID jobId,

526

int maxNumberOfCheckpointsToRetain,

527

SharedStateRegistryFactory sharedStateRegistryFactory,

528

Executor ioExecutor

529

);

530

531

/** Create checkpoint ID counter */

532

CheckpointIDCounter createCheckpointIDCounter(JobID jobId);

533

}

534

535

// Configuration options

536

public class HighAvailabilityOptions {

537

public static final ConfigOption<String> HA_MODE;

538

public static final ConfigOption<String> HA_CLUSTER_ID;

539

public static final ConfigOption<String> HA_STORAGE_PATH;

540

public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM;

541

public static final ConfigOption<String> HA_ZOOKEEPER_ROOT;

542

public static final ConfigOption<Integer> HA_ZOOKEEPER_SESSION_TIMEOUT;

543

public static final ConfigOption<Integer> HA_ZOOKEEPER_CONNECTION_TIMEOUT;

544

public static final ConfigOption<Integer> HA_ZOOKEEPER_RETRY_WAIT;

545

public static final ConfigOption<Integer> HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS;

546

public static final ConfigOption<String> HA_ZOOKEEPER_NAMESPACE;

547

}

548

549

// Blob store service for distributed file storage

550

public interface BlobStoreService extends Closeable {

551

/** Put blob in store */

552

boolean put(File localFile, BlobKey blobKey);

553

554

/** Get blob from store */

555

boolean get(BlobKey blobKey, File localFile);

556

557

/** Delete blob from store */

558

boolean delete(BlobKey blobKey);

559

560

/** Delete all blobs for job */

561

boolean deleteAll(JobID jobId);

562

563

/** Close the blob store */

564

void close();

565

}

566

```