# Fault Tolerance & Write-Ahead Logging

The Flink Cassandra Connector provides exactly-once processing guarantees for streaming applications through checkpoint coordination and write-ahead logging (WAL). Incoming records are buffered in Flink's state backend and written to Cassandra only after the corresponding checkpoint completes, which keeps data consistent and prevents data loss when failures occur.

## Capabilities

### CassandraCommitter

A `CheckpointCommitter` implementation that stores checkpoint information in a dedicated Cassandra table to coordinate Flink checkpoints with Cassandra writes.

```java { .api }
/**
 * CheckpointCommitter that saves completed checkpoint information in Cassandra
 * Creates entries in format: |operator_id | subtask_id | last_completed_checkpoint|
 */
public class CassandraCommitter extends CheckpointCommitter {
    /**
     * Creates committer with default keyspace 'flink_auxiliary'
     * @param builder ClusterBuilder for Cassandra connection configuration
     */
    public CassandraCommitter(ClusterBuilder builder);

    /**
     * Creates committer with custom keyspace for checkpoint storage
     * @param builder ClusterBuilder for Cassandra connection configuration
     * @param keySpace custom keyspace name for checkpoint table storage
     */
    public CassandraCommitter(ClusterBuilder builder, String keySpace);

    /**
     * Sets job ID for checkpoint table naming (called internally by Flink)
     * @param id unique job identifier
     * @throws Exception if setup fails
     */
    public void setJobId(String id) throws Exception;

    /**
     * Creates necessary keyspace and checkpoint table if they don't exist
     * @throws Exception if table/keyspace creation fails
     */
    @Override
    public void createResource() throws Exception;

    /**
     * Opens connection for checkpoint operations
     * @throws Exception if connection setup fails
     */
    @Override
    public void open() throws Exception;

    /**
     * Closes connections and clears checkpoint cache
     * @throws Exception if cleanup fails
     */
    @Override
    public void close() throws Exception;

    /**
     * Records checkpoint completion in Cassandra
     * @param subtaskIdx subtask index that completed checkpoint
     * @param checkpointId checkpoint ID that was completed
     */
    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId);

    /**
     * Checks if a specific checkpoint has been committed
     * Uses local cache to minimize Cassandra queries
     * @param subtaskIdx subtask index to check
     * @param checkpointId checkpoint ID to verify
     * @return true if checkpoint has been committed
     */
    @Override
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}
```
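
To make the contract of `commitCheckpoint` and `isCheckpointCommitted` concrete, the following is a minimal in-memory sketch, not the connector's implementation. The class name `InMemoryCommitterSketch` is hypothetical, a `HashMap` stands in for the Cassandra table, and the sketch assumes that checkpoints are committed in ascending order per subtask, so storing only the last completed checkpoint ID is enough to answer whether an earlier one was committed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the checkpoint table; not part of the connector. */
final class InMemoryCommitterSketch {
    // Subtask index -> highest checkpoint ID recorded as committed.
    private final Map<Integer, Long> lastCommitted = new HashMap<>();

    /** Records that the given subtask completed the given checkpoint. */
    void commitCheckpoint(int subtaskIdx, long checkpointId) {
        // Assumption: keep the highest ID seen, one entry per subtask.
        lastCommitted.merge(subtaskIdx, checkpointId, Math::max);
    }

    /** A checkpoint counts as committed if it is not newer than the last recorded one. */
    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        Long last = lastCommitted.get(subtaskIdx);
        return last != null && checkpointId <= last;
    }

    public static void main(String[] args) {
        InMemoryCommitterSketch committer = new InMemoryCommitterSketch();
        committer.commitCheckpoint(0, 5L);
        System.out.println(committer.isCheckpointCommitted(0, 4L)); // true
        System.out.println(committer.isCheckpointCommitted(0, 6L)); // false
        System.out.println(committer.isCheckpointCommitted(1, 5L)); // false
    }
}
```

Keeping a single row per subtask keeps the table small and makes the lookup a single-key read, at the cost of relying on in-order commits.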

### Write-Ahead Logging Configuration

Enable exactly-once processing by configuring write-ahead logging in sink builders.

**Basic WAL Configuration:**

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;

// `stream` is an existing DataStream<Tuple3<String, Integer, String>>

// Enable WAL with default committer
CassandraSink<Tuple3<String, Integer, String>> walSink = CassandraSink
    .addSink(stream)
    .setQuery("INSERT INTO users (id, age, name) VALUES (?, ?, ?)")
    .setHost("localhost")
    .enableWriteAheadLog() // Uses CassandraCommitter with default keyspace
    .build();
```

**Custom WAL Configuration:**

```java
import com.datastax.driver.core.Cluster;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// Create custom committer with specific keyspace
ClusterBuilder committerClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra-checkpoint.example.com")
            .withPort(9042)
            .withCredentials("checkpoint_user", "checkpoint_password")
            .build();
    }
};

CassandraCommitter customCommitter = new CassandraCommitter(
    committerClusterBuilder,
    "custom_checkpoint_keyspace"
);

// Enable WAL with custom committer
// `complexStream` and `dataClusterBuilder` are defined elsewhere in the job
CassandraSink<Tuple4<String, Long, Double, Boolean>> customWalSink = CassandraSink
    .addSink(complexStream)
    .setQuery("INSERT INTO transactions (tx_id, timestamp, amount, processed) VALUES (?, ?, ?, ?)")
    .setClusterBuilder(dataClusterBuilder)
    .enableWriteAheadLog(customCommitter)
    .build();
```

### CassandraTupleWriteAheadSink Implementation

The underlying write-ahead logging sink implementation that provides exactly-once guarantees.

```java { .api }
/**
 * Write-ahead logging sink that stores incoming records in Flink's state backend
 * and commits them to Cassandra only when checkpoint completes successfully
 */
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
    /**
     * Creates WAL sink with checkpoint coordination
     * @param insertQuery CQL INSERT statement with parameter placeholders
     * @param serializer TypeSerializer for storing records in state backend
     * @param builder ClusterBuilder for Cassandra connection
     * @param committer CheckpointCommitter for tracking checkpoint completion
     * @throws Exception if initialization fails
     */
    protected CassandraTupleWriteAheadSink(
        String insertQuery,
        TypeSerializer<IN> serializer,
        ClusterBuilder builder,
        CheckpointCommitter committer
    ) throws Exception;

    /**
     * Opens connections and validates checkpointing is enabled
     * @throws Exception if checkpointing is disabled or connection fails
     */
    public void open() throws Exception;

    /**
     * Closes connections and cleans up resources
     * @throws Exception if cleanup fails
     */
    @Override
    public void close() throws Exception;

    /**
     * Sends batch of values to Cassandra with checkpoint coordination
     * @param values batch of tuples to write
     * @param checkpointId checkpoint ID for coordination
     * @param timestamp checkpoint timestamp
     * @return true if all writes successful, false if any failed
     * @throws Exception if batch processing fails
     */
    @Override
    protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception;
}
```

## Fault Tolerance Patterns

### Exactly-Once Processing Example

A complete example of an exactly-once setup: checkpointing in exactly-once mode, a durable state backend, and a WAL-enabled sink with a dedicated committer.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// Configure execution environment for exactly-once processing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing with exactly-once mode
env.enableCheckpointing(10000); // checkpoint every 10 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// Configure state backend for durability
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints"));

// Data source (CriticalEventSource is a user-defined source function)
DataStream<Tuple3<String, Long, String>> criticalEvents = env
    .addSource(new CriticalEventSource())
    .name("Critical Events Source");

// Create WAL-enabled sink for exactly-once guarantees
ClusterBuilder sinkClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra-primary.example.com")
            .addContactPoint("cassandra-secondary.example.com")
            .withPort(9042)
            .withCredentials("sink_user", "sink_password")
            // ExponentialReconnectionPolicy is a reconnection policy, not a retry policy
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 10000))
            .build();
    }
};

ClusterBuilder checkpointClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra-checkpoint.example.com")
            .withPort(9042)
            .withCredentials("checkpoint_user", "checkpoint_password")
            .build();
    }
};

CassandraCommitter committer = new CassandraCommitter(
    checkpointClusterBuilder,
    "flink_checkpoints"
);

CassandraSink<Tuple3<String, Long, String>> exactlyOnceSink = CassandraSink
    .addSink(criticalEvents)
    .setQuery("INSERT INTO critical_events (event_id, timestamp, data) VALUES (?, ?, ?)")
    .setClusterBuilder(sinkClusterBuilder)
    .enableWriteAheadLog(committer)
    .build();

exactlyOnceSink
    .name("Critical Events Cassandra Sink")
    .uid("critical-events-sink") // Important for savepoint compatibility
    .setParallelism(4);

env.execute("Critical Events Processing");
```

### Recovery and Restart Behavior

The restart strategy and the checkpoint table together determine how the connector behaves during failures and recovery.

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

// Recovery behavior configuration
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3, // number of restart attempts
    Time.of(10, TimeUnit.SECONDS) // delay between restarts
));

// On failure and restart:
// 1. Flink restores from last successful checkpoint
// 2. CassandraCommitter checks which checkpoints were already committed
// 3. WAL sink resends only the buffered records of uncommitted checkpoints
// 4. Processing continues from the checkpoint point

// Alternative restart strategy for production (replaces the one set above)
env.setRestartStrategy(RestartStrategies.failureRateRestart(
    5, // max failures per interval
    Time.of(5, TimeUnit.MINUTES), // time interval
    Time.of(30, TimeUnit.SECONDS) // delay between restarts
));
```
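
The recovery steps listed in the comments above can be sketched with a small single-subtask model. This is an illustration under stated assumptions, not the connector's code: the class `WalReplaySketch` and its fields are hypothetical, a `TreeMap` stands in for the buffers held in Flink state, a list stands in for the Cassandra table, and `lastCommitted` stands in for the committer's table, which lives outside Flink state and therefore survives a restart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical single-subtask model of a write-ahead sink; not connector code. */
final class WalReplaySketch {
    // Checkpoint ID -> records buffered for that checkpoint (Flink state in a real sink).
    final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Stand-in for the target Cassandra table.
    final List<String> written = new ArrayList<>();
    // Stand-in for the committer table; it survives restarts.
    long lastCommitted = -1L;

    private List<String> open = new ArrayList<>();

    /** Records are only buffered on arrival, never written directly. */
    void invoke(String record) {
        open.add(record);
    }

    /** On a checkpoint, the open buffer is sealed under the checkpoint ID. */
    void snapshot(long checkpointId) {
        pending.put(checkpointId, open);
        open = new ArrayList<>();
    }

    /** On completion, write every sealed buffer that is not yet marked committed. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pending.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() > lastCommitted) {
                written.addAll(entry.getValue());
                lastCommitted = entry.getKey();
            }
            it.remove(); // already-committed buffers are dropped without rewriting
        }
    }

    public static void main(String[] args) {
        WalReplaySketch sink = new WalReplaySketch();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshot(1L);
        sink.notifyCheckpointComplete(1L);

        // Simulated restart: the restored state still holds checkpoint 1's buffer.
        sink.pending.put(1L, Arrays.asList("a", "b"));
        sink.invoke("c");
        sink.snapshot(2L);
        sink.notifyCheckpointComplete(2L);

        System.out.println(sink.written); // [a, b, c]
    }
}
```

In this model, a failure between writing a buffer and recording its commit would cause that buffer to be written again after the restart, which is one reason to keep the `INSERT` query idempotent.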

### Monitoring Fault Tolerance

Implement monitoring for checkpoint and recovery metrics.

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;

// Custom metrics for monitoring WAL sink performance
// Note: CheckpointCommitter is Serializable and is shipped to the tasks with the sink,
// so metric objects created on the client may need to be registered at runtime instead.
public class MonitoredCassandraCommitter extends CassandraCommitter {
    private final Counter checkpointCommits;
    private final Histogram checkpointLatency;

    // MetricGroup.histogram(name, histogram) needs a caller-supplied Histogram implementation
    public MonitoredCassandraCommitter(
            ClusterBuilder builder, MetricGroup metricGroup, Histogram latencyHistogram) {
        super(builder);
        this.checkpointCommits = metricGroup.counter("checkpoint_commits");
        this.checkpointLatency = metricGroup.histogram("checkpoint_latency", latencyHistogram);
    }

    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId) {
        long startTime = System.currentTimeMillis();
        super.commitCheckpoint(subtaskIdx, checkpointId);
        long latency = System.currentTimeMillis() - startTime;

        checkpointCommits.inc();
        checkpointLatency.update(latency);
    }
}
```

## Checkpoint Storage Configuration

### Default Checkpoint Table Schema

The CassandraCommitter creates the following table structure:

```sql
-- Default keyspace: flink_auxiliary
-- Default table name: checkpoints_{job_id}
CREATE TABLE IF NOT EXISTS flink_auxiliary.checkpoints_job123 (
    sink_id text,
    sub_id int,
    checkpoint_id bigint,
    PRIMARY KEY (sink_id, sub_id)
);
```
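
Because the primary key is `(sink_id, sub_id)`, the table holds one row per sink subtask, and recording a checkpoint overwrites that row. The sketch below builds illustrative CQL strings against the schema above. The class `CheckpointTableCqlSketch`, its method names, and the exact statement text are assumptions made for illustration; the statements the connector actually issues may differ, and production code would normally bind parameters rather than format strings.

```java
/** Hypothetical helper that formats CQL for the checkpoint table; illustration only. */
final class CheckpointTableCqlSketch {
    /** Table name pattern from the schema above: checkpoints_{job_id}. */
    static String tableName(String jobId) {
        return "checkpoints_" + jobId;
    }

    /** Upsert of the last completed checkpoint for one sink subtask. */
    static String recordCheckpoint(
            String keyspace, String jobId, String sinkId, int subtaskIdx, long checkpointId) {
        return String.format(
                "UPDATE %s.%s SET checkpoint_id = %d WHERE sink_id = '%s' AND sub_id = %d;",
                keyspace, tableName(jobId), checkpointId, sinkId, subtaskIdx);
    }

    /** Single-row lookup of the last completed checkpoint for one sink subtask. */
    static String readCheckpoint(String keyspace, String jobId, String sinkId, int subtaskIdx) {
        return String.format(
                "SELECT checkpoint_id FROM %s.%s WHERE sink_id = '%s' AND sub_id = %d;",
                keyspace, tableName(jobId), sinkId, subtaskIdx);
    }

    public static void main(String[] args) {
        System.out.println(recordCheckpoint("flink_auxiliary", "job123", "sink-1", 0, 42L));
        System.out.println(readCheckpoint("flink_auxiliary", "job123", "sink-1", 0));
    }
}
```

In CQL an `UPDATE` on a full primary key inserts the row if it is missing, so the same statement covers both the first and every later checkpoint of a subtask.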

### Custom Checkpoint Storage

Configure a custom keyspace for checkpoint storage.

```java
// Custom keyspace configuration (`builder` is a ClusterBuilder defined elsewhere)
CassandraCommitter committer = new CassandraCommitter(builder, "production_checkpoints");

// The committer will create:
// - Keyspace: production_checkpoints
// - Table: checkpoints_{job_id}
// - Replication: SimpleStrategy with replication_factor=1 (default)

// For production, consider creating the keyspace manually with appropriate replication:
/*
CREATE KEYSPACE production_checkpoints
WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'datacenter1': 3,
    'datacenter2': 2
};
*/
```

## Best Practices

### Checkpointing Configuration

- **Checkpoint interval**: 10-60 seconds depending on throughput and latency requirements
- **Exactly-once mode**: Always use `CheckpointingMode.EXACTLY_ONCE` for critical data
- **Concurrent checkpoints**: Set to 1 to avoid resource contention
- **Checkpoint timeout**: Set based on cluster size and network latency

### Resource Management

- **Separate clusters**: Consider using separate Cassandra clusters for data and checkpoints
- **Keyspace isolation**: Use dedicated keyspace for checkpoint tables
- **Connection pooling**: Configure appropriate connection pools for both data and checkpoint connections

### Error Handling

- **Restart strategies**: Configure appropriate restart strategies for your use case
- **Alerting**: Monitor checkpoint failure rates and set up alerts
- **Manual intervention**: Plan procedures for manual checkpoint recovery if needed

### Performance Optimization

- **Batch processing**: The WAL sink writes buffered records to Cassandra in batches when a checkpoint completes, so larger checkpoint intervals mean larger batches
- **Parallelism**: Configure sink parallelism based on Cassandra cluster capacity
- **Network optimization**: Co-locate Flink and Cassandra for reduced latency