tessl/maven-org-apache-flink--flink-connector-cassandra-2-10

Apache Flink connector for Apache Cassandra that provides both streaming and batch data integration capabilities

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-connector-cassandra_2.10@1.3.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-cassandra-2-10@1.3.0


Apache Flink Cassandra Connector

The Apache Flink Cassandra Connector provides comprehensive integration between Apache Flink and Apache Cassandra. It supports both streaming (DataStream API) and batch (DataSet API) processing, with multiple sink implementations for different data types and processing guarantees, including exactly-once semantics through write-ahead logging.

Package Information

  • Package Name: flink-connector-cassandra_2.10
  • Package Type: maven
  • Language: Java
  • Installation: Add dependency to your Maven project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Core Imports

import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleWriteAheadSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;

Basic Usage

Streaming Sink Example

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import com.datastax.driver.core.Cluster;

// Define a ClusterBuilder for connection configuration
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").withPort(9042).build();
    }
};

// Create a tuple-based sink using the ClusterBuilder defined above
DataStream<Tuple3<String, Integer, String>> stream = ...; // your data stream
CassandraSink<Tuple3<String, Integer, String>> sink = CassandraSink
    .addSink(stream)
    .setQuery("INSERT INTO example.users (name, age, email) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .build();

sink.name("Cassandra Sink");

Batch Processing Example

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read from Cassandra ('builder' is the ClusterBuilder defined in the streaming example)
CassandraInputFormat<Tuple2<String, Integer>> inputFormat = 
    new CassandraInputFormat<>("SELECT name, age FROM example.users", builder);
DataSet<Tuple2<String, Integer>> input = env.createInput(inputFormat);

// Transform 'input' as needed, producing a DataSet<Tuple2<String, String>>
DataSet<Tuple2<String, String>> result = ...; // your transformations

// Write to Cassandra
CassandraOutputFormat<Tuple2<String, String>> outputFormat = 
    new CassandraOutputFormat<>("INSERT INTO example.processed (name, status) VALUES (?, ?)", builder);
result.output(outputFormat);

env.execute("Cassandra batch job");

Architecture

The Flink Cassandra Connector is organized into two main packages, supported by shared configuration and fault-tolerance components:

  • Streaming Connectors (org.apache.flink.streaming.connectors.cassandra): DataStream API integration with multiple sink implementations
  • Batch Connectors (org.apache.flink.batch.connectors.cassandra): DataSet API integration for batch processing
  • Configuration Management: ClusterBuilder abstract class for Cassandra cluster connection setup
  • Fault Tolerance: Write-ahead logging support for exactly-once processing guarantees

Capabilities

Streaming Data Sinks

Comprehensive sink implementations for streaming data integration with different data types and processing guarantees.

public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);

public abstract static class CassandraSinkBuilder<IN> {
    public CassandraSinkBuilder<IN> setQuery(String query);
    public CassandraSinkBuilder<IN> setHost(String host);
    public CassandraSinkBuilder<IN> setHost(String host, int port);
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
    public CassandraSinkBuilder<IN> enableWriteAheadLog();
    public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);
    public abstract CassandraSink<IN> build();
}
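The builder methods above can be combined as shown in this minimal sketch; the keyspace, table, and column names are hypothetical placeholders:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple2;
import com.datastax.driver.core.Cluster;

// Hypothetical stream of (sensorId, reading) pairs
DataStream<Tuple2<String, Double>> readings = ...; // your data stream

CassandraSink.addSink(readings)
    .setQuery("INSERT INTO sensors.readings (id, value) VALUES (?, ?);")
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("127.0.0.1").build();
        }
    })
    .build();
```

`setHost(...)` is a shortcut that builds an equivalent single-contact-point `ClusterBuilder` internally; `setClusterBuilder(...)` gives full control over the driver configuration.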

Streaming Sinks

Batch Data Processing

Input and output formats for batch processing jobs using the DataSet API.

public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> {
    public CassandraInputFormat(String query, ClusterBuilder builder);
}

public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
    public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);
}

Batch Processing

Connection Configuration

Abstract configuration system for customizing Cassandra cluster connections with support for authentication, SSL, and advanced connection parameters.

public abstract class ClusterBuilder implements Serializable {
    public Cluster getCluster();
    protected abstract Cluster buildCluster(Cluster.Builder builder);
}
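Because `buildCluster` receives the raw DataStax `Cluster.Builder`, any driver-level option can be applied there. A sketch with authentication and SSL enabled follows; the host, port, and credentials are placeholders:

```java
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;

// Hypothetical ClusterBuilder with username/password auth and SSL.
// withSSL() with no arguments uses the JVM's default SSLContext.
ClusterBuilder secureBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra.example.com")
            .withPort(9042)
            .withCredentials("flink_user", "secret")
            .withSSL()
            .build();
    }
};
```

Note that `ClusterBuilder` is `Serializable`, so anything captured by the anonymous class must be serializable as well.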

Connection Configuration

Fault Tolerance & Write-Ahead Logging

Exactly-once processing guarantees through checkpoint coordination and write-ahead logging for streaming applications.

public class CassandraCommitter extends CheckpointCommitter {
    public CassandraCommitter(ClusterBuilder builder);
    public CassandraCommitter(ClusterBuilder builder, String keySpace);
}
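Wiring the committer into a sink might look like the following sketch, assuming `stream` and `builder` from the streaming example above; checkpointing must be enabled on the environment for the write-ahead log to take effect:

```java
// Enable checkpointing (interval in ms); the WAL sink replays committed
// checkpoints to Cassandra for exactly-once semantics.
env.enableCheckpointing(5000);

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.users (name, age, email) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog(new CassandraCommitter(builder))
    .build();
```

Calling `enableWriteAheadLog()` without arguments uses a default `CassandraCommitter`, which stores checkpoint commit metadata in Cassandra itself.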

Fault Tolerance

Types

Core Types

// Main sink wrapper class
public class CassandraSink<IN> {
    // Sink configuration and lifecycle methods
    public CassandraSink<IN> name(String name);
    public CassandraSink<IN> uid(String uid);
    public CassandraSink<IN> setUidHash(String uidHash);
    public CassandraSink<IN> setParallelism(int parallelism);
    public CassandraSink<IN> disableChaining();
    public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
}

// Tuple-specific sink builder
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
    // Specialized for tuple-based data with CQL queries
}

// POJO-specific sink builder  
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
    // Specialized for POJO-based data with DataStax mapping annotations
}
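A POJO used with `CassandraPojoSink` carries DataStax object-mapper annotations instead of an explicit CQL query. A hypothetical example, matching the `example.users` table used earlier:

```java
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

// The DataStax mapper derives the INSERT statement from these
// annotations, so no setQuery(...) call is needed for POJO sinks.
@Table(keyspace = "example", name = "users")
public class User {
    @Column(name = "name")
    private String name;

    @Column(name = "age")
    private int age;

    public User() {} // the mapper requires a no-arg constructor

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}
```

Such a stream is then sunk with `CassandraSink.addSink(userStream).setClusterBuilder(builder).build();` — the builder selects the POJO sink automatically for non-tuple types.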