
tessl/maven-org-apache-flink--flink-connector-cassandra-2-10

Apache Flink connector for Apache Cassandra that provides both streaming and batch data integration capabilities


Streaming Data Sinks

Comprehensive sink implementations for streaming data integration with Apache Cassandra. The connector provides multiple sink types optimized for different data formats (tuples vs POJOs) and processing guarantees (at-least-once vs exactly-once).

Capabilities

CassandraSink Factory

Main entry point for creating Cassandra sinks with automatic type detection and builder selection.

/**
 * Creates a sink builder appropriate for the input stream type
 * @param input DataStream to sink to Cassandra
 * @return CassandraSinkBuilder for further configuration
 */
public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);

Usage Examples:

// Tuple-based sink (uses CassandraTupleSinkBuilder)
DataStream<Tuple3<String, Integer, String>> tupleStream = ...; // your stream
CassandraSink.addSink(tupleStream)
    .setQuery("INSERT INTO users (name, age, email) VALUES (?, ?, ?)")
    .setHost("localhost")
    .build();

// POJO-based sink (uses CassandraPojoSinkBuilder) 
DataStream<User> pojoStream = ...; // your stream of @Table-annotated POJOs
CassandraSink.addSink(pojoStream)
    .setHost("localhost")
    .build();

CassandraSinkBuilder Base Configuration

Abstract base class providing common configuration options for all sink types.

/**
 * Base builder class for Cassandra sink configuration
 */
public abstract static class CassandraSinkBuilder<IN> {
    /**
     * Sets the CQL query for tuple-based sinks (not applicable for POJO sinks)
     * @param query CQL INSERT statement with parameter placeholders
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setQuery(String query);
    
    /**
     * Sets Cassandra host with default port 9042
     * @param host hostname or IP address
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setHost(String host);
    
    /**
     * Sets Cassandra host and port
     * @param host hostname or IP address
     * @param port port number
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setHost(String host, int port);
    
    /**
     * Sets custom cluster configuration builder
     * @param builder ClusterBuilder for advanced connection configuration
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
    
    /**
     * Enables write-ahead logging for exactly-once processing guarantees
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> enableWriteAheadLog();
    
    /**
     * Enables write-ahead logging with custom checkpoint committer
     * @param committer custom CheckpointCommitter implementation
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);
    
    /**
     * Finalizes the sink configuration and creates the sink
     * @return configured CassandraSink
     * @throws Exception if configuration is invalid
     */
    public abstract CassandraSink<IN> build() throws Exception;
}

CassandraTupleSinkBuilder

Specialized builder for tuple-based data streams requiring explicit CQL queries.

/**
 * Builder for tuple-based Cassandra sinks
 * Requires explicit CQL query with parameter placeholders matching tuple arity
 */
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
    public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer);
    
    /**
     * Builds tuple sink with optional write-ahead logging
     * @return CassandraSink configured for tuple data
     * @throws Exception if query is null/empty or cluster configuration missing
     */
    @Override
    public CassandraSink<IN> build() throws Exception;
}

Usage Examples:

DataStream<Tuple4<String, Integer, Double, Boolean>> orders = ...; // your stream

CassandraSink<Tuple4<String, Integer, Double, Boolean>> sink = CassandraSink
    .addSink(orders)
    .setQuery("INSERT INTO orders (id, quantity, price, processed) VALUES (?, ?, ?, ?)")
    .setHost("cassandra-cluster", 9042)
    .enableWriteAheadLog() // for exactly-once guarantees
    .build();

sink.name("Orders Cassandra Sink")
    .setParallelism(4);

CassandraPojoSinkBuilder

Specialized builder for POJO-based data streams using DataStax mapping annotations.

/**
 * Builder for POJO-based Cassandra sinks
 * Uses DataStax mapping annotations on POJO classes for table mapping
 * CQL queries are not allowed - mapping is handled automatically
 */
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
    public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer);
    
    /**
     * Builds POJO sink (write-ahead logging not supported for POJOs)
     * @return CassandraSink configured for POJO data
     * @throws Exception if query is specified (not allowed) or cluster configuration missing
     */
    @Override
    public CassandraSink<IN> build() throws Exception;
}

Usage Examples:

// POJO class with DataStax mapping annotations
@Table(keyspace = "analytics", name = "user_events")
public class UserEvent {
    @PartitionKey
    @Column(name = "user_id")
    private String userId;
    
    @Column(name = "event_type")
    private String eventType;
    
    @Column(name = "timestamp")
    private Long timestamp;
    
    // constructors, getters, setters...
}

DataStream<UserEvent> events = ...; // your stream

CassandraSink<UserEvent> sink = CassandraSink
    .addSink(events)
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder
                .addContactPoint("cassandra1.example.com")
                .addContactPoint("cassandra2.example.com")
                .withPort(9042)
                .withCredentials("username", "password")
                .build();
        }
    })
    .build();

CassandraSink Configuration

The main sink wrapper providing Flink operator configuration methods.

/**
 * Main Cassandra sink wrapper class providing Flink operator configuration
 */
public class CassandraSink<IN> {
    /**
     * Sets the name of this sink for visualization and logging
     * @param name operator name
     * @return this sink for method chaining
     */
    public CassandraSink<IN> name(String name);
    
    /**
     * Sets unique operator ID for savepoint compatibility
     * @param uid unique operator identifier
     * @return this sink for method chaining
     */
    public CassandraSink<IN> uid(String uid);
    
    /**
     * Sets user-provided hash for JobVertexID
     * @param uidHash user-provided hash string
     * @return this sink for method chaining
     */
    public CassandraSink<IN> setUidHash(String uidHash);
    
    /**
     * Sets the parallelism for this sink
     * @param parallelism degree of parallelism (must be > 0)
     * @return this sink for method chaining
     */
    public CassandraSink<IN> setParallelism(int parallelism);
    
    /**
     * Disables operator chaining for this sink
     * @return this sink for method chaining
     */
    public CassandraSink<IN> disableChaining();
    
    /**
     * Sets the slot sharing group for co-location control
     * @param slotSharingGroup slot sharing group name
     * @return this sink for method chaining
     */
    public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
}
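The operator-level settings above are applied to the sink returned by build(). A hedged sketch of typical chaining (the stream name, table, and values are illustrative, not from the connector's documentation):

```java
CassandraSink<Tuple2<String, Long>> sink = CassandraSink
    .addSink(counts) // counts: DataStream<Tuple2<String, Long>>
    .setQuery("INSERT INTO metrics (name, value) VALUES (?, ?)")
    .setHost("localhost")
    .build();

sink.name("Metrics Cassandra Sink")  // shown in the web UI and logs
    .uid("metrics-cassandra-sink")   // stable ID for savepoint compatibility
    .setParallelism(2)               // independent of upstream parallelism
    .disableChaining()               // run the sink in its own task
    .slotSharingGroup("sinks");      // co-locate only with operators in this group
```

Setting an explicit uid is worthwhile whenever the job will be restored from savepoints, since auto-generated operator IDs change when the job graph changes.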

CassandraSinkBase Abstract Class

Base abstract class providing common functionality for all Cassandra sink implementations.

/**
 * Common abstract class for CassandraPojoSink and CassandraTupleSink
 * Provides connection management, error handling, and asynchronous callback processing
 */
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    /**
     * Creates sink base with cluster configuration
     * @param builder ClusterBuilder for connection configuration
     */
    protected CassandraSinkBase(ClusterBuilder builder);
    
    /**
     * Opens connection and initializes callback handling
     * @param configuration Flink configuration parameters
     */
    @Override
    public void open(Configuration configuration);
    
    /**
     * Invokes sink processing for input value with error handling
     * @param value input value to process
     * @throws Exception if processing fails or previous error occurred
     */
    @Override
    public void invoke(IN value) throws Exception;
    
    /**
     * Abstract method for sending value to Cassandra (implemented by subclasses)
     * @param value input value to send
     * @return ListenableFuture for asynchronous processing
     */
    public abstract ListenableFuture<V> send(IN value);
    
    /**
     * Closes connections and waits for pending operations to complete
     * @throws Exception if cleanup fails or pending operations had errors
     */
    @Override
    public void close() throws Exception;
}
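The open/invoke/send/close lifecycle hinges on asynchronous callbacks: invoke() submits a write and returns immediately, and a completion callback records any failure so it can be rethrown on a later invoke() or close(). A minimal, self-contained sketch of that pattern, using the JDK's CompletableFuture as a stand-in for Guava's ListenableFuture (this is an illustration of the pattern, not the connector's actual code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncSinkSketch {
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    /** Accepts one asynchronous write; rethrows any earlier async failure first. */
    public void invoke(CompletableFuture<Void> writeFuture) throws Exception {
        Throwable earlier = firstError.get();
        if (earlier != null) {
            throw new Exception("error while sending value", earlier);
        }
        pending.incrementAndGet();
        writeFuture.whenComplete((ok, failure) -> {
            if (failure != null) {
                firstError.compareAndSet(null, failure); // remember first failure only
            }
            pending.decrementAndGet();
        });
    }

    /** Scenario: a write fails asynchronously; the next invoke() surfaces it. */
    public static boolean failsAfterAsyncError() {
        AsyncSinkSketch sink = new AsyncSinkSketch();
        try {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new RuntimeException("write timed out"));
            sink.invoke(failed); // callback records the failure
            sink.invoke(CompletableFuture.completedFuture(null)); // should throw
            return false;
        } catch (Exception expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second invoke failed: " + failsAfterAsyncError());
    }
}
```

The pending counter is what lets close() wait for in-flight writes before shutting down the session.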

Sink Implementation Classes

CassandraTupleSink

Direct sink implementation for tuple data with parameterized CQL queries.

/**
 * Sink implementation for tuple-based data using prepared CQL statements
 */
public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
    /**
     * Creates a tuple sink with CQL query and cluster configuration
     * @param insertQuery CQL INSERT statement with parameter placeholders
     * @param builder ClusterBuilder for connection configuration
     */
    public CassandraTupleSink(String insertQuery, ClusterBuilder builder);
    
    /**
     * Opens connection and prepares CQL statement
     * @param configuration Flink configuration parameters
     */
    @Override
    public void open(Configuration configuration);
    
    /**
     * Sends tuple value to Cassandra using prepared statement
     * @param value tuple value to send
     * @return ListenableFuture for asynchronous execution
     */
    @Override
    public ListenableFuture<ResultSet> send(IN value);
    
    /**
     * Extracts field values from tuple into object array for prepared statement binding
     * @param record tuple record to extract fields from
     * @return Object array with field values
     */
    private Object[] extract(IN record);
}
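The extract step is simple but central: each tuple field is copied positionally into an Object[] that is bound to the prepared statement's ?-placeholders, which is why the query arity must match the tuple arity. A runnable stand-in of that logic (Flink's Tuple is replaced by a tiny fake class; this mirrors the idea, not the connector's actual source):

```java
import java.util.Arrays;

public class TupleExtractSketch {
    /** Stand-in for a Flink Tuple: exposes getArity() and getField(i). */
    public static final class FakeTuple {
        private final Object[] fields;
        public FakeTuple(Object... fields) { this.fields = fields; }
        public int getArity() { return fields.length; }
        public Object getField(int i) { return fields[i]; }
    }

    /** One slot per tuple field, in positional order. */
    public static Object[] extract(FakeTuple record) {
        Object[] fields = new Object[record.getArity()];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = record.getField(i);
        }
        return fields;
    }

    public static void main(String[] args) {
        FakeTuple row = new FakeTuple("alice", 30, "alice@example.com");
        // These values would be bound to the INSERT's placeholders in order.
        System.out.println(Arrays.toString(extract(row))); // prints [alice, 30, alice@example.com]
    }
}
```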

CassandraPojoSink

Direct sink implementation for POJO data using DataStax mapping framework.

/**
 * Sink implementation for POJO-based data using DataStax mapping annotations
 */
public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
    /**
     * Creates a POJO sink with mapping configuration
     * @param clazz POJO class with DataStax mapping annotations
     * @param builder ClusterBuilder for connection configuration
     */
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder);
    
    /**
     * Opens connection and initializes DataStax mapper
     * @param configuration Flink configuration parameters
     */
    @Override
    public void open(Configuration configuration);
    
    /**
     * Sends POJO value to Cassandra using DataStax mapper
     * @param value POJO value to send
     * @return ListenableFuture for asynchronous execution
     */
    @Override
    public ListenableFuture<ResultSet> send(IN value);
}
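Both implementation classes can also be attached directly with DataStream.addSink(), bypassing the CassandraSink factory. A hedged sketch (contact point, stream names, and the query are illustrative):

```java
ClusterBuilder clusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("localhost").build();
    }
};

// POJO variant: the DataStax mapper derives the INSERT from @Table/@Column
events.addSink(new CassandraPojoSink<>(UserEvent.class, clusterBuilder));

// Tuple variant: the query supplies the target table and placeholders
tuples.addSink(new CassandraTupleSink<>(
    "INSERT INTO users (name, age, email) VALUES (?, ?, ?)", clusterBuilder));
```

Note that the direct route skips the builder's validation (for example, the check that POJO sinks carry no query), so the factory is usually preferable.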

CassandraTupleWriteAheadSink

Write-ahead logging sink for exactly-once processing guarantees.

/**
 * Write-ahead logging sink for tuple data providing exactly-once semantics
 * Stores incoming records in state backend and commits only on checkpoint completion
 */
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
    /**
     * Creates WAL-enabled sink with checkpoint coordination
     * @param insertQuery CQL INSERT statement with parameter placeholders
     * @param serializer type serializer for state backend storage
     * @param builder ClusterBuilder for connection configuration
     * @param committer CheckpointCommitter for checkpoint state management
     * @throws Exception if initialization fails
     */
    protected CassandraTupleWriteAheadSink(
        String insertQuery, 
        TypeSerializer<IN> serializer, 
        ClusterBuilder builder, 
        CheckpointCommitter committer
    ) throws Exception;
    
    /**
     * Opens connections and validates checkpointing is enabled
     * @throws Exception if checkpointing is disabled or connection fails
     */
    public void open() throws Exception;
    
    /**
     * Closes connections and cleans up resources
     * @throws Exception if cleanup fails
     */
    @Override
    public void close() throws Exception;
    
    /**
     * Sends batch of values to Cassandra with checkpoint coordination
     * @param values batch of tuples to write
     * @param checkpointId checkpoint ID for coordination
     * @param timestamp checkpoint timestamp
     * @return true if all writes successful, false if any failed
     * @throws Exception if batch processing fails
     */
    @Override
    protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception;
}
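This class is normally not instantiated directly (its constructor is protected); calling enableWriteAheadLog() on the builder selects it. A hedged configuration sketch, assuming checkpointing is enabled on the environment and using the connector's CassandraCommitter to track committed checkpoints in Cassandra itself (the query and interval are illustrative):

```java
env.enableCheckpointing(5_000); // WAL sinks require checkpointing to be enabled

CassandraSink.addSink(orders)
    .setQuery("INSERT INTO orders (id, quantity, price, processed) VALUES (?, ?, ?, ?)")
    .setClusterBuilder(clusterBuilder)
    // Passing an explicit committer; the no-arg overload uses a default one.
    .enableWriteAheadLog(new CassandraCommitter(clusterBuilder))
    .build();
```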

Error Handling

All sink implementations handle errors asynchronously through Guava ListenableFuture callbacks. A failure during a Cassandra write is logged and recorded by the callback, then rethrown on a subsequent record invocation or on close, failing the sink.

For write-ahead logging sinks, failed operations will trigger checkpoint rollback and retry mechanisms according to Flink's fault tolerance configuration.

Common error scenarios:

  • Connection failures: Cluster unreachable or authentication issues
  • Schema mismatches: query placeholders that don't match the tuple arity, or incorrect POJO table mappings
  • Constraint violations: Primary key conflicts or data type conversion errors
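Connection-level failures are best mitigated in the ClusterBuilder. A hedged sketch using the DataStax driver's reconnection and retry policies (the contact point and delay values are illustrative, not recommendations from the connector):

```java
ClusterBuilder resilientBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra1.example.com")
            // Back off exponentially between reconnection attempts: 1s up to 60s
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(1_000, 60_000))
            .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
            .build();
    }
};
```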

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra-2-10

docs

  • batch-processing.md
  • connection-configuration.md
  • fault-tolerance.md
  • index.md
  • streaming-sinks.md
  • tile.json