Apache Flink connector for Apache Cassandra that provides both streaming and batch data integration capabilities
—
The Flink Cassandra Connector provides exactly-once processing guarantees through checkpoint coordination and write-ahead logging for streaming applications. This ensures data consistency and prevents data loss even in the presence of failures.
CheckpointCommitter implementation that stores checkpoint information in a dedicated Cassandra table for coordination between Flink checkpoints and Cassandra writes.
/**
 * CheckpointCommitter that saves completed checkpoint information in Cassandra.
 * Creates entries in format: |operator_id | subtask_id | last_completed_checkpoint|
 * The backing keyspace/table are created on demand (see createResource()); the
 * default keyspace is 'flink_auxiliary' and the table name is derived from the
 * job id set via setJobId().
 */
public class CassandraCommitter extends CheckpointCommitter {
/**
 * Creates committer with default keyspace 'flink_auxiliary'.
 * @param builder ClusterBuilder for Cassandra connection configuration
 */
public CassandraCommitter(ClusterBuilder builder);
/**
 * Creates committer with a custom keyspace for checkpoint storage.
 * @param builder ClusterBuilder for Cassandra connection configuration
 * @param keySpace custom keyspace name for checkpoint table storage
 */
public CassandraCommitter(ClusterBuilder builder, String keySpace);
/**
 * Sets the job ID used for checkpoint table naming (called internally by Flink).
 * @param id unique job identifier
 * @throws Exception if setup fails
 */
public void setJobId(String id) throws Exception;
/**
 * Creates the keyspace and checkpoint table if they don't exist.
 * @throws Exception if table/keyspace creation fails
 */
@Override
public void createResource() throws Exception;
/**
 * Opens the connection used for checkpoint operations.
 * @throws Exception if connection setup fails
 */
@Override
public void open() throws Exception;
/**
 * Closes connections and clears the local checkpoint cache.
 * @throws Exception if cleanup fails
 */
@Override
public void close() throws Exception;
/**
 * Records completion of a checkpoint for one subtask in Cassandra.
 * @param subtaskIdx subtask index that completed checkpoint
 * @param checkpointId checkpoint ID that was completed
 */
@Override
public void commitCheckpoint(int subtaskIdx, long checkpointId);
/**
 * Checks if a specific checkpoint has been committed.
 * Uses a local cache to minimize Cassandra queries.
 * @param subtaskIdx subtask index to check
 * @param checkpointId checkpoint ID to verify
 * @return true if checkpoint has been committed
 */
@Override
public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}
Enable exactly-once processing by configuring write-ahead logging in sink builders.
Basic WAL Configuration:
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
// Enable WAL with the default committer: records are buffered in Flink's state
// backend and written to Cassandra only after the owning checkpoint completes.
CassandraSink<Tuple3<String, Integer, String>> walSink = CassandraSink
.addSink(stream)
.setQuery("INSERT INTO users (id, age, name) VALUES (?, ?, ?)")
.setHost("localhost")
.enableWriteAheadLog() // Uses CassandraCommitter with default keyspace 'flink_auxiliary'
.build();
Custom WAL Configuration:
// Create a custom committer with a dedicated keyspace (and, here, a separate
// Cassandra endpoint) for checkpoint bookkeeping.
ClusterBuilder committerClusterBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("cassandra-checkpoint.example.com")
.withPort(9042)
.withCredentials("checkpoint_user", "checkpoint_password")
.build();
}
};
CassandraCommitter customCommitter = new CassandraCommitter(
committerClusterBuilder,
"custom_checkpoint_keyspace"
);
// Enable WAL with the custom committer; data writes go through
// dataClusterBuilder while checkpoint metadata uses committerClusterBuilder.
CassandraSink<Tuple4<String, Long, Double, Boolean>> customWalSink = CassandraSink
.addSink(complexStream)
.setQuery("INSERT INTO transactions (tx_id, timestamp, amount, processed) VALUES (?, ?, ?, ?)")
.setClusterBuilder(dataClusterBuilder)
.enableWriteAheadLog(customCommitter)
.build();
The underlying write-ahead logging sink implementation that provides exactly-once guarantees.
/**
 * Write-ahead logging sink that stores incoming records in Flink's state backend
 * and commits them to Cassandra only when the owning checkpoint completes
 * successfully, providing exactly-once semantics together with a CheckpointCommitter.
 */
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
/**
 * Creates a WAL sink with checkpoint coordination.
 * @param insertQuery CQL INSERT statement with parameter placeholders
 * @param serializer TypeSerializer for storing records in the state backend
 * @param builder ClusterBuilder for Cassandra connection
 * @param committer CheckpointCommitter for tracking checkpoint completion
 * @throws Exception if initialization fails
 */
protected CassandraTupleWriteAheadSink(
String insertQuery,
TypeSerializer<IN> serializer,
ClusterBuilder builder,
CheckpointCommitter committer
) throws Exception;
/**
 * Opens connections and validates that checkpointing is enabled.
 * @throws Exception if checkpointing is disabled or connection fails
 */
public void open() throws Exception;
/**
 * Closes connections and cleans up resources.
 * @throws Exception if cleanup fails
 */
@Override
public void close() throws Exception;
/**
 * Sends a batch of values to Cassandra with checkpoint coordination.
 * NOTE(review): a false return presumably causes the framework to retry the
 * batch later, meaning writes may be replayed — confirm against
 * GenericWriteAheadSink and keep the INSERT idempotent.
 * @param values batch of tuples to write
 * @param checkpointId checkpoint ID for coordination
 * @param timestamp checkpoint timestamp
 * @return true if all writes successful, false if any failed
 * @throws Exception if batch processing fails
 */
@Override
protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception;
}
Complete example showing exactly-once processing setup with proper error handling.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.java.tuple.Tuple3;
// Configure execution environment for exactly-once processing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing with exactly-once mode
env.enableCheckpointing(10000); // checkpoint every 10 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // only one in-flight checkpoint at a time
// Configure state backend for durability
// NOTE(review): FsStateBackend is deprecated in newer Flink releases; prefer
// HashMapStateBackend plus a checkpoint storage directory — confirm target version.
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints"));
// Data source
DataStream<Tuple3<String, Long, String>> criticalEvents = env
.addSource(new CriticalEventSource())
.name("Critical Events Source");
// Cluster configuration for the data-write path of the WAL-enabled sink.
ClusterBuilder sinkClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra-primary.example.com")
            .addContactPoint("cassandra-secondary.example.com")
            .withPort(9042)
            .withCredentials("sink_user", "sink_password")
            // FIX: ExponentialReconnectionPolicy implements ReconnectionPolicy, not
            // RetryPolicy — it must be registered via withReconnectionPolicy().
            // Passing it to withRetryPolicy() would not compile.
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 10000))
            .build();
    }
};
// Dedicated cluster for checkpoint metadata, separate from the data path.
ClusterBuilder checkpointClusterBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("cassandra-checkpoint.example.com")
.withPort(9042)
.withCredentials("checkpoint_user", "checkpoint_password")
.build();
}
};
// Committer stores per-subtask checkpoint progress in the 'flink_checkpoints' keyspace.
CassandraCommitter committer = new CassandraCommitter(
checkpointClusterBuilder,
"flink_checkpoints"
);
// Data writes use sinkClusterBuilder; the committer tracks which checkpoints
// have already been written so replays after failure skip committed records.
CassandraSink<Tuple3<String, Long, String>> exactlyOnceSink = CassandraSink
.addSink(criticalEvents)
.setQuery("INSERT INTO critical_events (event_id, timestamp, data) VALUES (?, ?, ?)")
.setClusterBuilder(sinkClusterBuilder)
.enableWriteAheadLog(committer)
.build();
exactlyOnceSink
.name("Critical Events Cassandra Sink")
.uid("critical-events-sink") // Important for savepoint compatibility
.setParallelism(4);
env.execute("Critical Events Processing");
Understanding how the connector behaves during failures and recovery.
// Recovery behavior configuration
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay between restarts
));
// On failure and restart:
// 1. Flink restores from last successful checkpoint
// 2. CassandraCommitter checks which records were already committed
// 3. WAL sink replays only uncommitted records
// 4. Processing continues from the checkpoint point
// Custom restart strategy for production.
// NOTE: calling setRestartStrategy again REPLACES the fixedDelayRestart
// configured above — configure exactly one strategy in a real job.
env.setRestartStrategy(RestartStrategies.failureRateRestart(
5, // max failures per interval
Time.of(5, TimeUnit.MINUTES), // time interval
Time.of(30, TimeUnit.SECONDS) // delay between restarts
));
Implement monitoring for checkpoint and recovery metrics.
// Custom metrics for monitoring WAL sink performance
public class MonitoredCassandraCommitter extends CassandraCommitter {
private final Counter checkpointCommits;
private final Histogram checkpointLatency;
public MonitoredCassandraCommitter(ClusterBuilder builder, MetricGroup metricGroup) {
super(builder);
this.checkpointCommits = metricGroup.counter("checkpoint_commits");
this.checkpointLatency = metricGroup.histogram("checkpoint_latency");
}
@Override
public void commitCheckpoint(int subtaskIdx, long checkpointId) {
long startTime = System.currentTimeMillis();
super.commitCheckpoint(subtaskIdx, checkpointId);
long latency = System.currentTimeMillis() - startTime;
checkpointCommits.inc();
checkpointLatency.update(latency);
}
}
The CassandraCommitter creates the following table structure:
-- Default keyspace: flink_auxiliary
-- Default table name: checkpoints_{job_id}
CREATE TABLE IF NOT EXISTS flink_auxiliary.checkpoints_job123 (
sink_id text,
sub_id int,
checkpoint_id bigint,
PRIMARY KEY (sink_id, sub_id)
);
Configure custom keyspace and table settings for checkpoint storage.
// Custom keyspace configuration
CassandraCommitter committer = new CassandraCommitter(builder, "production_checkpoints");
// The committer will create:
// - Keyspace: production_checkpoints
// - Table: checkpoints_{job_id}
// - Replication: SimpleStrategy with replication_factor=1 (default)
// For production, consider creating the keyspace manually with appropriate replication:
/*
CREATE KEYSPACE production_checkpoints
WITH replication = {
'class': 'NetworkTopologyStrategy',
'datacenter1': 3,
'datacenter2': 2
};
*/
Use CheckpointingMode.EXACTLY_ONCE for critical data.
Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra-2-10