or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, entry-points.md, high-availability.md, index.md, resource-management.md, task-scheduling.md, utilities.md

high-availability.md (docs/)

0

# High Availability Storage

1

2

Persistent storage interfaces for maintaining cluster state and worker information across framework restarts and failures. The HA system provides fault tolerance, state recovery, and consistent cluster management in production environments.

3

4

## Capabilities

5

6

### Worker Store Interface

7

8

Core interface for persistent storage of Mesos worker state and framework information, supporting both standalone and distributed storage implementations.

9

10

```java { .api }

11

/**

12

* Persistent store for Mesos worker state and framework information

13

* Provides fault tolerance and state recovery for high availability deployments

14

*/

15

public interface MesosWorkerStore {

16

/**

17

* Initialize the worker store and establish connections

18

* Must be called before any other operations

19

* @throws Exception if the worker store cannot be started

20

*/

21

void start() throws Exception;

22

23

/**

24

* Stop the worker store and cleanup resources

25

* @param cleanup - Whether to perform cleanup operations (remove stored data)

26

* @throws Exception if the worker store cannot be stopped properly

27

*/

28

void stop(boolean cleanup) throws Exception;

29

30

/**

31

* Get the stored Mesos framework ID for framework re-registration

32

* @return Optional framework ID, empty if none stored

33

* @throws Exception if the framework ID cannot be retrieved

34

*/

35

Option<Protos.FrameworkID> getFrameworkID() throws Exception;

36

37

/**

38

* Store the Mesos framework ID for persistent framework identity

39

* @param frameworkID - Framework ID to store, or empty to clear

40

* @throws Exception if the framework ID cannot be stored

41

*/

42

void setFrameworkID(Option<Protos.FrameworkID> frameworkID) throws Exception;

43

44

/**

45

* Recover all stored worker information after restart

46

* Used during framework recovery to restore cluster state

47

* @return List of all stored workers with their current state

48

* @throws Exception if worker information cannot be recovered

49

*/

50

List<Worker> recoverWorkers() throws Exception;

51

52

/**

53

* Generate a new unique task ID for worker identification

54

* Ensures task ID uniqueness across framework restarts

55

* @return New unique Mesos task ID

56

* @throws Exception if a new task ID cannot be generated

57

*/

58

Protos.TaskID newTaskID() throws Exception;

59

60

/**

61

* Store worker information persistently

62

* Updates existing worker if task ID already exists

63

* @param worker - Worker information to store

64

* @throws Exception if the worker cannot be stored

65

*/

66

void putWorker(Worker worker) throws Exception;

67

68

/**

69

* Remove worker from persistent storage

70

* @param taskID - Task ID of worker to remove

71

* @return true if worker was found and removed, false otherwise

72

* @throws Exception if the worker cannot be removed

73

*/

74

boolean removeWorker(Protos.TaskID taskID) throws Exception;

75

}

76

```

77

78

### Worker State Management

79

80

Nested classes within MesosWorkerStore for representing stored worker information and lifecycle states.

81

82

```java { .api }

83

/**

84

* Stored worker representation with state and launch information

85

* Contains all information needed to recover worker after framework restart

86

*/

87

public static class Worker implements Serializable {

88

/**

89

* Create worker entry for storage

90

* @param taskId - Unique Mesos task ID

91

* @param launchableWorker - Worker launch specification

92

* @param state - Current worker lifecycle state

93

*/

94

public Worker(Protos.TaskID taskId,

95

LaunchableMesosWorker launchableWorker,

96

WorkerState state);

97

98

/**

99

* Get the Mesos task ID for this worker

100

* @return Unique task identifier

101

*/

102

public Protos.TaskID taskID();

103

104

/**

105

* Get the launchable worker specification

106

* @return Worker launch configuration and requirements

107

*/

108

public LaunchableMesosWorker launchableMesosWorker();

109

110

/**

111

* Get the current worker lifecycle state

112

* @return Current state in worker lifecycle

113

*/

114

public WorkerState state();

115

116

/**

117

* Create a new worker with updated state

118

* @param newState - New lifecycle state

119

* @return New Worker instance with updated state

120

*/

121

public Worker withState(WorkerState newState);

122

}

123

124

/**

125

* Worker lifecycle states for state machine management

126

*/

127

public enum WorkerState {

128

/** Worker created but not yet launched */

129

New,

130

/** Worker successfully launched on Mesos slave */

131

Launched,

132

/** Worker released and no longer active */

133

Released

134

}

135

```

136

137

**Worker State Management Example:**

138

139

```java

140

import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;

141

import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore.Worker;

142

import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore.WorkerState;

143

144

// Store new worker

145

MesosWorkerStore store = /* ... */;

146

Protos.TaskID taskId = store.newTaskID();

147

LaunchableMesosWorker launchableWorker = /* ... */;

148

149

Worker newWorker = new Worker(taskId, launchableWorker, WorkerState.New);

150

store.putWorker(newWorker);

151

152

// Update worker state after successful launch

153

Worker launchedWorker = newWorker.withState(WorkerState.Launched);

154

store.putWorker(launchedWorker);

155

156

// Remove worker when no longer needed

157

store.removeWorker(taskId);

158

```

159

160

### Standalone Worker Store

161

162

In-memory implementation of MesosWorkerStore suitable for single-node deployments and development environments.

163

164

```java { .api }

165

/**

166

* In-memory implementation of MesosWorkerStore for standalone deployments

167

* Data is not persisted across process restarts - suitable for development only

168

*/

169

public class StandaloneMesosWorkerStore implements MesosWorkerStore {

170

/**

171

* Create standalone worker store with configuration

172

* @param config - Flink configuration (unused in standalone mode)

173

*/

174

public StandaloneMesosWorkerStore(Configuration config);

175

176

// Implements all MesosWorkerStore interface methods

177

// Data stored in memory only - lost on restart

178

}

179

```

180

181

### ZooKeeper Worker Store

182

183

Distributed implementation of MesosWorkerStore using Apache ZooKeeper for persistent, fault-tolerant storage in production environments.

184

185

```java { .api }

186

/**

187

* ZooKeeper-based implementation of MesosWorkerStore for high availability

188

* Provides persistent storage with automatic failover and consistency guarantees

189

*/

190

public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {

191

/**

192

* Create ZooKeeper-based worker store

193

* @param curatorFramework - ZooKeeper client framework

194

* @param configuration - Flink configuration with ZK settings

195

*/

196

public ZooKeeperMesosWorkerStore(CuratorFramework curatorFramework,

197

Configuration configuration);

198

199

// Implements all MesosWorkerStore interface methods

200

// Data persisted in ZooKeeper with automatic replication

201

}

202

```

203

204

**ZooKeeper Configuration Example:**

205

206

```java

207

import org.apache.flink.configuration.Configuration;

208

import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;

209

import org.apache.curator.framework.CuratorFramework;

210

import org.apache.curator.framework.CuratorFrameworkFactory;

211

212

// Configure ZooKeeper connection

213

Configuration config = new Configuration();

214

config.setString("high-availability", "zookeeper");

215

config.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

216

config.setString("high-availability.zookeeper.path.root", "/flink");

217

config.setString("high-availability.cluster-id", "production-cluster-1");

218

219

// Create ZooKeeper client

220

CuratorFramework curator = CuratorFrameworkFactory.newClient(

221

"zk1:2181,zk2:2181,zk3:2181",

222

new ExponentialBackoffRetry(1000, 3)

223

);

224

curator.start();

225

226

// Create HA worker store

227

ZooKeeperMesosWorkerStore store = new ZooKeeperMesosWorkerStore(curator, config);

228

store.start();

229

```

230

231

## High Availability Patterns

232

233

### Framework Recovery

234

235

Complete framework state recovery after master restart or failover:

236

237

```java

238

import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;

239

240

// Framework recovery procedure

241

public void recoverFrameworkState(MesosWorkerStore store) {

242

// Recover framework ID for re-registration

243

Option<Protos.FrameworkID> frameworkId = store.getFrameworkID();

244

245

if (frameworkId.isDefined()) {

246

// Re-register with existing framework ID

247

reregisterFramework(frameworkId.get());

248

} else {

249

// Fresh registration - first time startup

250

registerNewFramework();

251

}

252

253

// Recover all workers and their states

254

List<MesosWorkerStore.Worker> workers = store.recoverWorkers();

255

256

for (MesosWorkerStore.Worker worker : workers) {

257

switch (worker.state()) {

258

case New:

259

// Worker was created but never launched - retry launch

260

retryWorkerLaunch(worker);

261

break;

262

case Launched:

263

// Worker was launched - verify status and reconcile

264

reconcileWorkerState(worker);

265

break;

266

case Released:

267

// Worker was released - clean up if needed

268

cleanupReleasedWorker(worker);

269

break;

270

}

271

}

272

}

273

```

274

275

### State Synchronization

276

277

Consistent state management across framework components:

278

279

```java

280

// Synchronized worker lifecycle management

281

public class WorkerLifecycleManager {

282

private final MesosWorkerStore store;

283

284

public void launchWorker(LaunchableMesosWorker launchableWorker) {

285

// Create task ID and store initial state

286

Protos.TaskID taskId = store.newTaskID();

287

MesosWorkerStore.Worker worker = new MesosWorkerStore.Worker(

288

taskId, launchableWorker, WorkerState.New

289

);

290

store.putWorker(worker);

291

292

try {

293

// Attempt to launch worker

294

launchWorkerOnMesos(launchableWorker);

295

296

// Update state to launched on success

297

store.putWorker(worker.withState(WorkerState.Launched));

298

299

} catch (Exception e) {

300

// Remove worker on launch failure

301

store.removeWorker(taskId);

302

throw e;

303

}

304

}

305

306

public void releaseWorker(Protos.TaskID taskId) {

307

// Get current worker state

308

List<MesosWorkerStore.Worker> workers = store.recoverWorkers();

309

MesosWorkerStore.Worker worker = findWorkerById(workers, taskId);

310

311

if (worker != null) {

312

// Update state to released

313

store.putWorker(worker.withState(WorkerState.Released));

314

315

// Perform cleanup after state update

316

cleanupWorkerResources(worker);

317

318

// Remove from store after successful cleanup

319

store.removeWorker(taskId);

320

}

321

}

322

}

323

```

324

325

### Backup and Restore

326

327

Data backup strategies for disaster recovery:

328

329

```java

330

// Backup framework state

331

public void backupFrameworkState(MesosWorkerStore store, String backupLocation) {

332

// Get all persistent state

333

Option<Protos.FrameworkID> frameworkId = store.getFrameworkID();

334

List<MesosWorkerStore.Worker> workers = store.recoverWorkers();

335

336

// Create backup data structure

337

FrameworkBackup backup = new FrameworkBackup(frameworkId, workers, System.currentTimeMillis());

338

339

// Serialize and store backup

340

writeBackupToStorage(backup, backupLocation);

341

}

342

343

// Restore framework state from backup

344

public void restoreFrameworkState(MesosWorkerStore store, String backupLocation) {

345

// Load backup data

346

FrameworkBackup backup = readBackupFromStorage(backupLocation);

347

348

// Restore framework ID

349

if (backup.getFrameworkId().isDefined()) {

350

store.setFrameworkID(backup.getFrameworkId());

351

}

352

353

// Restore worker states

354

for (MesosWorkerStore.Worker worker : backup.getWorkers()) {

355

store.putWorker(worker);

356

}

357

}

358

```

359

360

## Error Handling and Recovery

361

362

### Connection Failures

363

364

Robust handling of storage backend connection failures:

365

366

- **Automatic retry**: Exponential backoff for transient failures

367

- **Circuit breaker**: Prevent cascade failures during outages

368

- **Graceful degradation**: Continue operation with reduced functionality

369

- **Health monitoring**: Continuous monitoring of storage backend health

370

371

### Data Consistency

372

373

Ensuring consistent state across distributed components:

374

375

- **Atomic operations**: All-or-nothing state updates

376

- **Conflict resolution**: Handling concurrent updates from multiple instances

377

- **Version control**: Optimistic concurrency control for state updates

378

- **Consistency checks**: Periodic validation of stored state integrity

379

380

### Split-Brain Prevention

381

382

Protection against split-brain scenarios in distributed deployments:

383

384

- **Leader election**: Single active resource manager instance

385

- **Fencing mechanisms**: Prevent zombie processes from corrupting state

386

- **Quorum requirements**: Majority consensus for critical operations

387

- **Timeout handling**: Appropriate timeouts for distributed operations

388

389

## Performance Optimization

390

391

### Batch Operations

392

393

Efficient handling of bulk state operations:

394

395

```java

396

// Batch worker state updates

397

public void updateWorkerStates(Map<Protos.TaskID, WorkerState> stateUpdates) {

398

List<MesosWorkerStore.Worker> workers = store.recoverWorkers();

399

400

// Batch update all workers

401

for (Map.Entry<Protos.TaskID, WorkerState> entry : stateUpdates.entrySet()) {

402

MesosWorkerStore.Worker worker = findWorkerById(workers, entry.getKey());

403

if (worker != null) {

404

store.putWorker(worker.withState(entry.getValue()));

405

}

406

}

407

}

408

```

409

410

### Connection Pooling

411

412

Optimized connections to storage backends:

413

414

- **Connection reuse**: Pool connections for ZooKeeper operations

415

- **Session management**: Persistent sessions with automatic renewal

416

- **Connection monitoring**: Health checking and automatic reconnection

417

- **Resource cleanup**: Proper cleanup of connections and sessions

418

419

## Monitoring and Observability

420

421

### Metrics and Monitoring

422

423

Key metrics for HA storage monitoring:

424

425

- **Storage latency**: Response times for read/write operations

426

- **Connection health**: Status of storage backend connections

427

- **State consistency**: Validation of stored state integrity

428

- **Recovery metrics**: Time to recover from failures

429

430

### Alerting

431

432

Critical alerts for HA storage systems:

433

434

- **Storage backend failures**: Connection losses or timeouts

435

- **State corruption**: Inconsistent or invalid stored state

436

- **Recovery failures**: Problems during framework recovery

437

- **Resource exhaustion**: Storage space or connection limits

438

439

## Deprecation Notice

440

441

All Mesos high availability storage classes described on this page are deprecated as of Flink 1.13, together with the rest of Flink's Mesos integration. Migration paths:

442

443

- **Kubernetes**: Use Kubernetes ConfigMaps/Secrets for state storage

444

- **YARN**: Use YARN's resource manager state store

445

- **Standalone**: Use Flink's built-in HA storage mechanisms

446

447

## Types

448

449

```java { .api }

450

/**

451

* Framework backup data structure

452

*/

453

public class FrameworkBackup implements Serializable {

454

public Option<Protos.FrameworkID> getFrameworkId();

455

public List<MesosWorkerStore.Worker> getWorkers();

456

public long getTimestamp();

457

public String getVersion();

458

}

459

460

/**

461

* Storage configuration for HA deployments

462

*/

463

public class HAStorageConfiguration {

464

public String getStorageType(); // "standalone" or "zookeeper"

465

public String getZooKeeperQuorum();

466

public String getStoragePath();

467

public int getConnectionTimeout();

468

public int getSessionTimeout();

469

}

470

471

/**

472

* Worker recovery information

473

*/

474

public class WorkerRecoveryInfo {

475

public Protos.TaskID getTaskId();

476

public WorkerState getLastKnownState();

477

public long getLastUpdateTime();

478

public boolean requiresReconciliation();

479

}

480

```