
tessl/maven-org-apache-flink--flink-connector-cassandra

Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases


docs/streaming-sinks.md

Streaming Data Sinks

Comprehensive streaming sink functionality for writing data from Flink DataStreams to Cassandra. The connector supports multiple data types with automatic type detection, builder-based configuration, and robust failure handling mechanisms.

Capabilities

Main Sink Entry Point

The primary entry point for creating Cassandra sinks with automatic type detection and builder creation.

public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);
public static <IN> CassandraSinkBuilder<IN> addSink(org.apache.flink.streaming.api.scala.DataStream<IN> input);

The addSink method automatically detects the input data type and returns the appropriate builder:

  • TupleTypeInfo → CassandraTupleSinkBuilder
  • RowTypeInfo → CassandraRowSinkBuilder
  • PojoTypeInfo → CassandraPojoSinkBuilder
  • CaseClassTypeInfo → CassandraScalaProductSinkBuilder
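
For instance, sinking a Tuple2 stream goes through the tuple builder, which requires a CQL insert query. A minimal sketch (the example.words keyspace/table and the local contact point are illustrative):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

public class TypeDetectionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A Tuple2 stream: addSink detects TupleTypeInfo and hands back a CassandraTupleSinkBuilder
        DataStream<Tuple2<String, Long>> wordCounts =
                env.fromElements(Tuple2.of("flink", 1L), Tuple2.of("cassandra", 2L));

        CassandraSink.addSink(wordCounts)
                .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
                .setHost("127.0.0.1", 9042)
                .build();

        env.execute("Tuple sink example");
    }
}
```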

Sink Builder Configuration

Base builder providing common configuration options for all sink types.

public abstract static class CassandraSinkBuilder<IN> {
    public CassandraSinkBuilder<IN> setQuery(String query);
    public CassandraSinkBuilder<IN> setDefaultKeyspace(String keyspace);
    public CassandraSinkBuilder<IN> setHost(String host);
    public CassandraSinkBuilder<IN> setHost(String host, int port);
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
    public CassandraSinkBuilder<IN> enableWriteAheadLog();
    public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);
    public CassandraSinkBuilder<IN> setMapperOptions(MapperOptions options);
    public CassandraSinkBuilder<IN> setFailureHandler(CassandraFailureHandler failureHandler);
    public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests);
    public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout);
    public CassandraSinkBuilder<IN> enableIgnoreNullFields();
    public CassandraSink<IN> build();
}

Usage Examples:

// Basic configuration with host
CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
    .setHost("127.0.0.1", 9042)
    .build();

// Advanced configuration with custom cluster builder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("127.0.0.1")
            .addContactPoint("127.0.0.2")
            .withPort(9042)
            .withCredentials("username", "password")
            .build();
    }
};

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
    .setClusterBuilder(builder)
    .setMaxConcurrentRequests(100)
    .enableIgnoreNullFields()
    .build();

Tuple Sink Builder

Specialized builder for Flink Tuple types. A CQL insert query must be provided via setQuery().

public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
    // Inherits all methods from CassandraSinkBuilder
    // Requires setQuery() to be called
    // Does not support setDefaultKeyspace()
}

Usage Example:

DataStream<Tuple3<String, Integer, Boolean>> tupleStream = // ... your stream

CassandraSink.addSink(tupleStream)
    .setQuery("INSERT INTO example.metrics (name, value, active) VALUES (?, ?, ?);")
    .setHost("127.0.0.1")
    .build();

Row Sink Builder

Specialized builder for Flink Row types with schema-based field mapping.

public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
    // Inherits all methods from CassandraSinkBuilder
    // Requires setQuery() to be called
    // Does not support setDefaultKeyspace()
}

Usage Example:

DataStream<Row> rowStream = // ... your row stream

CassandraSink.addSink(rowStream)
    .setQuery("INSERT INTO example.users (id, name, age) VALUES (?, ?, ?);")
    .setHost("127.0.0.1")
    .build();

POJO Sink Builder

Specialized builder for Plain Old Java Objects using DataStax object mapping.

public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
    // Inherits all methods from CassandraSinkBuilder
    // Does not support setQuery() - uses DataStax annotations
    // Supports setDefaultKeyspace() and setMapperOptions()
}

Usage Example:

// Define POJO with Cassandra annotations
@Table(keyspace = "example", name = "products")
public class Product {
    @PartitionKey
    private String id;
    
    @Column(name = "name")
    private String name;
    
    @Column(name = "price")
    private BigDecimal price;
    
    // constructors, getters, setters...
}

// Configure sink with mapper options
MapperOptions options = new MapperOptions() {
    @Override
    public Mapper.Option[] getMapperOptions() {
        return new Mapper.Option[] {
            Mapper.Option.saveNullFields(false),
            Mapper.Option.timestamp(System.currentTimeMillis())
        };
    }
};

DataStream<Product> productStream = // ... your product stream

CassandraSink.addSink(productStream)
    .setDefaultKeyspace("example")
    .setMapperOptions(options)
    .setHost("127.0.0.1")
    .build();

Scala Product Sink Builder

Specialized builder for Scala case classes and tuples.

public static class CassandraScalaProductSinkBuilder<IN extends Product> extends CassandraSinkBuilder<IN> {
    // Inherits all methods from CassandraSinkBuilder
    // Requires setQuery() to be called
    // Does not support setDefaultKeyspace()
}

Sink Operations and Control

The resulting CassandraSink provides Flink operator configuration methods.

public class CassandraSink<IN> {
    public CassandraSink<IN> name(String name);
    public CassandraSink<IN> uid(String uid);
    public CassandraSink<IN> setUidHash(String uidHash);
    public CassandraSink<IN> setParallelism(int parallelism);
    public CassandraSink<IN> disableChaining();
    public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
}

Usage Example:

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.events (id, timestamp, data) VALUES (?, ?, ?);")
    .setHost("127.0.0.1")
    .build()
    .name("Cassandra Event Sink")
    .uid("cassandra-sink-1")
    .setParallelism(4)
    .slotSharingGroup("cassandra-sinks");

Core Sink Implementations

Base Sink Functionality

All streaming sinks extend from the common base class providing core functionality.

public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    public void open(Configuration configuration);
    public void close();
    public void initializeState(FunctionInitializationContext context);
    public void snapshotState(FunctionSnapshotContext ctx);
    public void invoke(IN value);
    public abstract ListenableFuture<V> send(IN value);
}

Tuple-Based Sinks

Base class for sinks that work with tuple-like data structures.

public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
    public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder, CassandraSinkBaseConfig config, CassandraFailureHandler failureHandler);
    public void open(Configuration configuration);
    public ListenableFuture<ResultSet> send(IN value);
    protected abstract Object[] extract(IN record);
}
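
A hypothetical subclass only needs to implement extract(), returning the values to bind to the insert query in parameter order. A sketch, assuming an illustrative KeyValue record type and the connector's NoOpCassandraFailureHandler:

```java
import org.apache.flink.streaming.connectors.cassandra.AbstractCassandraTupleSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraSinkBaseConfig;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.connectors.cassandra.NoOpCassandraFailureHandler;

// KeyValue is a hypothetical record type used for illustration
public class KeyValueSink extends AbstractCassandraTupleSink<KeyValue> {

    public KeyValueSink(String insertQuery, ClusterBuilder builder) {
        super(insertQuery, builder,
              CassandraSinkBaseConfig.newBuilder().build(),
              new NoOpCassandraFailureHandler());
    }

    @Override
    protected Object[] extract(KeyValue record) {
        // Order must match the bind parameters of the insert query
        return new Object[] { record.key, record.value };
    }
}
```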

Specific Sink Types

Individual sink implementations for different data types.

// Flink Tuple sink
public class CassandraTupleSink<IN extends Tuple> extends AbstractCassandraTupleSink<IN> {
    public CassandraTupleSink(String insertQuery, ClusterBuilder builder);
    protected Object[] extract(IN record);
}

// Flink Row sink  
public class CassandraRowSink extends AbstractCassandraTupleSink<Row> {
    public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder);
    protected Object[] extract(Row record);
}

// POJO sink with DataStax mapping
public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder);
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, MapperOptions options);
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, String keyspace);
    public void open(Configuration configuration);
    public ListenableFuture<ResultSet> send(IN value);
}

// Scala Product sink
public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
    public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder);
    protected Object[] extract(IN record);
}
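
These implementations can also be attached directly with DataStream.addSink, bypassing the builder. A sketch, assuming a clusterBuilder configured as in the earlier example:

```java
// Attach a CassandraTupleSink directly instead of going through CassandraSink.addSink
DataStream<Tuple2<String, Long>> wordCounts = // ... your stream

wordCounts.addSink(new CassandraTupleSink<>(
        "INSERT INTO example.words (word, count) VALUES (?, ?);",
        clusterBuilder));
```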

Error Handling

All sinks support custom failure handlers for robust error handling:

// Custom failure handler example
CassandraFailureHandler customHandler = new CassandraFailureHandler() {
    @Override
    public void onFailure(Throwable failure) throws IOException {
        if (failure instanceof WriteTimeoutException) {
            // Log timeout but continue processing
            logger.warn("Write timeout, continuing", failure);
            return;
        }
        // Re-throw other exceptions to fail the sink
        throw new IOException("Cassandra write failed", failure);
    }
};

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.data (id, value) VALUES (?, ?);")
    .setHost("127.0.0.1")
    .setFailureHandler(customHandler)
    .build();

Performance Tuning

Configure concurrency and resource management:

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.data (id, value) VALUES (?, ?);")
    .setHost("127.0.0.1")
    // Cap in-flight requests; the Duration overload additionally bounds how long
    // acquiring a request permit may block. The two overloads are alternatives.
    .setMaxConcurrentRequests(100, Duration.ofSeconds(30))
    .enableIgnoreNullFields()  // skip null fields to avoid writing tombstones
    .build();

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra
