Apache Flink connector for Apache Cassandra that provides both streaming and batch data integration capabilities
```bash
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-cassandra-2-10@1.3.0
```

The Apache Flink Cassandra Connector provides comprehensive integration between Apache Flink and Apache Cassandra. It supports both streaming (DataStream API) and batch (DataSet API) processing, with multiple sink implementations for different data types and processing guarantees, including exactly-once semantics through write-ahead logging.

Add the Maven dependency:

```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_2.10</artifactId>
<version>1.3.3</version>
</dependency>
```

Core imports:

```java
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleWriteAheadSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
```

Streaming example, writing a tuple stream to Cassandra:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import com.datastax.driver.core.Cluster;
// Define a ClusterBuilder for connection configuration
ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").withPort(9042).build();
}
};
// Create a tuple-based sink
DataStream<Tuple3<String, Integer, String>> stream = ...; // your data stream
CassandraSink<Tuple3<String, Integer, String>> sink = CassandraSink
.addSink(stream)
.setQuery("INSERT INTO example.users (name, age, email) VALUES (?, ?, ?);")
.setHost("localhost", 9042)
.build();
sink.name("Cassandra Sink");import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read from Cassandra, reusing the ClusterBuilder defined in the streaming example
CassandraInputFormat<Tuple2<String, Integer>> inputFormat =
    new CassandraInputFormat<>("SELECT name, age FROM example.users;", builder);
DataSet<Tuple2<String, Integer>> input = env.createInput(inputFormat);
// Derive a result to write back, e.g. a processing status per user
DataSet<Tuple2<String, String>> result = input.map(
    new MapFunction<Tuple2<String, Integer>, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(Tuple2<String, Integer> user) {
            return new Tuple2<>(user.f0, "processed");
        }
    });

// Write to Cassandra
CassandraOutputFormat<Tuple2<String, String>> outputFormat =
    new CassandraOutputFormat<>("INSERT INTO example.processed (name, status) VALUES (?, ?);", builder);
result.output(outputFormat);

env.execute("Cassandra batch job");
```

The Flink Cassandra Connector is organized into two main packages with distinct responsibilities:
- Streaming connectors (`org.apache.flink.streaming.connectors.cassandra`): DataStream API integration with multiple sink implementations
- Batch connectors (`org.apache.flink.batch.connectors.cassandra`): DataSet API integration for batch processing

Both packages share the `ClusterBuilder` abstract class for Cassandra cluster connection setup.

Comprehensive sink implementations support streaming data integration with different data types and processing guarantees:

```java
public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);
public abstract static class CassandraSinkBuilder<IN> {
public CassandraSinkBuilder<IN> setQuery(String query);
public CassandraSinkBuilder<IN> setHost(String host);
public CassandraSinkBuilder<IN> setHost(String host, int port);
public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
public CassandraSinkBuilder<IN> enableWriteAheadLog();
public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);
public abstract CassandraSink<IN> build();
}
```
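For illustration, a minimal sketch of the builder chain using `setClusterBuilder` in place of `setHost`; the contact point is a placeholder and `stream` is the tuple stream from the earlier example:

```java
// A sketch: configure the sink through setClusterBuilder instead of setHost.
// "cassandra-host" is a placeholder contact point.
CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.users (name, age, email) VALUES (?, ?, ?);")
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("cassandra-host").build();
        }
    })
    .build();
```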
Input and output formats for batch processing jobs using the DataSet API:

```java
public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> {
public CassandraInputFormat(String query, ClusterBuilder builder);
}
public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);
}
```

Abstract configuration system for customizing Cassandra cluster connections, with support for authentication, SSL, and advanced connection parameters:

```java
public abstract class ClusterBuilder implements Serializable {
public Cluster getCluster();
protected abstract Cluster buildCluster(Cluster.Builder builder);
}
```
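As a sketch, assuming a cluster that requires authentication (the contact point and credentials below are placeholders), a custom `ClusterBuilder` might look like this; `.withSSL()` can be chained the same way when TLS is required:

```java
// A sketch of a ClusterBuilder that authenticates against the cluster.
// Contact point and credentials are placeholders.
ClusterBuilder secureBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra-host")
            .withPort(9042)
            .withCredentials("username", "password")
            .build();
    }
};
```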
Exactly-once processing guarantees through checkpoint coordination and write-ahead logging for streaming applications:

```java
public class CassandraCommitter extends CheckpointCommitter {
public CassandraCommitter(ClusterBuilder builder);
public CassandraCommitter(ClusterBuilder builder, String keySpace);
}
```
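A sketch of wiring these together for a tuple stream; `stream` and `builder` are the objects from the earlier examples, and `flink_auxiliary` is an illustrative keyspace for the committer's bookkeeping table:

```java
// A sketch: enable the write-ahead log with an explicit CassandraCommitter.
// "flink_auxiliary" is an illustrative keyspace for checkpoint bookkeeping.
CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.users (name, age, email) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog(new CassandraCommitter(builder, "flink_auxiliary"))
    .build();
```

With the write-ahead log enabled, records are buffered per checkpoint and written out only when the checkpoint completes, and the committer records completed checkpoints so a replay after failure does not write them twice.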
```java
// Main sink wrapper class
public class CassandraSink<IN> {
// Sink configuration and lifecycle methods
public CassandraSink<IN> name(String name);
public CassandraSink<IN> uid(String uid);
public CassandraSink<IN> setUidHash(String uidHash);
public CassandraSink<IN> setParallelism(int parallelism);
public CassandraSink<IN> disableChaining();
public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
}
// Tuple-specific sink builder
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
// Specialized for tuple-based data with CQL queries
}
// POJO-specific sink builder
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
// Specialized for POJO-based data with DataStax mapping annotations
}
```
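To illustrate the POJO path: a hypothetical `User` class mapped with DataStax annotations (keyspace and table names are placeholders). POJO sinks derive the insert statement from this mapping, so the builder chain needs no `setQuery(...)` call; `addSink` selects the tuple or POJO builder based on the stream's type information.

```java
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

// Hypothetical POJO mapped with DataStax annotations.
@Table(keyspace = "example", name = "users")
public class User {
    @Column(name = "name")
    private String name;

    @Column(name = "age")
    private int age;

    public User() {} // the DataStax mapper requires a no-arg constructor

    // getters and setters omitted for brevity
}

DataStream<User> users = ...; // your POJO stream
CassandraSink.addSink(users)
    .setClusterBuilder(builder)   // builder from the earlier example
    .build();
```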