or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-exchange.mdexecution-graph.mdhigh-availability.mdindex.mdjob-management.mdmessage-passing.mdmetrics.mdmini-cluster.mdrpc-framework.mdstate-management.mdtask-execution.md

high-availability.mddocs/

# High Availability Services

The High Availability (HA) Services provide the cluster coordination and fault-tolerance infrastructure for Flink clusters. They enable leader election, coordinate distributed storage, and supply the recovery mechanisms that keep a cluster resilient and operating when nodes fail.

## Core Services Interface

### HighAvailabilityServices

The primary interface that provides access to all high availability services required by a Flink cluster.

```java { .api }

public interface HighAvailabilityServices extends AutoCloseable {

12

LeaderRetrievalService getResourceManagerLeaderRetriever();

13

LeaderRetrievalService getDispatcherLeaderRetriever();

14

LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

15

LeaderRetrievalService getWebMonitorLeaderRetriever();

16

17

LeaderElectionService getResourceManagerLeaderElectionService();

18

LeaderElectionService getDispatcherLeaderElectionService();

19

LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

20

LeaderElectionService getWebMonitorLeaderElectionService();

21

22

CheckpointRecoveryFactory getCheckpointRecoveryFactory();

23

JobGraphStore getJobGraphStore();

24

JobResultStore getJobResultStore();

25

26

RunningJobsRegistry getRunningJobsRegistry();

27

BlobStore createBlobStore() throws IOException;

28

29

@Override

30

void close() throws Exception;

31

32

void closeAndCleanupAllData() throws Exception;

33

}

```

## Leader Election Services

### LeaderElectionService

Service through which a component participates in leader election within the cluster.

```java { .api }

public interface LeaderElectionService {

44

void start(LeaderContender contender) throws Exception;

45

void stop() throws Exception;

46

47

void confirmLeadership(UUID leaderSessionID, String leaderAddress);

48

boolean hasLeadership(UUID leaderSessionId);

49

}

```

### LeaderContender

Interface implemented by components that want to participate in leader election.

```java { .api }

/**
 * Callback interface of a component that competes for leadership.
 */
public interface LeaderContender {

    /**
     * Called when this contender becomes leader.
     *
     * @param leaderSessionID session ID identifying the granted leadership
     */
    void grantLeadership(UUID leaderSessionID);

    /** Called when this contender loses its leadership. */
    void revokeLeadership();

    /** @return address under which this contender can be reached */
    String getAddress();

    /**
     * Called when the leader election runs into an error.
     *
     * @param exception the error that occurred
     */
    void handleError(Exception exception);
}

```

### LeaderRetrievalService

Service for retrieving the current leader information and receiving notifications when leadership changes.

```java { .api }

public interface LeaderRetrievalService {

71

void start(LeaderRetrievalListener listener) throws Exception;

72

void stop() throws Exception;

73

}

```

### LeaderRetrievalListener

Listener interface for receiving leader change notifications.

```java { .api }

/**
 * Receives leader change notifications from a {@link LeaderRetrievalService}.
 */
public interface LeaderRetrievalListener {

    /**
     * Reports the current leader.
     *
     * <p>NOTE(review): implementations should be prepared for a {@code null} address/session ID
     * (no leader known); confirm against the retrieval service in use.
     *
     * @param leaderAddress address of the leader
     * @param leaderSessionID session ID of the leader
     */
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);

    /**
     * Reports an error of the leader retrieval.
     *
     * @param exception the error that occurred
     */
    void handleError(Exception exception);
}

```

## Storage and Persistence Services

### CheckpointRecoveryFactory

Factory for creating the checkpoint recovery services (a completed-checkpoint store and a checkpoint ID counter) that persist checkpoint metadata.

```java { .api }

public interface CheckpointRecoveryFactory {

95

CompletedCheckpointStore createCompletedCheckpointStore(

96

JobID jobId,

97

int maxNumberOfCheckpointsToRetain,

98

ClassLoader userClassLoader

99

) throws Exception;

100

101

CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception;

102

}

```

### JobGraphStore

Persistent storage for job graphs, enabling job recovery after failures.

```java { .api }

public interface JobGraphStore {

111

void putJobGraph(StoredJobGraph jobGraph) throws Exception;

112

StoredJobGraph recoverJobGraph(JobID jobId) throws Exception;

113

void removeJobGraph(JobID jobId) throws Exception;

114

115

Collection<JobID> getJobIds() throws Exception;

116

void start(JobGraphListener jobGraphListener) throws Exception;

117

void stop() throws Exception;

118

}

```

### JobResultStore

Storage for persisting job execution results. Each result is first written as dirty and later marked as clean.

```java { .api }

public interface JobResultStore {

127

void createDirtyResult(JobResultEntry jobResultEntry) throws IOException;

128

void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException;

129

130

boolean hasJobResultEntry(JobID jobId) throws IOException;

131

boolean hasDirtyJobResultEntry(JobID jobId) throws IOException;

132

boolean hasCleanJobResultEntry(JobID jobId) throws IOException;

133

134

Set<JobResult> getDirtyResults() throws IOException;

135

Set<JobResult> getCleanResults() throws IOException;

136

}

```

## Registry Services

### RunningJobsRegistry

Registry that tracks the scheduling status (pending, running, or done) of the jobs in the cluster.

```java { .api }

public interface RunningJobsRegistry {

147

void setJobRunning(JobID jobID) throws IOException;

148

void setJobFinished(JobID jobID) throws IOException;

149

150

JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException;

151

152

enum JobSchedulingStatus {

153

PENDING,

154

RUNNING,

155

DONE

156

}

157

}

```

### BlobStore

Distributed storage service for binary large objects (BLOBs), such as JAR files and large state.

```java { .api }

public interface BlobStore extends Closeable {

166

boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;

167

boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;

168

boolean delete(JobID jobId, BlobKey blobKey);

169

boolean deleteAll(JobID jobId);

170

171

void closeAndCleanupAllData() throws IOException;

172

}

```

## Utility Classes

### HighAvailabilityServicesUtils

Utility class providing factory methods and helper functions for HA services.

```java { .api }

public class HighAvailabilityServicesUtils {

183

public static HighAvailabilityServices createAvailableOrEmbeddedServices(

184

Configuration config,

185

Executor executor

186

) throws Exception;

187

188

public static HighAvailabilityServices createHighAvailabilityServices(

189

Configuration configuration,

190

Executor executor,

191

AddressResolution addressResolution

192

) throws Exception;

193

194

public static String getJobManagerAddress(Configuration config) throws Exception;

195

196

public static LeaderRetrievalService createLeaderRetrievalService(

197

Configuration config,

198

String serviceName

199

) throws Exception;

200

201

public static LeaderElectionService createLeaderElectionService(

202

Configuration config,

203

String serviceName

204

) throws Exception;

205

206

public static void setJobManagerAddress(Configuration config, String address, int port);

207

}

```

## Configuration Options

### High Availability Configuration Keys

Configuration options for setting up the high availability services.

```java { .api }

public class HighAvailabilityOptions {

218

public static final ConfigOption<String> HA_MODE =

219

key("high-availability").defaultValue("NONE");

220

221

public static final ConfigOption<String> HA_CLUSTER_ID =

222

key("high-availability.cluster-id").defaultValue("default");

223

224

public static final ConfigOption<String> HA_STORAGE_PATH =

225

key("high-availability.storageDir").noDefaultValue();

226

227

public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =

228

key("high-availability.zookeeper.quorum").noDefaultValue();

229

230

public static final ConfigOption<Integer> HA_ZOOKEEPER_SESSION_TIMEOUT =

231

key("high-availability.zookeeper.client.session-timeout").defaultValue(60000);

232

233

public static final ConfigOption<Integer> HA_ZOOKEEPER_CONNECTION_TIMEOUT =

234

key("high-availability.zookeeper.client.connection-timeout").defaultValue(15000);

235

236

public static final ConfigOption<Integer> HA_ZOOKEEPER_RETRY_WAIT =

237

key("high-availability.zookeeper.client.retry-wait").defaultValue(5000);

238

239

public static final ConfigOption<Integer> HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS =

240

key("high-availability.zookeeper.client.max-retry-attempts").defaultValue(3);

241

242

public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =

243

key("high-availability.zookeeper.path.root").defaultValue("/flink");

244

}

```

## Exception Handling

### HighAvailabilityServicesException

Base exception for failures of the high availability services.

```java { .api }

public class HighAvailabilityServicesException extends FlinkException {

255

public HighAvailabilityServicesException(String message);

256

public HighAvailabilityServicesException(String message, Throwable cause);

257

}

```

### LeaderElectionException

Exception thrown when the leader election process fails.

```java { .api }

public class LeaderElectionException extends FlinkException {

266

public LeaderElectionException(String message);

267

public LeaderElectionException(String message, Throwable cause);

268

}

```

## Usage Examples

### Setting Up High Availability with ZooKeeper

```java

import org.apache.flink.runtime.highavailability.HighAvailabilityServices;

277

import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;

278

import org.apache.flink.configuration.Configuration;

279

import org.apache.flink.configuration.HighAvailabilityOptions;

280

281

// Configure ZooKeeper-based high availability

282

Configuration config = new Configuration();

283

config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

284

config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "localhost:2181");

285

config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "file:///ha-storage");

286

config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "my-flink-cluster");

287

288

// Create HA services

289

HighAvailabilityServices haServices = HighAvailabilityServicesUtils

290

.createHighAvailabilityServices(config, executor, AddressResolution.TRY_ADDRESS_RESOLUTION);

291

292

try {

293

// Use HA services for cluster coordination

294

LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();

295

LeaderRetrievalService rmLeaderRetrieval = haServices.getResourceManagerLeaderRetriever();

296

297

// Set up leader election for Resource Manager

298

ResourceManagerLeaderContender rmContender = new ResourceManagerLeaderContender();

299

rmLeaderElection.start(rmContender);

300

301

// Set up leader retrieval for clients

302

ResourceManagerLeaderListener rmListener = new ResourceManagerLeaderListener();

303

rmLeaderRetrieval.start(rmListener);

304

305

} finally {

306

haServices.close();

307

}

```

### Implementing a Leader Contender

```java

import org.apache.flink.runtime.highavailability.LeaderContender;

314

import java.util.UUID;

315

316

public class ResourceManagerLeaderContender implements LeaderContender {

317

private volatile boolean isLeader = false;

318

private volatile UUID currentLeaderSessionId;

319

private final String address;

320

321

public ResourceManagerLeaderContender(String address) {

322

this.address = address;

323

}

324

325

@Override

326

public void grantLeadership(UUID leaderSessionID) {

327

synchronized (this) {

328

if (!isLeader) {

329

System.out.println("Granted leadership with session ID: " + leaderSessionID);

330

this.currentLeaderSessionId = leaderSessionID;

331

this.isLeader = true;

332

333

// Confirm leadership and start serving as leader

334

confirmLeadership(leaderSessionID);

335

startLeaderServices();

336

}

337

}

338

}

339

340

@Override

341

public void revokeLeadership() {

342

synchronized (this) {

343

if (isLeader) {

344

System.out.println("Leadership revoked");

345

this.isLeader = false;

346

this.currentLeaderSessionId = null;

347

348

// Stop leader services

349

stopLeaderServices();

350

}

351

}

352

}

353

354

@Override

355

public String getAddress() {

356

return address;

357

}

358

359

@Override

360

public void handleError(Exception exception) {

361

System.err.println("Leader election error: " + exception.getMessage());

362

// Handle leadership errors - may need to restart election

363

revokeLeadership();

364

}

365

366

private void confirmLeadership(UUID leaderSessionID) {

367

// Confirm leadership with the election service

368

leaderElectionService.confirmLeadership(leaderSessionID, address);

369

}

370

371

private void startLeaderServices() {

372

// Initialize services that only the leader should run

373

System.out.println("Starting Resource Manager leader services");

374

}

375

376

private void stopLeaderServices() {

377

// Clean up leader-only services

378

System.out.println("Stopping Resource Manager leader services");

379

}

380

381

public boolean hasLeadership() {

382

return isLeader;

383

}

384

385

public UUID getCurrentLeaderSessionId() {

386

return currentLeaderSessionId;

387

}

388

}

```

### Implementing a Leader Retrieval Listener

```java

import org.apache.flink.runtime.highavailability.LeaderRetrievalListener;

395

import java.util.UUID;

396

397

public class ResourceManagerLeaderListener implements LeaderRetrievalListener {

398

private volatile String currentLeaderAddress;

399

private volatile UUID currentLeaderSessionId;

400

401

@Override

402

public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {

403

synchronized (this) {

404

if (!Objects.equals(currentLeaderAddress, leaderAddress) ||

405

!Objects.equals(currentLeaderSessionId, leaderSessionID)) {

406

407

System.out.println("New Resource Manager leader: " + leaderAddress +

408

" (session: " + leaderSessionID + ")");

409

410

// Update connection to new leader

411

updateLeaderConnection(leaderAddress, leaderSessionID);

412

413

this.currentLeaderAddress = leaderAddress;

414

this.currentLeaderSessionId = leaderSessionID;

415

}

416

}

417

}

418

419

@Override

420

public void handleError(Exception exception) {

421

System.err.println("Leader retrieval error: " + exception.getMessage());

422

423

// Clear current leader information

424

synchronized (this) {

425

this.currentLeaderAddress = null;

426

this.currentLeaderSessionId = null;

427

disconnectFromLeader();

428

}

429

}

430

431

private void updateLeaderConnection(String leaderAddress, UUID leaderSessionId) {

432

// Establish connection to the new leader

433

if (leaderAddress != null) {

434

System.out.println("Connecting to Resource Manager at: " + leaderAddress);

435

// ... connect to leader

436

} else {

437

System.out.println("No Resource Manager leader available");

438

disconnectFromLeader();

439

}

440

}

441

442

private void disconnectFromLeader() {

443

// Clean up connections to previous leader

444

System.out.println("Disconnecting from Resource Manager leader");

445

}

446

447

public String getCurrentLeaderAddress() {

448

return currentLeaderAddress;

449

}

450

451

public UUID getCurrentLeaderSessionId() {

452

return currentLeaderSessionId;

453

}

454

}

```

### Job Graph Store Implementation

```java

import org.apache.flink.runtime.highavailability.JobGraphStore;

461

import org.apache.flink.api.common.JobID;

462

463

public class FileSystemJobGraphStore implements JobGraphStore {

464

private final Path storageDirectory;

465

private volatile JobGraphListener listener;

466

467

public FileSystemJobGraphStore(Path storageDirectory) {

468

this.storageDirectory = storageDirectory;

469

}

470

471

@Override

472

public void putJobGraph(StoredJobGraph jobGraph) throws Exception {

473

JobID jobId = jobGraph.getJobId();

474

Path jobFile = storageDirectory.resolve(jobId.toString() + ".job");

475

476

// Serialize and store job graph

477

try (ObjectOutputStream oos = new ObjectOutputStream(

478

Files.newOutputStream(jobFile))) {

479

oos.writeObject(jobGraph);

480

}

481

482

System.out.println("Stored job graph for: " + jobId);

483

484

// Notify listener

485

if (listener != null) {

486

listener.onAddedJobGraph(jobId);

487

}

488

}

489

490

@Override

491

public StoredJobGraph recoverJobGraph(JobID jobId) throws Exception {

492

Path jobFile = storageDirectory.resolve(jobId.toString() + ".job");

493

494

if (!Files.exists(jobFile)) {

495

throw new Exception("Job graph not found: " + jobId);

496

}

497

498

// Deserialize job graph

499

try (ObjectInputStream ois = new ObjectInputStream(

500

Files.newInputStream(jobFile))) {

501

return (StoredJobGraph) ois.readObject();

502

}

503

}

504

505

@Override

506

public void removeJobGraph(JobID jobId) throws Exception {

507

Path jobFile = storageDirectory.resolve(jobId.toString() + ".job");

508

Files.deleteIfExists(jobFile);

509

510

System.out.println("Removed job graph for: " + jobId);

511

512

// Notify listener

513

if (listener != null) {

514

listener.onRemovedJobGraph(jobId);

515

}

516

}

517

518

@Override

519

public Collection<JobID> getJobIds() throws Exception {

520

if (!Files.exists(storageDirectory)) {

521

return Collections.emptyList();

522

}

523

524

List<JobID> jobIds = new ArrayList<>();

525

try (DirectoryStream<Path> stream = Files.newDirectoryStream(

526

storageDirectory, "*.job")) {

527

528

for (Path path : stream) {

529

String filename = path.getFileName().toString();

530

String jobIdStr = filename.substring(0, filename.lastIndexOf(".job"));

531

jobIds.add(JobID.fromHexString(jobIdStr));

532

}

533

}

534

535

return jobIds;

536

}

537

538

@Override

539

public void start(JobGraphListener jobGraphListener) throws Exception {

540

this.listener = jobGraphListener;

541

542

// Ensure storage directory exists

543

Files.createDirectories(storageDirectory);

544

545

System.out.println("Started FileSystem job graph store at: " + storageDirectory);

546

}

547

548

@Override

549

public void stop() throws Exception {

550

this.listener = null;

551

System.out.println("Stopped FileSystem job graph store");

552

}

553

}

```

### Checkpoint Recovery Factory

```java

import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;

560

import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;

561

import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;

562

563

public class FileSystemCheckpointRecoveryFactory implements CheckpointRecoveryFactory {

564

private final Path checkpointDirectory;

565

private final Configuration configuration;

566

567

public FileSystemCheckpointRecoveryFactory(Path checkpointDirectory, Configuration configuration) {

568

this.checkpointDirectory = checkpointDirectory;

569

this.configuration = configuration;

570

}

571

572

@Override

573

public CompletedCheckpointStore createCompletedCheckpointStore(

574

JobID jobId,

575

int maxNumberOfCheckpointsToRetain,

576

ClassLoader userClassLoader) throws Exception {

577

578

Path jobCheckpointDir = checkpointDirectory.resolve(jobId.toString());

579

580

return new FileSystemCompletedCheckpointStore(

581

jobCheckpointDir,

582

maxNumberOfCheckpointsToRetain,

583

userClassLoader

584

);

585

}

586

587

@Override

588

public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception {

589

Path counterFile = checkpointDirectory.resolve(jobId.toString()).resolve("counter");

590

591

return new FileSystemCheckpointIDCounter(counterFile);

592

}

593

}

```

## Common Patterns

### HA Services Lifecycle Management

```java

public class ClusterManager {

602

private HighAvailabilityServices haServices;

603

private final Configuration configuration;

604

605

public ClusterManager(Configuration configuration) {

606

this.configuration = configuration;

607

}

608

609

public void start() throws Exception {

610

// Initialize HA services

611

haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(

612

configuration,

613

ForkJoinPool.commonPool(),

614

AddressResolution.TRY_ADDRESS_RESOLUTION

615

);

616

617

// Start cluster components with HA support

618

startResourceManager();

619

startDispatcher();

620

startWebMonitor();

621

}

622

623

public void stop() throws Exception {

624

try {

625

// Stop cluster components first

626

stopWebMonitor();

627

stopDispatcher();

628

stopResourceManager();

629

} finally {

630

// Always cleanup HA services

631

if (haServices != null) {

632

haServices.closeAndCleanupAllData();

633

}

634

}

635

}

636

637

private void startResourceManager() throws Exception {

638

LeaderElectionService rmLeaderElection = haServices.getResourceManagerLeaderElectionService();

639

ResourceManagerLeaderContender contender = new ResourceManagerLeaderContender();

640

rmLeaderElection.start(contender);

641

}

642

643

// ... other component management methods

644

}

```

### Robust Error Handling

```java

public class HARobustComponent implements LeaderContender, LeaderRetrievalListener {

651

private final ScheduledExecutorService scheduler;

652

private final AtomicBoolean isRunning = new AtomicBoolean(false);

653

654

@Override

655

public void handleError(Exception exception) {

656

System.err.println("HA error occurred: " + exception.getMessage());

657

658

// Implement exponential backoff retry

659

scheduler.schedule(() -> {

660

if (isRunning.get()) {

661

try {

662

restartHAServices();

663

} catch (Exception e) {

664

System.err.println("Failed to restart HA services: " + e.getMessage());

665

handleError(e); // Recursive retry with backoff

666

}

667

}

668

}, calculateBackoffDelay(), TimeUnit.MILLISECONDS);

669

}

670

671

private void restartHAServices() throws Exception {

672

// Restart HA service connections

673

System.out.println("Restarting HA services");

674

// ... restart logic

675

}

676

677

private long calculateBackoffDelay() {

678

// Implement exponential backoff

679

return Math.min(1000 * (long) Math.pow(2, retryCount), 30000);

680

}

681

}

```