Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-cassandra@1.14.0

A comprehensive Apache Flink connector for Apache Cassandra that enables streaming applications to write data efficiently into Cassandra databases. The connector provides multiple sink implementations for different data types, supports both streaming and batch processing modes, offers configurable failure handling mechanisms, and integrates with Flink's checkpointing system for fault tolerance.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_2.11</artifactId>
<version>1.14.6</version>
</dependency>

For streaming applications:
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

For batch applications:
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;

Configuration imports:
import org.apache.flink.streaming.connectors.cassandra.CassandraFailureHandler;
import org.apache.flink.streaming.connectors.cassandra.MapperOptions;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import com.datastax.driver.core.Cluster;
// Create a simple ClusterBuilder
ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
};
// Configure and add sink to DataStream
DataStream<Tuple3<String, Integer, String>> stream = // ... your data stream
CassandraSink.addSink(stream)
.setQuery("INSERT INTO example.words (word, count, description) VALUES (?, ?, ?);")
.setHost("127.0.0.1")
.build();

import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import com.datastax.driver.mapping.annotations.Table;
import com.datastax.driver.mapping.annotations.Column;
// Define a POJO with Cassandra annotations
@Table(keyspace = "example", name = "users")
public class User {
@Column(name = "id")
private String id;
@Column(name = "name")
private String name;
@Column(name = "age")
private Integer age;
// constructors, getters, setters...
}
// Use the POJO sink
DataStream<User> userStream = // ... your user stream
CassandraSink.addSink(userStream)
.setDefaultKeyspace("example")
.setHost("127.0.0.1")
.build();

CassandraSink.addSink(stream)
.setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
.setHost("127.0.0.1")
.enableWriteAheadLog() // Enable exactly-once processing
.build();

The Apache Flink Cassandra Connector is built around several key architectural components:
- CassandraSinkBase with type-specific implementations for different data formats
- CassandraSinkBuilder API for configuration with automatic type detection
- CassandraSinkBaseConfig and builder patterns

Primary streaming sink functionality supporting multiple data types, including Tuples, Rows, POJOs, and Scala Products. Provides builder-based configuration with automatic type detection and comprehensive failure handling.
public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);
public abstract static class CassandraSinkBuilder<IN> {
public CassandraSinkBuilder<IN> setQuery(String query);
public CassandraSinkBuilder<IN> setHost(String host);
public CassandraSinkBuilder<IN> setHost(String host, int port);
public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
public CassandraSinkBuilder<IN> enableWriteAheadLog();
public CassandraSinkBuilder<IN> setFailureHandler(CassandraFailureHandler failureHandler);
public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests);
public CassandraSinkBuilder<IN> enableIgnoreNullFields();
public CassandraSink<IN> build();
}

Batch input and output formats for reading from and writing to Cassandra in batch processing jobs. Supports Tuples, Rows, and POJOs with configurable parallelism and connection management.
public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT>;
public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT>;
public class CassandraTupleOutputFormat<OUT extends Tuple> extends CassandraOutputFormatBase<OUT>;
public class CassandraPojoOutputFormat<OUT> extends RichOutputFormat<OUT>;

Connection builders, failure handlers, and configuration objects for customizing Cassandra connectivity, error handling, and performance tuning.
public abstract class ClusterBuilder {
protected abstract Cluster buildCluster(Cluster.Builder builder);
}
public interface CassandraFailureHandler {
void onFailure(Throwable failure) throws IOException;
}
public interface MapperOptions {
Mapper.Option[] getMapperOptions();
}

Exactly-once processing guarantees through write-ahead logging with checkpoint integration. Stores records in Flink's state backend and commits them to Cassandra only on successful checkpoint completion.
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN>;
public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row>;
public class CassandraCommitter extends CheckpointCommitter;

Integration with Flink's Table API for declarative stream processing. Provides append-only table sinks with schema inference and SQL compatibility.
public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
public CassandraAppendTableSink(ClusterBuilder builder, String cql);
public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream);
}

// Connection configuration
public abstract class ClusterBuilder implements Serializable {
protected abstract Cluster buildCluster(Cluster.Builder builder);
}
// Failure handling
public interface CassandraFailureHandler extends Serializable {
void onFailure(Throwable failure) throws IOException;
}
// Configuration management
public final class CassandraSinkBaseConfig {
public int getMaxConcurrentRequests();
public Duration getMaxConcurrentRequestsTimeout();
public boolean getIgnoreNullFields();
public static Builder newBuilder();
}
// Mapper configuration for POJOs
public interface MapperOptions extends Serializable {
Mapper.Option[] getMapperOptions();
}
// Checkpoint management for exactly-once processing
public class CassandraCommitter extends CheckpointCommitter {
public CassandraCommitter(ClusterBuilder builder);
public void commitCheckpoint(int subtaskIdx, long checkpointId);
public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}