tessl/maven-org-apache-flink--flink-connector-cassandra

Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-connector-cassandra_2.11@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-cassandra@1.14.0


Apache Flink Cassandra Connector

A comprehensive Apache Flink connector for Apache Cassandra that enables streaming applications to write data efficiently into Cassandra databases. The connector provides multiple sink implementations for different data types, supports both streaming and batch processing modes, offers configurable failure handling mechanisms, and integrates with Flink's checkpointing system for fault tolerance.

Package Information

  • Package Name: flink-connector-cassandra_2.11
  • Package Type: maven
  • Language: Java
  • Installation: Add to your Maven project:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-cassandra_2.11</artifactId>
      <version>1.14.6</version>
    </dependency>

Core Imports

For streaming applications:

import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

For batch applications:

import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;

Configuration imports:

import org.apache.flink.streaming.connectors.cassandra.CassandraFailureHandler;
import org.apache.flink.streaming.connectors.cassandra.MapperOptions;

Basic Usage

Simple Streaming Sink

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import com.datastax.driver.core.Cluster;

// Create a ClusterBuilder that configures the Cassandra connection
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").build();
    }
};

// Configure and add the sink to a DataStream
DataStream<Tuple3<String, Integer, String>> stream = // ... your data stream

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count, description) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .build();

POJO Sink with Mapper

import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import com.datastax.driver.mapping.annotations.Table;
import com.datastax.driver.mapping.annotations.Column;

// Define a POJO with Cassandra annotations
@Table(keyspace = "example", name = "users")
public class User {
    @Column(name = "id")
    private String id;
    
    @Column(name = "name")
    private String name;
    
    @Column(name = "age")
    private Integer age;
    
    // constructors, getters, setters...
}

// Use the POJO sink
DataStream<User> userStream = // ... your user stream

CassandraSink.addSink(userStream)
    .setDefaultKeyspace("example")
    .setHost("127.0.0.1")
    .build();

Exactly-Once Processing with Write-Ahead Log

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
    .setHost("127.0.0.1")
    .enableWriteAheadLog()  // Enable exactly-once processing (requires checkpointing)
    .build();

Architecture

The Apache Flink Cassandra Connector is built around several key architectural components:

  • Sink Hierarchy: Base CassandraSinkBase with type-specific implementations for different data formats
  • Builder Pattern: Fluent CassandraSinkBuilder API for configuration with automatic type detection
  • DataStax Driver Integration: Built on DataStax Java Driver 3.0.0 with shaded dependencies
  • Fault Tolerance: Integration with Flink's checkpointing and write-ahead logging for exactly-once guarantees
  • Type System: Support for Flink Tuples, Rows, POJOs, and Scala Products with automatic serialization
  • Configuration Management: Centralized configuration through CassandraSinkBaseConfig and builder patterns

Capabilities

Streaming Data Sinks

Primary streaming sink functionality supporting multiple data types including Tuples, Rows, POJOs, and Scala Products. Provides builder-based configuration with automatic type detection and comprehensive failure handling.

public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);

public abstract static class CassandraSinkBuilder<IN> {
    public CassandraSinkBuilder<IN> setQuery(String query);
    public CassandraSinkBuilder<IN> setHost(String host);
    public CassandraSinkBuilder<IN> setHost(String host, int port);
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
    public CassandraSinkBuilder<IN> enableWriteAheadLog();
    public CassandraSinkBuilder<IN> setFailureHandler(CassandraFailureHandler failureHandler);
    public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests);
    public CassandraSinkBuilder<IN> enableIgnoreNullFields();
    public CassandraSink<IN> build();
}
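The builder methods above compose into a single fluent chain. A sketch of a more fully configured sink (host, port, keyspace, and table names are placeholders) that bounds in-flight requests and installs a custom failure handler:

```java
import java.io.IOException;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;

DataStream<Tuple2<String, Integer>> stream = // ... your data stream

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("127.0.0.1").withPort(9042).build();
        }
    })
    .setMaxConcurrentRequests(100)  // cap in-flight writes to avoid overloading Cassandra
    .setFailureHandler(failure -> { // rethrow to fail the job on write errors
        throw new IOException("Cassandra write failed", failure);
    })
    .build();
```

Note that `setClusterBuilder` and `setHost` are alternatives: `setHost` is shorthand for a builder with a single contact point, so only one of the two should be used.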

Streaming Sinks

Batch Data Processing

Batch input and output formats for reading from and writing to Cassandra in batch processing jobs. Supports Tuples, Rows, and POJOs with configurable parallelism and connection management.

public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT>;
public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT>;
public class CassandraTupleOutputFormat<OUT extends Tuple> extends CassandraOutputFormatBase<OUT>;
public class CassandraPojoOutputFormat<OUT> extends RichOutputFormat<OUT>;
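A sketch of a batch round trip using these formats (keyspace, table, and query strings are placeholders): read tuples with CassandraInputFormat, then write results back with CassandraTupleOutputFormat.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;

ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder b) {
        return b.addContactPoint("127.0.0.1").build();
    }
};

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read (word, count) tuples from Cassandra
DataSet<Tuple2<String, Integer>> input = env.createInput(
    new CassandraInputFormat<Tuple2<String, Integer>>(
        "SELECT word, count FROM example.words;", builder),
    new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));

// Write the (possibly transformed) tuples to another table
input.output(new CassandraTupleOutputFormat<>(
    "INSERT INTO example.word_counts (word, count) VALUES (?, ?);", builder));

env.execute("cassandra batch job");
```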

Batch Connectors

Configuration and Connection Management

Connection builders, failure handlers, and configuration objects for customizing Cassandra connectivity, error handling, and performance tuning.

public abstract class ClusterBuilder {
    protected abstract Cluster buildCluster(Cluster.Builder builder);
}

public interface CassandraFailureHandler {
    void onFailure(Throwable failure) throws IOException;
}

public interface MapperOptions {
    Mapper.Option[] getMapperOptions();
}
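As a sketch of customizing both interfaces (the host name and credentials are placeholders): a ClusterBuilder carrying connection details, and a failure handler that logs and drops failed writes instead of failing the job.

```java
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Cluster;
import org.apache.flink.streaming.connectors.cassandra.CassandraFailureHandler;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// A ClusterBuilder with an authenticated connection
ClusterBuilder secureBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra.example.internal")
            .withPort(9042)
            .withCredentials("flink", "secret")
            .build();
    }
};

// A failure handler that logs and swallows errors. This weakens delivery
// guarantees (failed records are dropped); the default behavior is to
// rethrow the failure and fail the job.
CassandraFailureHandler logAndContinue = failure ->
        LoggerFactory.getLogger("cassandra-sink").warn("Dropping failed write", failure);
```

Both types extend Serializable, so lambdas and anonymous classes defined this way can be shipped to the task managers with the job graph.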

Configuration

Write-Ahead Logging

Exactly-once processing guarantees through write-ahead logging with checkpoint integration. Stores records in Flink's state backend and commits them to Cassandra only on successful checkpoint completion.

public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN>;
public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row>;
public class CassandraCommitter extends CheckpointCommitter;
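Because the write-ahead sinks commit buffered records only when a checkpoint completes, checkpointing must be enabled on the environment for any data to reach Cassandra. A sketch of the end-to-end wiring (interval and address are placeholders):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Records are held in Flink state and committed on checkpoint completion,
// so without checkpointing the WAL sink never flushes to Cassandra.
env.enableCheckpointing(5000); // checkpoint every 5 seconds

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
    .setHost("127.0.0.1")
    .enableWriteAheadLog()
    .build();
```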

Write-Ahead Logging

Table API Integration

Integration with Flink's Table API for declarative stream processing. Provides append-only table sinks with schema inference and SQL compatibility.

public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
    public CassandraAppendTableSink(ClusterBuilder builder, String cql);
    public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream);
}
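Using only the methods listed above, a sketch of wiring the table sink directly to a Row stream (the `builder` is a ClusterBuilder as shown earlier; field names and the CQL statement are placeholders):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraAppendTableSink;
import org.apache.flink.types.Row;

DataStream<Row> rows = // ... a Row stream with fields (word, count)

CassandraAppendTableSink sink = new CassandraAppendTableSink(
        builder, "INSERT INTO example.words (word, count) VALUES (?, ?);");

// configure() binds the schema; consumeDataStream() attaches the sink
sink.configure(new String[] {"word", "count"},
               new TypeInformation<?>[] {Types.STRING, Types.INT})
    .consumeDataStream(rows);
```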

Table API

Common Data Types

// Connection configuration
public abstract class ClusterBuilder implements Serializable {
    protected abstract Cluster buildCluster(Cluster.Builder builder);
}

// Failure handling
public interface CassandraFailureHandler extends Serializable {
    void onFailure(Throwable failure) throws IOException;
}

// Configuration management
public final class CassandraSinkBaseConfig {
    public int getMaxConcurrentRequests();
    public Duration getMaxConcurrentRequestsTimeout();
    public boolean getIgnoreNullFields();
    
    public static Builder newBuilder();
}

// Mapper configuration for POJOs
public interface MapperOptions extends Serializable {
    Mapper.Option[] getMapperOptions();
}

// Checkpoint management for exactly-once processing
public class CassandraCommitter extends CheckpointCommitter {
    public CassandraCommitter(ClusterBuilder builder);
    public void commitCheckpoint(int subtaskIdx, long checkpointId);
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}