Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases
—
Exactly-once processing guarantees through write-ahead logging with checkpoint integration. Stores records in Flink's state backend and commits them to Cassandra only on successful checkpoint completion, ensuring data consistency even in the presence of failures.
Write-ahead logging provides exactly-once processing semantics by:
1. Buffering incoming records in Flink's state backend instead of writing them to Cassandra immediately.
2. Committing the buffered records to Cassandra only after the corresponding checkpoint completes successfully.
3. Recording each committed checkpoint ID through a CheckpointCommitter so that, after a failure and restart, checkpoints that were already committed are detected (via isCheckpointCommitted) and not replayed.
This ensures that each record is written to Cassandra exactly once, even if the job fails and restarts.
Write-ahead log implementation for Flink Tuple types.
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
public CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer);
public void open();
public void close();
protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp);
}

Usage Example:
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleWriteAheadSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
// Enable checkpointing in the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
};
// Create write-ahead sink using the builder pattern
DataStream<Tuple3<String, Integer, Long>> stream = // ... your data stream
CassandraSink.addSink(stream)
.setQuery("INSERT INTO example.events (id, count, timestamp) VALUES (?, ?, ?);")
.setClusterBuilder(builder)
.enableWriteAheadLog() // This creates the write-ahead sink internally
    .build();

Manual Construction:
// Manual construction (not typically needed)
TypeSerializer<Tuple3<String, Integer, Long>> serializer =
stream.getType().createSerializer(env.getConfig());
CassandraCommitter committer = new CassandraCommitter(builder);
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Long>> walSink =
new CassandraTupleWriteAheadSink<>(
"INSERT INTO example.events (id, count, timestamp) VALUES (?, ?, ?);",
serializer,
builder,
committer
);
stream.transform("Cassandra WAL Sink", null, walSink);

Write-ahead log implementation for Flink Row types.
public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row> {
public CassandraRowWriteAheadSink(String insertQuery, TypeSerializer<Row> serializer, ClusterBuilder builder, CheckpointCommitter committer);
public void open();
public void close();
protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp);
}

Usage Example:
import org.apache.flink.streaming.connectors.cassandra.CassandraRowWriteAheadSink;
import org.apache.flink.types.Row;
// Enable checkpointing
env.enableCheckpointing(10000); // Checkpoint every 10 seconds
DataStream<Row> rowStream = // ... your row stream
CassandraSink.addSink(rowStream)
.setQuery("INSERT INTO example.metrics (id, value, timestamp) VALUES (?, ?, ?);")
.setClusterBuilder(builder)
.enableWriteAheadLog()
    .build();

Manages checkpoint information storage and retrieval for exactly-once processing.
public class CassandraCommitter extends CheckpointCommitter {
public CassandraCommitter(ClusterBuilder builder);
public CassandraCommitter(ClusterBuilder builder, String keySpace);
public void setJobId(String id);
public void createResource();
public void open();
public void close();
public void commitCheckpoint(int subtaskIdx, long checkpointId);
public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}

Default Behavior:
// Default committer (uses default keyspace and table names)
CassandraCommitter defaultCommitter = new CassandraCommitter(builder);

Custom Configuration:
// Custom keyspace for checkpoint metadata
CassandraCommitter customCommitter = new CassandraCommitter(builder, "checkpoint_ks");
customCommitter.setJobId("my-flink-job-v1");
// The committer will create the following schema:
// KEYSPACE checkpoint_ks
// TABLE checkpoint_ks.checkpoints_my_flink_job_v1 (
// sink_id int,
// checkpoint_id bigint,
// PRIMARY KEY (sink_id, checkpoint_id)
// )

Manual Setup:
// Initialize the checkpoint storage manually
CassandraCommitter committer = new CassandraCommitter(builder, "checkpoints");
committer.setJobId("data-pipeline");
committer.createResource(); // Creates keyspace and table
committer.open();
// Use with write-ahead sink
CassandraTupleWriteAheadSink<Tuple2<String, Integer>> walSink =
new CassandraTupleWriteAheadSink<>(
"INSERT INTO example.data (key, value) VALUES (?, ?);",
serializer,
builder,
committer
    );

Proper checkpoint configuration is critical for write-ahead logging:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing with appropriate interval
env.enableCheckpointing(5000); // 5 second intervals
// Configure checkpoint behavior
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(1000); // 1 second pause between checkpoints
config.setMaxConcurrentCheckpoints(1); // Only one concurrent checkpoint
config.setCheckpointTimeout(300000); // 5 minute timeout
config.setFailOnCheckpointingErrors(true); // Fail job on checkpoint errors
// Enable external checkpoints for recovery
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Write-ahead logging trades throughput for consistency:
// High-throughput configuration
env.enableCheckpointing(30000); // Longer checkpoint intervals
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(5000);
// Low-latency configuration
env.enableCheckpointing(1000); // Frequent checkpoints
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(500);
// Balanced configuration
env.enableCheckpointing(10000); // 10 second intervals
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(2000);

Handle failures gracefully with write-ahead logging:
// Configure restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delay
));
// Custom failure handling
CassandraFailureHandler walFailureHandler = new CassandraFailureHandler() {
@Override
public void onFailure(Throwable failure) throws IOException {
// For WAL sinks, be more conservative with error handling
// since failures affect exactly-once guarantees
logger.error("WAL sink failure - will cause job restart", failure);
throw new IOException("WAL operation failed", failure);
}
};
CassandraSink.addSink(stream)
.setQuery("INSERT INTO example.critical_data (id, value) VALUES (?, ?);")
.setClusterBuilder(builder)
.setFailureHandler(walFailureHandler)
.enableWriteAheadLog()
    .build();

Monitor write-ahead log performance:
// Add custom metrics to track WAL performance
public class MonitoredCassandraCommitter extends CassandraCommitter {
private Counter checkpointCommits;
private Counter checkpointFailures;
public MonitoredCassandraCommitter(ClusterBuilder builder) {
super(builder);
}
@Override
public void open() throws Exception {
super.open();
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
checkpointCommits = metricGroup.counter("checkpoint_commits");
checkpointFailures = metricGroup.counter("checkpoint_failures");
}
@Override
public void commitCheckpoint(int subtaskIdx, long checkpointId) throws Exception {
try {
super.commitCheckpoint(subtaskIdx, checkpointId);
checkpointCommits.inc();
} catch (Exception e) {
checkpointFailures.inc();
throw e;
}
}
}

Choose appropriate state backend for WAL:
// RocksDB state backend for large state (recommended for WAL)
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:port/path/to/checkpoints"));
// Memory state backend for small state (development only)
env.setStateBackend(new MemoryStateBackend());
// FileSystem state backend for medium state
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/path/to/checkpoints"));

Write-ahead logging is only supported for Tuple and Row types:
// Supported
DataStream<Tuple2<String, Integer>> tuples = // ...
CassandraSink.addSink(tuples)
.setQuery("INSERT INTO example.data (key, value) VALUES (?, ?);")
.setClusterBuilder(builder)
.enableWriteAheadLog() // ✓ Supported
.build();
DataStream<Row> rows = // ...
CassandraSink.addSink(rows)
.setQuery("INSERT INTO example.data (key, value) VALUES (?, ?);")
.setClusterBuilder(builder)
.enableWriteAheadLog() // ✓ Supported
.build();
// NOT supported
DataStream<MyPojo> pojos = // ...
CassandraSink.addSink(pojos)
.setDefaultKeyspace("example")
.setClusterBuilder(builder)
.enableWriteAheadLog() // ✗ Will throw IllegalArgumentException
    .build();

Write-ahead logging introduces additional overhead: records are held in Flink's state backend until the checkpoint that covers them completes, which increases checkpoint state size and adds end-to-end latency compared with direct (at-least-once) writes.
Write-ahead logging creates additional tables in Cassandra:
-- Default checkpoint metadata table
CREATE KEYSPACE IF NOT EXISTS checkpoints_sink
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE IF NOT EXISTS checkpoints_sink.checkpoints_<job_id> (
sink_id int,
checkpoint_id bigint,
PRIMARY KEY (sink_id, checkpoint_id)
);

Ensure your Cassandra cluster has sufficient resources for both data and checkpoint metadata.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra