Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases
—
Comprehensive streaming sink functionality for writing data from Flink DataStreams to Cassandra. The connector supports multiple data types with automatic type detection, builder-based configuration, and robust failure handling mechanisms.
The primary entry point for creating Cassandra sinks with automatic type detection and builder creation.
public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);
public static <IN> CassandraSinkBuilder<IN> addSink(org.apache.flink.streaming.api.scala.DataStream<IN> input);

The addSink method automatically detects the input data type and returns the appropriate builder:

- TupleTypeInfo → CassandraTupleSinkBuilder
- RowTypeInfo → CassandraRowSinkBuilder
- PojoTypeInfo → CassandraPojoSinkBuilder
- CaseClassTypeInfo → CassandraScalaProductSinkBuilder

Base builder providing common configuration options for all sink types.
public abstract static class CassandraSinkBuilder<IN> {
public CassandraSinkBuilder<IN> setQuery(String query);
public CassandraSinkBuilder<IN> setDefaultKeyspace(String keyspace);
public CassandraSinkBuilder<IN> setHost(String host);
public CassandraSinkBuilder<IN> setHost(String host, int port);
public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
public CassandraSinkBuilder<IN> enableWriteAheadLog();
public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);
public CassandraSinkBuilder<IN> setMapperOptions(MapperOptions options);
public CassandraSinkBuilder<IN> setFailureHandler(CassandraFailureHandler failureHandler);
public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests);
public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout);
public CassandraSinkBuilder<IN> enableIgnoreNullFields();
public CassandraSink<IN> build();
}

Usage Examples:
// Basic configuration with host
CassandraSink.addSink(stream)
.setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
.setHost("127.0.0.1", 9042)
.build();
// Advanced configuration with custom cluster builder
ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("127.0.0.1")
.addContactPoint("127.0.0.2")
.withPort(9042)
.withCredentials("username", "password")
.build();
}
};
CassandraSink.addSink(stream)
.setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
.setClusterBuilder(builder)
.setMaxConcurrentRequests(100)
.enableIgnoreNullFields()
    .build();

Specialized builder for Flink Tuple types with CQL query requirements.
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
// Inherits all methods from CassandraSinkBuilder
// Requires setQuery() to be called
// Does not support setDefaultKeyspace()
}

Usage Example:
DataStream<Tuple3<String, Integer, Boolean>> tupleStream = // ... your stream
CassandraSink.addSink(tupleStream)
.setQuery("INSERT INTO example.metrics (name, value, active) VALUES (?, ?, ?);")
.setHost("127.0.0.1")
    .build();

Specialized builder for Flink Row types with schema-based field mapping.
public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
// Inherits all methods from CassandraSinkBuilder
// Requires setQuery() to be called
// Does not support setDefaultKeyspace()
}

Usage Example:
DataStream<Row> rowStream = // ... your row stream
CassandraSink.addSink(rowStream)
.setQuery("INSERT INTO example.users (id, name, age) VALUES (?, ?, ?);")
.setHost("127.0.0.1")
    .build();

Specialized builder for Plain Old Java Objects (POJOs) using DataStax object mapping.
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
// Inherits all methods from CassandraSinkBuilder
// Does not support setQuery() - uses DataStax annotations
// Supports setDefaultKeyspace() and setMapperOptions()
}

Usage Example:
// Define POJO with Cassandra annotations
@Table(keyspace = "example", name = "products")
public class Product {
@PartitionKey
private String id;
@Column(name = "name")
private String name;
@Column(name = "price")
private BigDecimal price;
// constructors, getters, setters...
}
// Configure sink with mapper options
MapperOptions options = new MapperOptions() {
@Override
public Mapper.Option[] getMapperOptions() {
return new Mapper.Option[] {
Mapper.Option.saveNullFields(false),
Mapper.Option.timestamp(System.currentTimeMillis())
};
}
};
DataStream<Product> productStream = // ... your product stream
CassandraSink.addSink(productStream)
.setDefaultKeyspace("example")
.setMapperOptions(options)
.setHost("127.0.0.1")
    .build();

Specialized builder for Scala case classes and tuples.
public static class CassandraScalaProductSinkBuilder<IN extends Product> extends CassandraSinkBuilder<IN> {
// Inherits all methods from CassandraSinkBuilder
// Requires setQuery() to be called
// Does not support setDefaultKeyspace()
}

The resulting CassandraSink provides Flink operator configuration methods.
public class CassandraSink<IN> {
public CassandraSink<IN> name(String name);
public CassandraSink<IN> uid(String uid);
public CassandraSink<IN> setUidHash(String uidHash);
public CassandraSink<IN> setParallelism(int parallelism);
public CassandraSink<IN> disableChaining();
public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
}

Usage Example:
CassandraSink.addSink(stream)
.setQuery("INSERT INTO example.events (id, timestamp, data) VALUES (?, ?, ?);")
.setHost("127.0.0.1")
.build()
.name("Cassandra Event Sink")
.uid("cassandra-sink-1")
.setParallelism(4)
    .slotSharingGroup("cassandra-sinks");

All streaming sinks extend from the common base class providing core functionality.
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
public void open(Configuration configuration);
public void close();
public void initializeState(FunctionInitializationContext context);
public void snapshotState(FunctionSnapshotContext ctx);
public void invoke(IN value);
public abstract ListenableFuture<V> send(IN value);
}

Base class for sinks that work with tuple-like data structures.
public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder, CassandraSinkBaseConfig config, CassandraFailureHandler failureHandler);
public void open(Configuration configuration);
public ListenableFuture<ResultSet> send(IN value);
protected abstract Object[] extract(IN record);
}

Individual sink implementations for different data types.
// Flink Tuple sink
public class CassandraTupleSink<IN extends Tuple> extends AbstractCassandraTupleSink<IN> {
public CassandraTupleSink(String insertQuery, ClusterBuilder builder);
protected Object[] extract(IN record);
}
// Flink Row sink
public class CassandraRowSink extends AbstractCassandraTupleSink<Row> {
public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder);
protected Object[] extract(Row record);
}
// POJO sink with DataStax mapping
public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder);
public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, MapperOptions options);
public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, String keyspace);
public void open(Configuration configuration);
public ListenableFuture<ResultSet> send(IN value);
}
// Scala Product sink
public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder);
protected Object[] extract(IN record);
}

All sinks support custom failure handlers for robust error handling:
// Custom failure handler example
CassandraFailureHandler customHandler = new CassandraFailureHandler() {
@Override
public void onFailure(Throwable failure) throws IOException {
if (failure instanceof WriteTimeoutException) {
// Log timeout but continue processing
logger.warn("Write timeout, continuing", failure);
return;
}
// Re-throw other exceptions to fail the sink
throw new IOException("Cassandra write failed", failure);
}
};
CassandraSink.addSink(stream)
.setQuery("INSERT INTO example.data (id, value) VALUES (?, ?);")
.setHost("127.0.0.1")
.setFailureHandler(customHandler)
    .build();

Configure concurrency and resource management:
CassandraSink.addSink(stream)
.setQuery("INSERT INTO example.data (id, value) VALUES (?, ?);")
.setHost("127.0.0.1")
    .setMaxConcurrentRequests(100)                         // Limit concurrent in-flight requests
    // Alternatively, supply a timeout to bound how long invoke() blocks waiting for a permit:
    // .setMaxConcurrentRequests(100, Duration.ofSeconds(30))
.enableIgnoreNullFields() // Avoid tombstones
    .build();

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra