
# Write-Ahead Logging

Exactly-once processing guarantees through write-ahead logging with checkpoint integration. Records are stored in Flink's state backend and committed to Cassandra only on successful checkpoint completion, ensuring data consistency even in the presence of failures.

## Capabilities

### Write-Ahead Log Concept

Write-ahead logging provides exactly-once processing semantics by:

1. **Buffering**: Incoming records are stored in Flink's state backend instead of being written directly to Cassandra
2. **Checkpointing**: Records are held until a Flink checkpoint successfully completes
3. **Batch Commit**: On checkpoint completion, buffered records are written to Cassandra in batches
4. **Failure Recovery**: If a failure occurs before checkpoint completion, buffered records are discarded and reprocessed

This ensures that each record is written to Cassandra exactly once, even if the job fails and restarts.
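The buffer/commit/discard life cycle above can be sketched in plain Java. This is an illustrative model with hypothetical names, not the connector's actual classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the WAL life cycle: records are buffered per pending
// checkpoint and only "sent" once that checkpoint completes; buffers for
// failed checkpoints are simply dropped and replayed from the source.
class WalSketch {
    private final Map<Long, List<String>> pending = new HashMap<>();
    private final List<String> committed = new ArrayList<>();

    // Step 1: buffer the record under the currently open checkpoint.
    void processElement(long checkpointId, String record) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(record);
    }

    // Step 3: on checkpoint completion, flush the buffer to the sink.
    void notifyCheckpointComplete(long checkpointId) {
        List<String> buffer = pending.remove(checkpointId);
        if (buffer != null) {
            committed.addAll(buffer); // stands in for the batched Cassandra write
        }
    }

    // Step 4: a failure before completion discards the buffer; the source
    // replays the records after restart, so nothing is lost or duplicated.
    void onFailure(long checkpointId) {
        pending.remove(checkpointId);
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        WalSketch wal = new WalSketch();
        wal.processElement(1, "a");
        wal.processElement(1, "b");
        wal.onFailure(1);               // checkpoint 1 fails: nothing reaches the sink
        wal.processElement(2, "a");     // source replays the records
        wal.processElement(2, "b");
        wal.notifyCheckpointComplete(2);
        System.out.println(wal.committed()); // [a, b]
    }
}
```

Even though the records for checkpoint 1 were processed twice, they reach the sink exactly once, because only the successfully checkpointed buffer is committed.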

### Tuple Write-Ahead Sink

Write-ahead log implementation for Flink Tuple types.

```java { .api }
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
    public CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer);
    public void open();
    public void close();
    protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp);
}
```

**Usage Example:**

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;

// Enable checkpointing in the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds

// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").build();
    }
};

// Create write-ahead sink using the builder pattern
DataStream<Tuple3<String, Integer, Long>> stream = // ... your data stream

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.events (id, count, timestamp) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog() // This creates the write-ahead sink internally
    .build();
```

**Manual Construction:**

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleWriteAheadSink;

// Manual construction (not typically needed)
TypeSerializer<Tuple3<String, Integer, Long>> serializer =
    stream.getType().createSerializer(env.getConfig());

CassandraCommitter committer = new CassandraCommitter(builder);

CassandraTupleWriteAheadSink<Tuple3<String, Integer, Long>> walSink =
    new CassandraTupleWriteAheadSink<>(
        "INSERT INTO example.events (id, count, timestamp) VALUES (?, ?, ?);",
        serializer,
        builder,
        committer
    );

stream.transform("Cassandra WAL Sink", null, walSink);
```

### Row Write-Ahead Sink

Write-ahead log implementation for Flink Row types.

```java { .api }
public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row> {
    public CassandraRowWriteAheadSink(String insertQuery, TypeSerializer<Row> serializer, ClusterBuilder builder, CheckpointCommitter committer);
    public void open();
    public void close();
    protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp);
}
```

**Usage Example:**

```java
import org.apache.flink.types.Row;

// Enable checkpointing
env.enableCheckpointing(10000); // Checkpoint every 10 seconds

DataStream<Row> rowStream = // ... your row stream

CassandraSink.addSink(rowStream)
    .setQuery("INSERT INTO example.metrics (id, value, timestamp) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog()
    .build();
```

### Checkpoint Committer

Manages checkpoint information storage and retrieval for exactly-once processing.

```java { .api }
public class CassandraCommitter extends CheckpointCommitter {
    public CassandraCommitter(ClusterBuilder builder);
    public CassandraCommitter(ClusterBuilder builder, String keySpace);
    public void setJobId(String id);
    public void createResource();
    public void open();
    public void close();
    public void commitCheckpoint(int subtaskIdx, long checkpointId);
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}
```
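To make the committer's role concrete, here is a hypothetical in-memory stand-in for its Cassandra table (names and structure are illustrative, not the real implementation). It shows how recording `(sink_id, checkpoint_id)` pairs lets a recovered sink skip checkpoints that were already committed:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for the committer's checkpoint table.
class InMemoryCommitter {
    private final Map<Integer, Set<Long>> committed = new HashMap<>();

    // Record that this subtask successfully flushed this checkpoint.
    void commitCheckpoint(int subtaskIdx, long checkpointId) {
        committed.computeIfAbsent(subtaskIdx, k -> new HashSet<>()).add(checkpointId);
    }

    // On recovery, ask before re-sending a buffered checkpoint.
    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        return committed.getOrDefault(subtaskIdx, Set.of()).contains(checkpointId);
    }

    public static void main(String[] args) {
        InMemoryCommitter committer = new InMemoryCommitter();
        committer.commitCheckpoint(0, 42);

        // After a restart, checkpoint 42 is skipped and only newer ones are re-sent.
        System.out.println(committer.isCheckpointCommitted(0, 42)); // true
        System.out.println(committer.isCheckpointCommitted(0, 43)); // false
    }
}
```

Because the commit marker and the data write are not atomic, this check is what prevents a restarted job from writing the same checkpoint's batch twice.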

**Default Behavior:**

```java
// Default committer (uses default keyspace and table names)
CassandraCommitter defaultCommitter = new CassandraCommitter(builder);
```

**Custom Configuration:**

```java
// Custom keyspace for checkpoint metadata
CassandraCommitter customCommitter = new CassandraCommitter(builder, "checkpoint_ks");
customCommitter.setJobId("my-flink-job-v1");

// The committer will create the following schema:
// KEYSPACE checkpoint_ks
// TABLE checkpoint_ks.checkpoints_my_flink_job_v1 (
//     sink_id int,
//     checkpoint_id bigint,
//     PRIMARY KEY (sink_id, checkpoint_id)
// )
```

**Manual Setup:**

```java
// Initialize the checkpoint storage manually
CassandraCommitter committer = new CassandraCommitter(builder, "checkpoints");
committer.setJobId("data-pipeline");
committer.createResource(); // Creates keyspace and table
committer.open();

// Use with write-ahead sink
CassandraTupleWriteAheadSink<Tuple2<String, Integer>> walSink =
    new CassandraTupleWriteAheadSink<>(
        "INSERT INTO example.data (key, value) VALUES (?, ?);",
        serializer,
        builder,
        committer
    );
```

## Advanced Usage Patterns

### Checkpoint Configuration

Proper checkpoint configuration is critical for write-ahead logging:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing with appropriate interval
env.enableCheckpointing(5000); // 5 second intervals

// Configure checkpoint behavior
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(1000); // 1 second pause between checkpoints
config.setMaxConcurrentCheckpoints(1); // Only one concurrent checkpoint
config.setCheckpointTimeout(300000); // 5 minute timeout
config.setFailOnCheckpointingErrors(true); // Fail job on checkpoint errors

// Enable externalized checkpoints for recovery
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```

### Performance Considerations

Write-ahead logging trades throughput and latency for consistency:

```java
// High-throughput configuration
env.enableCheckpointing(30000); // Longer checkpoint intervals
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(5000);

// Low-latency configuration
env.enableCheckpointing(1000); // Frequent checkpoints
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(500);

// Balanced configuration
env.enableCheckpointing(10000); // 10 second intervals
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(2000);
```

### Error Handling and Recovery

Handle failures gracefully with write-ahead logging:

```java
// Configure restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3, // number of restart attempts
    org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delay
));

// Custom failure handling
CassandraFailureHandler walFailureHandler = new CassandraFailureHandler() {
    @Override
    public void onFailure(Throwable failure) throws IOException {
        // For WAL sinks, be more conservative with error handling
        // since failures affect exactly-once guarantees
        logger.error("WAL sink failure - will cause job restart", failure);
        throw new IOException("WAL operation failed", failure);
    }
};

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.critical_data (id, value) VALUES (?, ?);")
    .setClusterBuilder(builder)
    .setFailureHandler(walFailureHandler)
    .enableWriteAheadLog()
    .build();
```

### Monitoring and Metrics

Monitor write-ahead log performance:

```java
// Add custom metrics to track WAL performance
public class MonitoredCassandraCommitter extends CassandraCommitter {
    private Counter checkpointCommits;
    private Counter checkpointFailures;

    public MonitoredCassandraCommitter(ClusterBuilder builder) {
        super(builder);
    }

    @Override
    public void open() throws Exception {
        super.open();

        // Note: CheckpointCommitter itself does not expose a runtime context;
        // this sketch assumes the enclosing operator makes one available.
        MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
        checkpointCommits = metricGroup.counter("checkpoint_commits");
        checkpointFailures = metricGroup.counter("checkpoint_failures");
    }

    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId) throws Exception {
        try {
            super.commitCheckpoint(subtaskIdx, checkpointId);
            checkpointCommits.inc();
        } catch (Exception e) {
            checkpointFailures.inc();
            throw e;
        }
    }
}
```

### State Backend Configuration

Choose an appropriate state backend for the WAL:

```java
// RocksDB state backend for large state (recommended for WAL)
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:port/path/to/checkpoints"));

// Memory state backend for small state (development only)
env.setStateBackend(new MemoryStateBackend());

// FileSystem state backend for medium state
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/path/to/checkpoints"));
```

## Limitations and Considerations

### POJO Limitations

Write-ahead logging is only supported for Tuple and Row types:

```java
// Supported
DataStream<Tuple2<String, Integer>> tuples = // ...
CassandraSink.addSink(tuples)
    .setQuery("INSERT INTO example.data (key, value) VALUES (?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog() // ✓ Supported
    .build();

DataStream<Row> rows = // ...
CassandraSink.addSink(rows)
    .setQuery("INSERT INTO example.data (key, value) VALUES (?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog() // ✓ Supported
    .build();

// NOT supported
DataStream<MyPojo> pojos = // ...
CassandraSink.addSink(pojos)
    .setDefaultKeyspace("example")
    .setClusterBuilder(builder)
    .enableWriteAheadLog() // ✗ Will throw IllegalArgumentException
    .build();
```

### Performance Impact

Write-ahead logging introduces additional overhead:

- **Latency**: Records are buffered until checkpoint completion
- **Memory**: Records are stored in Flink's state backend
- **Throughput**: Batch writes occur only at checkpoint intervals
- **Storage**: Checkpoint metadata is stored in Cassandra
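A rough way to reason about the memory and latency items above: the state backend must hold roughly one checkpoint interval's worth of records per subtask. A back-of-the-envelope sketch, with entirely hypothetical numbers (10,000 records/s, ~200 bytes per serialized record, 10 s checkpoint interval):

```java
// Worst-case WAL buffer size per subtask = rate × interval × record size.
public class WalSizing {
    public static void main(String[] args) {
        long recordsPerSecond = 10_000;         // hypothetical ingest rate
        long bytesPerRecord = 200;              // hypothetical serialized size
        long checkpointIntervalSeconds = 10;    // env.enableCheckpointing(10000)

        long bufferedRecords = recordsPerSecond * checkpointIntervalSeconds;
        long bufferedBytes = bufferedRecords * bytesPerRecord;

        System.out.println(bufferedRecords);                // 100000
        System.out.println(bufferedBytes / (1024 * 1024));  // 19 (MiB, rounded down)
    }
}
```

The same arithmetic explains the latency item: with a 10 s interval, a record can wait up to 10 s (plus checkpoint duration) before it becomes visible in Cassandra.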

### Cassandra Requirements

Write-ahead logging creates additional tables in Cassandra:

```sql
-- Default checkpoint metadata table
CREATE KEYSPACE IF NOT EXISTS checkpoints_sink
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS checkpoints_sink.checkpoints_<job_id> (
    sink_id int,
    checkpoint_id bigint,
    PRIMARY KEY (sink_id, checkpoint_id)
);
```

Ensure your Cassandra cluster has sufficient resources for both data and checkpoint metadata.