# Fault Tolerance & Write-Ahead Logging

The Flink Cassandra Connector provides exactly-once processing guarantees through checkpoint coordination and write-ahead logging for streaming applications. This ensures data consistency and prevents data loss even in the presence of failures.

## Capabilities

### CassandraCommitter

CheckpointCommitter implementation that stores checkpoint information in a dedicated Cassandra table for coordination between Flink checkpoints and Cassandra writes.

```java { .api }
/**
 * CheckpointCommitter that saves completed checkpoint information in Cassandra
 * Creates entries in format: |operator_id | subtask_id | last_completed_checkpoint|
 */
public class CassandraCommitter extends CheckpointCommitter {

    /**
     * Creates committer with default keyspace 'flink_auxiliary'
     * @param builder ClusterBuilder for Cassandra connection configuration
     */
    public CassandraCommitter(ClusterBuilder builder);

    /**
     * Creates committer with custom keyspace for checkpoint storage
     * @param builder ClusterBuilder for Cassandra connection configuration
     * @param keySpace custom keyspace name for checkpoint table storage
     */
    public CassandraCommitter(ClusterBuilder builder, String keySpace);

    /**
     * Sets job ID for checkpoint table naming (called internally by Flink)
     * @param id unique job identifier
     * @throws Exception if setup fails
     */
    public void setJobId(String id) throws Exception;

    /**
     * Creates necessary keyspace and checkpoint table if they don't exist
     * @throws Exception if table/keyspace creation fails
     */
    @Override
    public void createResource() throws Exception;

    /**
     * Opens connection for checkpoint operations
     * @throws Exception if connection setup fails
     */
    @Override
    public void open() throws Exception;

    /**
     * Closes connections and clears checkpoint cache
     * @throws Exception if cleanup fails
     */
    @Override
    public void close() throws Exception;

    /**
     * Records checkpoint completion in Cassandra
     * @param subtaskIdx subtask index that completed checkpoint
     * @param checkpointId checkpoint ID that was completed
     */
    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId);

    /**
     * Checks if a specific checkpoint has been committed
     * Uses local cache to minimize Cassandra queries
     * @param subtaskIdx subtask index to check
     * @param checkpointId checkpoint ID to verify
     * @return true if checkpoint has been committed
     */
    @Override
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}
```

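The sketch below illustrates, in rough order, how Flink's write-ahead machinery exercises these methods during a checkpoint and on recovery. It is illustrative only: user code normally never calls the committer directly (the sink builder wires it up), the job ID shown is a made-up value, and `checkpointClusterBuilder` is assumed to be a configured `ClusterBuilder` like the one shown later on this page.

```java
// Illustrative lifecycle only; in practice GenericWriteAheadSink drives these calls.
CassandraCommitter committer = new CassandraCommitter(checkpointClusterBuilder); // default keyspace 'flink_auxiliary'

committer.setJobId("a1b2c3d4"); // hypothetical job ID; set by the runtime in practice
committer.createResource();     // ensures the keyspace and checkpoints_{job_id} table exist
committer.open();

int subtaskIdx = 0;
long checkpointId = 42L;

// On recovery, pending records are replayed only if their checkpoint
// was never recorded as committed.
if (!committer.isCheckpointCommitted(subtaskIdx, checkpointId)) {
    // ... re-send the records buffered for this checkpoint ...
    committer.commitCheckpoint(subtaskIdx, checkpointId);
}

committer.close();
```
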
### Write-Ahead Logging Configuration

Enable exactly-once processing by configuring write-ahead logging in the sink builder.

**Basic WAL Configuration:**

```java
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

// Enable WAL with the default committer
CassandraSink<Tuple3<String, Integer, String>> walSink = CassandraSink
    .addSink(stream)
    .setQuery("INSERT INTO users (id, age, name) VALUES (?, ?, ?)")
    .setHost("localhost")
    .enableWriteAheadLog() // uses CassandraCommitter with the default keyspace
    .build();
```

**Custom WAL Configuration:**

```java
// Create a custom committer with a dedicated keyspace
ClusterBuilder committerClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra-checkpoint.example.com")
            .withPort(9042)
            .withCredentials("checkpoint_user", "checkpoint_password")
            .build();
    }
};

CassandraCommitter customCommitter = new CassandraCommitter(
    committerClusterBuilder,
    "custom_checkpoint_keyspace"
);

// Enable WAL with the custom committer
CassandraSink<Tuple4<String, Long, Double, Boolean>> customWalSink = CassandraSink
    .addSink(complexStream)
    .setQuery("INSERT INTO transactions (tx_id, timestamp, amount, processed) VALUES (?, ?, ?, ?)")
    .setClusterBuilder(dataClusterBuilder)
    .enableWriteAheadLog(customCommitter)
    .build();
```

### CassandraTupleWriteAheadSink Implementation

The underlying write-ahead logging sink implementation that provides exactly-once guarantees.

```java { .api }
/**
 * Write-ahead logging sink that stores incoming records in Flink's state backend
 * and commits them to Cassandra only when checkpoint completes successfully
 */
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {

    /**
     * Creates WAL sink with checkpoint coordination
     * @param insertQuery CQL INSERT statement with parameter placeholders
     * @param serializer TypeSerializer for storing records in state backend
     * @param builder ClusterBuilder for Cassandra connection
     * @param committer CheckpointCommitter for tracking checkpoint completion
     * @throws Exception if initialization fails
     */
    protected CassandraTupleWriteAheadSink(
            String insertQuery,
            TypeSerializer<IN> serializer,
            ClusterBuilder builder,
            CheckpointCommitter committer
    ) throws Exception;

    /**
     * Opens connections and validates checkpointing is enabled
     * @throws Exception if checkpointing is disabled or connection fails
     */
    public void open() throws Exception;

    /**
     * Closes connections and cleans up resources
     * @throws Exception if cleanup fails
     */
    @Override
    public void close() throws Exception;

    /**
     * Sends batch of values to Cassandra with checkpoint coordination
     * @param values batch of tuples to write
     * @param checkpointId checkpoint ID for coordination
     * @param timestamp checkpoint timestamp
     * @return true if all writes successful, false if any failed
     * @throws Exception if batch processing fails
     */
    @Override
    protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception;
}
```

## Fault Tolerance Patterns

### Exactly-Once Processing Example

Complete example showing an exactly-once processing setup with proper error handling.

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;

// Configure the execution environment for exactly-once processing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing with exactly-once mode
env.enableCheckpointing(10000); // checkpoint every 10 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// Configure a durable state backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints"));

// Data source
DataStream<Tuple3<String, Long, String>> criticalEvents = env
    .addSource(new CriticalEventSource())
    .name("Critical Events Source");

// Cluster builder for the data writes
ClusterBuilder sinkClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra-primary.example.com")
            .addContactPoint("cassandra-secondary.example.com")
            .withPort(9042)
            .withCredentials("sink_user", "sink_password")
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 10000))
            .build();
    }
};

// Cluster builder for checkpoint bookkeeping
ClusterBuilder checkpointClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra-checkpoint.example.com")
            .withPort(9042)
            .withCredentials("checkpoint_user", "checkpoint_password")
            .build();
    }
};

CassandraCommitter committer = new CassandraCommitter(
    checkpointClusterBuilder,
    "flink_checkpoints"
);

// Create the WAL-enabled sink for exactly-once guarantees
CassandraSink<Tuple3<String, Long, String>> exactlyOnceSink = CassandraSink
    .addSink(criticalEvents)
    .setQuery("INSERT INTO critical_events (event_id, timestamp, data) VALUES (?, ?, ?)")
    .setClusterBuilder(sinkClusterBuilder)
    .enableWriteAheadLog(committer)
    .build();

exactlyOnceSink
    .name("Critical Events Cassandra Sink")
    .uid("critical-events-sink") // important for savepoint compatibility
    .setParallelism(4);

env.execute("Critical Events Processing");
```

### Recovery and Restart Behavior

Understanding how the connector behaves during failures and recovery.

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

// Recovery behavior configuration
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,                             // number of restart attempts
    Time.of(10, TimeUnit.SECONDS)  // delay between restarts
));

// On failure and restart:
// 1. Flink restores from the last successful checkpoint
// 2. CassandraCommitter reports which checkpoints were already committed
// 3. The WAL sink replays only uncommitted records
// 4. Processing continues from the restored checkpoint

// Custom restart strategy for production
env.setRestartStrategy(RestartStrategies.failureRateRestart(
    5,                             // max failures per interval
    Time.of(5, TimeUnit.MINUTES),  // time interval
    Time.of(30, TimeUnit.SECONDS)  // delay between restarts
));
```

### Monitoring Fault Tolerance

Implement monitoring for checkpoint and recovery metrics.

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;

// Custom metrics for monitoring WAL commit performance
public class MonitoredCassandraCommitter extends CassandraCommitter {
    private final Counter checkpointCommits;
    private final Histogram checkpointLatency;

    public MonitoredCassandraCommitter(ClusterBuilder builder, MetricGroup metricGroup) {
        super(builder);
        this.checkpointCommits = metricGroup.counter("checkpoint_commits");
        // MetricGroup#histogram requires a concrete Histogram implementation
        this.checkpointLatency = metricGroup.histogram(
            "checkpoint_latency", new DescriptiveStatisticsHistogram(500));
    }

    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId) {
        long startTime = System.currentTimeMillis();
        super.commitCheckpoint(subtaskIdx, checkpointId);
        long latency = System.currentTimeMillis() - startTime;

        checkpointCommits.inc();
        checkpointLatency.update(latency);
    }
}
```

## Checkpoint Storage Configuration

### Default Checkpoint Table Schema

The CassandraCommitter creates the following table structure:

```sql
-- Default keyspace: flink_auxiliary
-- Default table name: checkpoints_{job_id}
CREATE TABLE IF NOT EXISTS flink_auxiliary.checkpoints_job123 (
    sink_id text,
    sub_id int,
    checkpoint_id bigint,
    PRIMARY KEY (sink_id, sub_id)
);
```

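For debugging or monitoring, the committed checkpoint IDs can be read back with a plain driver query. The sketch below assumes the DataStax Java driver used by the connector; the contact point and the `checkpoints_job123` table name are the illustrative values from the schema above.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

// Inspect which checkpoint each subtask has last committed
try (Cluster cluster = Cluster.builder()
        .addContactPoint("cassandra-checkpoint.example.com")
        .build();
     Session session = cluster.connect("flink_auxiliary")) {

    ResultSet rs = session.execute(
        "SELECT sink_id, sub_id, checkpoint_id FROM checkpoints_job123");

    for (Row row : rs) {
        System.out.printf("sink %s, subtask %d -> last committed checkpoint %d%n",
            row.getString("sink_id"), row.getInt("sub_id"), row.getLong("checkpoint_id"));
    }
}
```
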
### Custom Checkpoint Storage

Configure a custom keyspace for checkpoint storage; the table name is always derived from the job ID.

```java
// Custom keyspace configuration
CassandraCommitter committer = new CassandraCommitter(builder, "production_checkpoints");

// The committer will create:
// - Keyspace: production_checkpoints
// - Table: checkpoints_{job_id}
// - Replication: SimpleStrategy with replication_factor=1 (default)

// For production, consider creating the keyspace manually with appropriate replication:
/*
CREATE KEYSPACE production_checkpoints
  WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'datacenter1': 3,
    'datacenter2': 2
  };
*/
```

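If you prefer to create the keyspace from code before submitting the job rather than via `cqlsh`, a minimal sketch using the DataStax driver follows; the contact point is illustrative.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

try (Cluster cluster = Cluster.builder()
        .addContactPoint("cassandra-checkpoint.example.com")
        .build();
     Session session = cluster.connect()) {

    // Create the checkpoint keyspace with production-grade replication
    session.execute(
        "CREATE KEYSPACE IF NOT EXISTS production_checkpoints "
            + "WITH replication = {'class': 'NetworkTopologyStrategy', "
            + "'datacenter1': 3, 'datacenter2': 2}");
}
```
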
## Best Practices

### Checkpointing Configuration

- **Checkpoint interval**: 10-60 seconds, depending on throughput and latency requirements
- **Exactly-once mode**: Always use `CheckpointingMode.EXACTLY_ONCE` for critical data
- **Concurrent checkpoints**: Set to 1 to avoid resource contention
- **Checkpoint timeout**: Set based on cluster size and network latency (a consolidated sketch follows this list)

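As a quick reference, the settings above map onto the checkpoint configuration calls already used in the exactly-once example; the interval and timeout values below are illustrative, not prescriptive.

```java
env.enableCheckpointing(30000); // 30 s interval; pick 10-60 s to match your workload
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setCheckpointTimeout(120000); // size to your cluster and network
```
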
### Resource Management

- **Separate clusters**: Consider using separate Cassandra clusters for data and checkpoints
- **Keyspace isolation**: Use a dedicated keyspace for checkpoint tables
- **Connection pooling**: Configure appropriate connection pools for both data and checkpoint connections (see the sketch after this list)

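A minimal sketch of a `ClusterBuilder` with explicit pooling options, assuming the DataStax Java driver 3.x used by this connector; the host name and pool limits are illustrative.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

ClusterBuilder pooledClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        // Bound the connection pool so the sink cannot overload the cluster
        PoolingOptions pooling = new PoolingOptions()
            .setCoreConnectionsPerHost(HostDistance.LOCAL, 2)
            .setMaxConnectionsPerHost(HostDistance.LOCAL, 8)
            .setMaxRequestsPerConnection(HostDistance.LOCAL, 1024);

        return builder
            .addContactPoint("cassandra-primary.example.com")
            .withPort(9042)
            .withPoolingOptions(pooling)
            .build();
    }
};
```
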
### Error Handling

- **Restart strategies**: Configure appropriate restart strategies for your use case
- **Alerting**: Monitor checkpoint failure rates and set up alerts (see the note after this list)
- **Manual intervention**: Plan procedures for manual checkpoint recovery if needed

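Restart strategies are shown in the recovery section above. In addition, newer Flink 1.x releases let a job tolerate a bounded number of checkpoint failures before failing; treat the call below as an assumption about your Flink version.

```java
// Tolerate a limited number of checkpoint failures before failing the job
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
```
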
### Performance Optimization

- **Batch processing**: The WAL sink buffers records in state and writes them in batches once a checkpoint completes
- **Parallelism**: Configure sink parallelism based on Cassandra cluster capacity
- **Network optimization**: Co-locate Flink and Cassandra for reduced latency