# Write-Ahead Logging

This connector provides exactly-once processing guarantees through write-ahead logging integrated with Flink's checkpointing. It stores records in Flink's state backend and commits them to Cassandra only after a checkpoint completes successfully, ensuring data consistency even in the presence of failures.

## Capabilities

### Write-Ahead Log Concept

Write-ahead logging provides exactly-once processing semantics by:

1. **Buffering**: Incoming records are stored in Flink's state backend instead of being written directly to Cassandra
2. **Checkpointing**: Records are held until a Flink checkpoint completes successfully
3. **Batch Commit**: On checkpoint completion, the buffered records are written to Cassandra in batches
4. **Failure Recovery**: If a failure occurs before the checkpoint completes, the buffered records are discarded and reprocessed after the job restarts

This ensures that each record is written to Cassandra exactly once, even if the job fails and restarts.
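The control flow above can be sketched with a toy in-memory model. This is not the Flink API — `invoke`, `notifyCheckpointComplete`, and `failBeforeCheckpoint` are illustrative names, and plain lists stand in for the state backend and for Cassandra:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the buffer-then-commit control flow (not the Flink API). */
public class WalFlowDemo {
    private final List<String> pending = new ArrayList<>();   // records since last checkpoint
    private final List<String> committed = new ArrayList<>(); // stands in for Cassandra

    public void invoke(String record) {          // 1. buffer instead of writing directly
        pending.add(record);
    }

    public void notifyCheckpointComplete() {     // 3. batch-commit on checkpoint completion
        committed.addAll(pending);
        pending.clear();
    }

    public void failBeforeCheckpoint() {         // 4. failure: pending records are discarded
        pending.clear();                         //    (the source will replay them)
    }

    public List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        WalFlowDemo sink = new WalFlowDemo();
        sink.invoke("a");
        sink.invoke("b");
        sink.notifyCheckpointComplete();  // "a" and "b" reach the store exactly once
        sink.invoke("c");
        sink.failBeforeCheckpoint();      // "c" never reaches the store...
        sink.invoke("c");                 // ...until it is replayed and re-buffered
        sink.notifyCheckpointComplete();
        System.out.println(sink.committed()); // [a, b, c]
    }
}
```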
### Tuple Write-Ahead Sink

Write-ahead log implementation for Flink `Tuple` types.

```java { .api }
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
    public CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer);
    public void open();
    public void close();
    protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp);
}
```
**Usage Example:**

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;

// Enable checkpointing in the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds

// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").build();
    }
};

// Create write-ahead sink using the builder pattern
DataStream<Tuple3<String, Integer, Long>> stream = // ... your data stream

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.events (id, count, timestamp) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog() // This creates the write-ahead sink internally
    .build();
```
**Manual Construction:**

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleWriteAheadSink;

// Manual construction (not typically needed; enableWriteAheadLog() does this for you)
TypeSerializer<Tuple3<String, Integer, Long>> serializer =
    stream.getType().createSerializer(env.getConfig());

CassandraCommitter committer = new CassandraCommitter(builder);

CassandraTupleWriteAheadSink<Tuple3<String, Integer, Long>> walSink =
    new CassandraTupleWriteAheadSink<>(
        "INSERT INTO example.events (id, count, timestamp) VALUES (?, ?, ?);",
        serializer,
        builder,
        committer
    );

stream.transform("Cassandra WAL Sink", null, walSink);
```
### Row Write-Ahead Sink

Write-ahead log implementation for Flink `Row` types.

```java { .api }
public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row> {
    public CassandraRowWriteAheadSink(String insertQuery, TypeSerializer<Row> serializer, ClusterBuilder builder, CheckpointCommitter committer);
    public void open();
    public void close();
    protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp);
}
```
**Usage Example:**

```java
import org.apache.flink.types.Row;

// Enable checkpointing
env.enableCheckpointing(10000); // Checkpoint every 10 seconds

DataStream<Row> rowStream = // ... your row stream

CassandraSink.addSink(rowStream)
    .setQuery("INSERT INTO example.metrics (id, value, timestamp) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog()
    .build();
```
### Checkpoint Committer

Manages checkpoint information storage and retrieval for exactly-once processing.

```java { .api }
public class CassandraCommitter extends CheckpointCommitter {
    public CassandraCommitter(ClusterBuilder builder);
    public CassandraCommitter(ClusterBuilder builder, String keySpace);
    public void setJobId(String id);
    public void createResource();
    public void open();
    public void close();
    public void commitCheckpoint(int subtaskIdx, long checkpointId);
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}
```
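The `commitCheckpoint`/`isCheckpointCommitted` pair is what makes checkpoint replay idempotent after a restart. The following toy committer (a `HashSet` standing in for the Cassandra metadata table; `replay` and `writes` are illustrative names, not Flink API) shows the pattern:

```java
import java.util.HashSet;
import java.util.Set;

/** Toy in-memory committer illustrating idempotent checkpoint replay. */
public class ReplayDemo {
    static class InMemoryCommitter {
        private final Set<String> done = new HashSet<>();

        void commitCheckpoint(int subtaskIdx, long checkpointId) {
            done.add(subtaskIdx + ":" + checkpointId);
        }

        boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
            return done.contains(subtaskIdx + ":" + checkpointId);
        }
    }

    static int writes = 0;

    /** Re-sends a completed checkpoint's batch unless it was already committed. */
    static void replay(InMemoryCommitter committer, int subtask, long checkpointId) {
        if (committer.isCheckpointCommitted(subtask, checkpointId)) {
            return; // batch already in Cassandra; skip to avoid duplicates
        }
        writes++; // stands in for sendValues(...)
        committer.commitCheckpoint(subtask, checkpointId);
    }

    public static void main(String[] args) {
        InMemoryCommitter committer = new InMemoryCommitter();
        replay(committer, 0, 1L); // first attempt writes the batch
        replay(committer, 0, 1L); // restart replays the same checkpoint: no-op
        System.out.println(writes); // 1
    }
}
```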
**Default Behavior:**

```java
// Default committer (uses the default keyspace and table names)
CassandraCommitter defaultCommitter = new CassandraCommitter(builder);
```
**Custom Configuration:**

```java
// Custom keyspace for checkpoint metadata
CassandraCommitter customCommitter = new CassandraCommitter(builder, "checkpoint_ks");
customCommitter.setJobId("my-flink-job-v1");

// The committer will create the following schema:
// KEYSPACE checkpoint_ks
// TABLE checkpoint_ks.checkpoints_my_flink_job_v1 (
//     sink_id int,
//     checkpoint_id bigint,
//     PRIMARY KEY (sink_id, checkpoint_id)
// )
```
**Manual Setup:**

```java
// Initialize the checkpoint storage manually
CassandraCommitter committer = new CassandraCommitter(builder, "checkpoints");
committer.setJobId("data-pipeline");
committer.createResource(); // Creates the keyspace and table
committer.open();

// Use with a write-ahead sink
CassandraTupleWriteAheadSink<Tuple2<String, Integer>> walSink =
    new CassandraTupleWriteAheadSink<>(
        "INSERT INTO example.data (key, value) VALUES (?, ?);",
        serializer,
        builder,
        committer
    );
```
## Advanced Usage Patterns

### Checkpoint Configuration

Proper checkpoint configuration is critical for write-ahead logging:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing with an appropriate interval
env.enableCheckpointing(5000); // 5-second intervals

// Configure checkpoint behavior
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(1000); // 1-second pause between checkpoints
config.setMaxConcurrentCheckpoints(1); // Only one concurrent checkpoint
config.setCheckpointTimeout(300000); // 5-minute timeout
config.setFailOnCheckpointingErrors(true); // Fail the job on checkpoint errors

// Retain externalized checkpoints for recovery
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```
### Performance Considerations

Write-ahead logging trades latency and throughput for consistency. The checkpoint interval is the main tuning knob, since records reach Cassandra only when a checkpoint completes:

```java
// High-throughput configuration
env.enableCheckpointing(30000); // Longer checkpoint intervals
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(5000);

// Low-latency configuration
env.enableCheckpointing(1000); // Frequent checkpoints
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(500);

// Balanced configuration
env.enableCheckpointing(10000); // 10-second intervals
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(2000);
```
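Because buffered records reach Cassandra only when a checkpoint completes, the checkpoint interval puts a floor under sink latency. A back-of-envelope estimate for the three configurations above (the 200 ms batch-commit duration is an assumption, not a measured value):

```java
/** Back-of-envelope worst-case sink latency: a record buffered right after a
 *  checkpoint waits roughly one full checkpoint interval plus the commit itself. */
public class LatencyEstimate {
    static long worstCaseMillis(long checkpointIntervalMs, long commitMs) {
        return checkpointIntervalMs + commitMs;
    }

    public static void main(String[] args) {
        long commitMs = 200; // assumed batch-commit duration
        System.out.println(worstCaseMillis(30000, commitMs)); // high-throughput: 30200
        System.out.println(worstCaseMillis(1000, commitMs));  // low-latency:      1200
        System.out.println(worstCaseMillis(10000, commitMs)); // balanced:        10200
    }
}
```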
### Error Handling and Recovery

Handle failures gracefully with write-ahead logging:

```java
// Configure a restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3, // number of restart attempts
    org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delay between attempts
));

// Custom failure handling
CassandraFailureHandler walFailureHandler = new CassandraFailureHandler() {
    @Override
    public void onFailure(Throwable failure) throws IOException {
        // For WAL sinks, be conservative with error handling, since a
        // swallowed failure would break the exactly-once guarantee
        LoggerFactory.getLogger(getClass()).error("WAL sink failure - will cause job restart", failure);
        throw new IOException("WAL operation failed", failure);
    }
};

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.critical_data (id, value) VALUES (?, ?);")
    .setClusterBuilder(builder)
    .setFailureHandler(walFailureHandler)
    .enableWriteAheadLog()
    .build();
```
### Monitoring and Metrics

Monitor write-ahead log performance:

```java
// Add custom metrics to track WAL performance. CheckpointCommitter has no
// runtime context of its own, so the metric group must be handed in from
// the operator that owns the committer.
public class MonitoredCassandraCommitter extends CassandraCommitter {
    private final MetricGroup metricGroup;
    private transient Counter checkpointCommits;
    private transient Counter checkpointFailures;

    public MonitoredCassandraCommitter(ClusterBuilder builder, MetricGroup metricGroup) {
        super(builder);
        this.metricGroup = metricGroup;
    }

    @Override
    public void open() throws Exception {
        super.open();
        checkpointCommits = metricGroup.counter("checkpoint_commits");
        checkpointFailures = metricGroup.counter("checkpoint_failures");
    }

    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId) throws Exception {
        try {
            super.commitCheckpoint(subtaskIdx, checkpointId);
            checkpointCommits.inc();
        } catch (Exception e) {
            checkpointFailures.inc();
            throw e;
        }
    }
}
```
### State Backend Configuration

Choose an appropriate state backend for the WAL:

```java
// RocksDB state backend for large state (recommended for WAL)
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:port/path/to/checkpoints"));

// Memory state backend for small state (development only)
env.setStateBackend(new MemoryStateBackend());

// FileSystem state backend for medium state
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/path/to/checkpoints"));
```
## Limitations and Considerations

### POJO Limitations

Write-ahead logging is only supported for Tuple and Row types:

```java
// Supported
DataStream<Tuple2<String, Integer>> tuples = // ...
CassandraSink.addSink(tuples)
    .setQuery("INSERT INTO example.data (key, value) VALUES (?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog() // ✓ Supported
    .build();

DataStream<Row> rows = // ...
CassandraSink.addSink(rows)
    .setQuery("INSERT INTO example.data (key, value) VALUES (?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog() // ✓ Supported
    .build();

// NOT supported
DataStream<MyPojo> pojos = // ...
CassandraSink.addSink(pojos)
    .setDefaultKeyspace("example")
    .setClusterBuilder(builder)
    .enableWriteAheadLog() // ✗ Will throw IllegalArgumentException
    .build();
```
### Performance Impact

Write-ahead logging introduces additional overhead:

- **Latency**: Records are buffered until checkpoint completion
- **Memory**: Records are stored in Flink's state backend between checkpoints
- **Throughput**: Batch writes occur only at checkpoint intervals
- **Storage**: Checkpoint metadata is stored in Cassandra
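The memory cost of the first two items can be estimated up front: between checkpoints, the state backend holds roughly throughput × checkpoint interval × record size per subtask. A rough sizing sketch (the throughput, interval, and record size are assumed example numbers):

```java
/** Rough sizing of the state the WAL holds between checkpoints. */
public class WalStateSizing {
    static long bufferedBytes(long recordsPerSecond, long intervalSeconds, long bytesPerRecord) {
        return recordsPerSecond * intervalSeconds * bytesPerRecord;
    }

    public static void main(String[] args) {
        // e.g. 20k records/s, 10 s checkpoint interval, 100 B per serialized record
        long bytes = bufferedBytes(20_000, 10, 100);
        System.out.println(bytes / (1024 * 1024) + " MiB buffered per subtask"); // 19 MiB
    }
}
```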
### Cassandra Requirements

Write-ahead logging creates additional tables in Cassandra:

```sql
-- Default checkpoint metadata table
CREATE KEYSPACE IF NOT EXISTS checkpoints_sink
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS checkpoints_sink.checkpoints_<job_id> (
    sink_id int,
    checkpoint_id bigint,
    PRIMARY KEY (sink_id, checkpoint_id)
);
```

Ensure your Cassandra cluster has sufficient resources for both data and checkpoint metadata.