Apache Flink connector for Apache Cassandra that provides both streaming and batch data integration capabilities
Comprehensive sink implementations for streaming data integration with Apache Cassandra. The connector provides multiple sink types optimized for different data formats (tuples vs POJOs) and processing guarantees (at-least-once vs exactly-once).
Main entry point for creating Cassandra sinks with automatic type detection and builder selection.
/**
* Creates a sink builder appropriate for the input stream type
* @param input DataStream to sink to Cassandra
* @return CassandraSinkBuilder for further configuration
*/
public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);

Usage Examples:
// Tuple-based sink (uses CassandraTupleSinkBuilder)
DataStream<Tuple3<String, Integer, String>> tupleStream = // your stream
CassandraSink.addSink(tupleStream)
.setQuery("INSERT INTO users (name, age, email) VALUES (?, ?, ?)")
.setHost("localhost")
.build();
// POJO-based sink (uses CassandraPojoSinkBuilder)
DataStream<User> pojoStream = // your stream with @Table annotated POJOs
CassandraSink.addSink(pojoStream)
.setHost("localhost")
.build();

Abstract base class providing common configuration options for all sink types.
/**
* Base builder class for Cassandra sink configuration
*/
public abstract static class CassandraSinkBuilder<IN> {
/**
* Sets the CQL query for tuple-based sinks (not applicable for POJO sinks)
* @param query CQL INSERT statement with parameter placeholders
* @return this builder for method chaining
*/
public CassandraSinkBuilder<IN> setQuery(String query);
/**
* Sets Cassandra host with default port 9042
* @param host hostname or IP address
* @return this builder for method chaining
*/
public CassandraSinkBuilder<IN> setHost(String host);
/**
* Sets Cassandra host and port
* @param host hostname or IP address
* @param port port number
* @return this builder for method chaining
*/
public CassandraSinkBuilder<IN> setHost(String host, int port);
/**
* Sets custom cluster configuration builder
* @param builder ClusterBuilder for advanced connection configuration
* @return this builder for method chaining
*/
public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
/**
* Enables write-ahead logging for exactly-once processing guarantees
* @return this builder for method chaining
*/
public CassandraSinkBuilder<IN> enableWriteAheadLog();
/**
* Enables write-ahead logging with custom checkpoint committer
* @param committer custom CheckpointCommitter implementation
* @return this builder for method chaining
*/
public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);
/**
* Finalizes the sink configuration and creates the sink
* @return configured CassandraSink
* @throws Exception if configuration is invalid
*/
public abstract CassandraSink<IN> build() throws Exception;
}
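When exactly-once guarantees require a custom committer, the CheckpointCommitter overload can be paired with the connector's CassandraCommitter, which records committed checkpoint IDs in Cassandra itself. A minimal sketch; the keyspace, table, and column names are illustrative assumptions:

ClusterBuilder clusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("localhost").build();
    }
};
DataStream<Tuple2<String, Long>> counts = // your stream
CassandraSink.addSink(counts)
    .setQuery("INSERT INTO demo.word_counts (word, cnt) VALUES (?, ?)")
    .setClusterBuilder(clusterBuilder)
    // stores committed checkpoint IDs under the given keyspace
    .enableWriteAheadLog(new CassandraCommitter(clusterBuilder, "flink_auxiliary"))
    .build();

Specialized builder for tuple-based data streams requiring explicit CQL queries.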
/**
* Builder for tuple-based Cassandra sinks
* Requires explicit CQL query with parameter placeholders matching tuple arity
*/
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer);
/**
* Builds tuple sink with optional write-ahead logging
* @return CassandraSink configured for tuple data
* @throws Exception if query is null/empty or cluster configuration missing
*/
@Override
public CassandraSink<IN> build() throws Exception;
}

Usage Examples:
DataStream<Tuple4<String, Integer, Double, Boolean>> orders = // your stream
CassandraSink<Tuple4<String, Integer, Double, Boolean>> sink = CassandraSink
.addSink(orders)
.setQuery("INSERT INTO orders (id, quantity, price, processed) VALUES (?, ?, ?, ?)")
.setHost("cassandra-cluster", 9042)
.enableWriteAheadLog() // for exactly-once guarantees
.build();
sink.name("Orders Cassandra Sink")
.setParallelism(4);

Specialized builder for POJO-based data streams using DataStax mapping annotations.
/**
* Builder for POJO-based Cassandra sinks
* Uses DataStax mapping annotations on POJO classes for table mapping
* CQL queries are not allowed; mapping is handled automatically
*/
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer);
/**
* Builds POJO sink (write-ahead logging not supported for POJOs)
* @return CassandraSink configured for POJO data
* @throws Exception if query is specified (not allowed) or cluster configuration missing
*/
@Override
public CassandraSink<IN> build() throws Exception;
}

Usage Examples:
// POJO class with DataStax mapping annotations (com.datastax.driver.mapping.annotations)
@Table(keyspace = "analytics", name = "user_events")
public class UserEvent {
@PartitionKey
@Column(name = "user_id")
private String userId;
@Column(name = "event_type")
private String eventType;
@Column(name = "timestamp")
private Long timestamp;
// constructors, getters, setters...
}
DataStream<UserEvent> events = // your stream
CassandraSink<UserEvent> sink = CassandraSink
.addSink(events)
.setClusterBuilder(new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("cassandra1.example.com")
.addContactPoint("cassandra2.example.com")
.withPort(9042)
.withCredentials("username", "password")
.build();
}
})
.build();

The main sink wrapper providing Flink operator configuration methods.
/**
* Main Cassandra sink wrapper class providing Flink operator configuration
*/
public class CassandraSink<IN> {
/**
* Sets the name of this sink for visualization and logging
* @param name operator name
* @return this sink for method chaining
*/
public CassandraSink<IN> name(String name);
/**
* Sets unique operator ID for savepoint compatibility
* @param uid unique operator identifier
* @return this sink for method chaining
*/
public CassandraSink<IN> uid(String uid);
/**
* Sets user-provided hash for JobVertexID
* @param uidHash user-provided hash string
* @return this sink for method chaining
*/
public CassandraSink<IN> setUidHash(String uidHash);
/**
* Sets the parallelism for this sink
* @param parallelism degree of parallelism (must be > 0)
* @return this sink for method chaining
*/
public CassandraSink<IN> setParallelism(int parallelism);
/**
* Disables operator chaining for this sink
* @return this sink for method chaining
*/
public CassandraSink<IN> disableChaining();
/**
* Sets the slot sharing group for co-location control
* @param slotSharingGroup slot sharing group name
* @return this sink for method chaining
*/
public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
}
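All of these methods return the sink itself, so they chain directly off build(); a short sketch with illustrative names and values:

CassandraSink.addSink(tupleStream)
    .setQuery("INSERT INTO users (name, age, email) VALUES (?, ?, ?)")
    .setHost("localhost")
    .build()
    .name("Users Cassandra Sink")
    .uid("users-cassandra-sink")  // stable ID keeps savepoints restorable
    .disableChaining()
    .slotSharingGroup("sinks")
    .setParallelism(2);

Base abstract class providing common functionality for all Cassandra sink implementations.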
/**
* Common abstract class for CassandraPojoSink and CassandraTupleSink
* Provides connection management, error handling, and asynchronous callback processing
*/
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
/**
* Creates sink base with cluster configuration
* @param builder ClusterBuilder for connection configuration
*/
protected CassandraSinkBase(ClusterBuilder builder);
/**
* Opens connection and initializes callback handling
* @param configuration Flink configuration parameters
*/
@Override
public void open(Configuration configuration);
/**
* Invokes sink processing for input value with error handling
* @param value input value to process
* @throws Exception if processing fails or previous error occurred
*/
@Override
public void invoke(IN value) throws Exception;
/**
* Abstract method for sending value to Cassandra (implemented by subclasses)
* @param value input value to send
* @return ListenableFuture for asynchronous processing
*/
public abstract ListenableFuture<V> send(IN value);
/**
* Closes connections and waits for pending operations to complete
* @throws Exception if cleanup fails or pending operations had errors
*/
@Override
public void close() throws Exception;
}
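A custom sink can extend CassandraSinkBase by implementing send(). The sketch below mirrors the prepared-statement pattern used by CassandraTupleSink and assumes the base class's protected session field (as in the Flink sources) plus imports from com.datastax.driver.core; the class, keyspace, and table names are hypothetical:

public class GreetingSink extends CassandraSinkBase<String, ResultSet> {
    private transient PreparedStatement ps;

    public GreetingSink(ClusterBuilder builder) {
        super(builder);
    }

    @Override
    public void open(Configuration configuration) {
        super.open(configuration); // sets up cluster, session, and async callbacks
        ps = session.prepare("INSERT INTO demo.greetings (msg) VALUES (?)");
    }

    @Override
    public ListenableFuture<ResultSet> send(String value) {
        // executeAsync returns a ResultSetFuture, which is a ListenableFuture<ResultSet>
        return session.executeAsync(ps.bind(value));
    }
}

Direct sink implementation for tuple data with parameterized CQL queries.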
/**
* Sink implementation for tuple-based data using prepared CQL statements
*/
public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
/**
* Creates a tuple sink with CQL query and cluster configuration
* @param insertQuery CQL INSERT statement with parameter placeholders
* @param builder ClusterBuilder for connection configuration
*/
public CassandraTupleSink(String insertQuery, ClusterBuilder builder);
/**
* Opens connection and prepares CQL statement
* @param configuration Flink configuration parameters
*/
@Override
public void open(Configuration configuration);
/**
* Sends tuple value to Cassandra using prepared statement
* @param value tuple value to send
* @return ListenableFuture for asynchronous execution
*/
@Override
public ListenableFuture<ResultSet> send(IN value);
/**
* Extracts field values from tuple into object array for prepared statement binding
* @param record tuple record to extract fields from
* @return Object array with field values
*/
private Object[] extract(IN record);
}
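Because CassandraTupleSink is a plain RichSinkFunction, it can also be attached to a stream directly, without the builder (table name illustrative, clusterBuilder as defined earlier):

DataStream<Tuple2<String, Long>> wordCounts = // your stream
wordCounts.addSink(new CassandraTupleSink<>(
        "INSERT INTO demo.word_counts (word, cnt) VALUES (?, ?)",
        clusterBuilder))
    .name("Direct Tuple Sink");

Direct sink implementation for POJO data using DataStax mapping framework.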
/**
* Sink implementation for POJO-based data using DataStax mapping annotations
*/
public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
/**
* Creates a POJO sink with mapping configuration
* @param clazz POJO class with DataStax mapping annotations
* @param builder ClusterBuilder for connection configuration
*/
public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder);
/**
* Opens connection and initializes DataStax mapper
* @param configuration Flink configuration parameters
*/
@Override
public void open(Configuration configuration);
/**
* Sends POJO value to Cassandra using DataStax mapper
* @param value POJO value to send
* @return ListenableFuture for asynchronous execution
*/
@Override
public ListenableFuture<ResultSet> send(IN value);
}
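The POJO sink can likewise be used directly; keyspace and table are taken from the @Table annotation rather than a query:

DataStream<UserEvent> events = // your stream
events.addSink(new CassandraPojoSink<>(UserEvent.class, clusterBuilder));

Write-ahead logging sink for exactly-once processing guarantees.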
/**
* Write-ahead logging sink for tuple data providing exactly-once semantics
* Stores incoming records in state backend and commits only on checkpoint completion
*/
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
/**
* Creates WAL-enabled sink with checkpoint coordination
* @param insertQuery CQL INSERT statement with parameter placeholders
* @param serializer type serializer for state backend storage
* @param builder ClusterBuilder for connection configuration
* @param committer CheckpointCommitter for checkpoint state management
* @throws Exception if initialization fails
*/
protected CassandraTupleWriteAheadSink(
String insertQuery,
TypeSerializer<IN> serializer,
ClusterBuilder builder,
CheckpointCommitter committer
) throws Exception;
/**
* Opens connections and validates checkpointing is enabled
* @throws Exception if checkpointing is disabled or connection fails
*/
@Override
public void open() throws Exception;
/**
* Closes connections and cleans up resources
* @throws Exception if cleanup fails
*/
@Override
public void close() throws Exception;
/**
* Sends batch of values to Cassandra with checkpoint coordination
* @param values batch of tuples to write
* @param checkpointId checkpoint ID for coordination
* @param timestamp checkpoint timestamp
* @return true if all writes successful, false if any failed
* @throws Exception if batch processing fails
*/
@Override
protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception;
}
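Because open() validates that checkpointing is enabled, the execution environment must be configured accordingly; a minimal sketch (interval and names illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // required, or the write-ahead sink fails to open

DataStream<Tuple2<String, Long>> counts = // your stream
CassandraSink.addSink(counts)
    .setQuery("INSERT INTO demo.word_counts (word, cnt) VALUES (?, ?)")
    .setHost("localhost")
    .enableWriteAheadLog()
    .build();

All sink implementations provide asynchronous error handling through Guava ListenableFuture callbacks. Errors reported by these callbacks during Cassandra operations are logged and rethrown on a subsequent invoke() or close(), failing the current record processing.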
For write-ahead logging sinks, a failed batch is not committed; its records are retained in the state backend and retried according to Flink's fault-tolerance configuration.
Common error scenarios:
- build() called on a tuple sink with a null or empty CQL query
- build() called on a POJO sink with a query set (queries are not allowed for POJO sinks)
- no cluster configuration provided (neither setHost() nor setClusterBuilder() called)
- write-ahead logging enabled while checkpointing is disabled on the execution environment
- an asynchronous write failure, rethrown on a subsequent invoke() or close()
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra-2-10