CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-connector-cassandra

Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases

Pending
Overview
Eval results
Files

batch-connectors.mddocs/

Batch Data Processing

Comprehensive batch input and output formats for reading from and writing to Cassandra in Flink batch processing jobs. Supports Tuples, Rows, and POJOs with configurable parallelism and connection management.

Capabilities

Input Formats

Base Input Format

Common base class for all Cassandra input formats providing connection management and split handling.

/**
 * Common base class for the Cassandra input formats (connection management and split handling).
 *
 * The class is a NonParallelInput: the query is read through a single input split,
 * so it is executed by one task regardless of the job parallelism.
 */
public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit>
        implements NonParallelInput {
    public CassandraInputFormatBase(String query, ClusterBuilder builder);
    public void configure(Configuration parameters);
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics);
    public InputSplit[] createInputSplits(int minNumSplits);
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);
    public void close();
}

Tuple Input Format

Reads data from Cassandra and generates Flink Tuples.

public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT> {
    public CassandraInputFormat(String query, ClusterBuilder builder);
    public void open(InputSplit ignored);
    public boolean reachedEnd();
    public OUT nextRecord(OUT reuse);
}

Usage Example:

import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import com.datastax.driver.core.Cluster;

// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder clusterBuilder) {
        // Parameter named clusterBuilder so it does not shadow the outer 'builder' variable
        return clusterBuilder.addContactPoint("127.0.0.1").build();
    }
};

// Create input format.
// The query string is executed exactly as written: the input format binds no values,
// so it must be a complete CQL statement without '?' placeholders.
CassandraInputFormat<Tuple3<String, Integer, String>> inputFormat = 
    new CassandraInputFormat<>(
        "SELECT word, count, description FROM example.words",
        builder
    );

// Use in batch job
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String, Integer, String>> dataSet = env.createInput(inputFormat);

POJO Input Format

Reads data from Cassandra and generates POJOs using DataStax object mapping.

public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> {
    public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass);
    public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass, MapperOptions mapperOptions);
    public void open(InputSplit split);
    public boolean reachedEnd();
    public OUT nextRecord(OUT reuse);
}

Usage Example:

// Define POJO with Cassandra annotations
@Table(keyspace = "example", name = "users")
public class User {
    @PartitionKey
    private String id;
    
    @Column(name = "name")
    private String name;
    
    @Column(name = "age")
    private Integer age;
    
    // constructors, getters, setters...
}

// Create POJO input format.
// The query is executed as written; no values are bound, so it must not contain
// '?' placeholders.
CassandraPojoInputFormat<User> pojoInputFormat = 
    new CassandraPojoInputFormat<>(
        "SELECT id, name, age FROM example.users",
        builder,
        User.class
    );

// Use with mapper options
MapperOptions options = new MapperOptions() {
    @Override
    public Mapper.Option[] getMapperOptions() {
        return new Mapper.Option[] {
            Mapper.Option.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
        };
    }
};

CassandraPojoInputFormat<User> pojoWithOptions = 
    new CassandraPojoInputFormat<>(
        "SELECT * FROM example.users",
        builder,
        User.class,
        options
    );

DataSet<User> users = env.createInput(pojoWithOptions);

Output Formats

Base Output Format

Common base class for all Cassandra output formats providing connection management and batch writing.

public abstract class CassandraOutputFormatBase<OUT> extends RichOutputFormat<OUT> {
    public CassandraOutputFormatBase(String insertQuery, ClusterBuilder builder);
    public void configure(Configuration parameters);
    public void open(int taskNumber, int numTasks);
    public void writeRecord(OUT record);
    public void close();
    protected abstract Object[] extractFields(OUT record);
    protected void onWriteSuccess(ResultSet ignored);
    protected void onWriteFailure(Throwable t);
}

Tuple Output Format

Writes Flink Tuples to Cassandra using prepared statements.

public class CassandraTupleOutputFormat<OUT extends Tuple> extends CassandraOutputFormatBase<OUT> {
    public CassandraTupleOutputFormat(String insertQuery, ClusterBuilder builder);
    protected Object[] extractFields(OUT record);
}

Usage Example:

import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;

// Create output format
CassandraTupleOutputFormat<Tuple3<String, Integer, String>> outputFormat = 
    new CassandraTupleOutputFormat<>(
        "INSERT INTO example.words (word, count, description) VALUES (?, ?, ?)",
        builder
    );

// Use in batch job
DataSet<Tuple3<String, Integer, String>> results = // ... your data processing
results.output(outputFormat);

Row Output Format

Writes Flink Rows to Cassandra with schema-based field extraction.

public class CassandraRowOutputFormat extends CassandraOutputFormatBase<Row> {
    public CassandraRowOutputFormat(String insertQuery, ClusterBuilder builder);
    protected Object[] extractFields(Row record);
}

Usage Example:

import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat;
import org.apache.flink.types.Row;

// Create row output format
CassandraRowOutputFormat rowOutputFormat = 
    new CassandraRowOutputFormat(
        "INSERT INTO example.metrics (id, timestamp, value) VALUES (?, ?, ?)",
        builder
    );

DataSet<Row> metrics = // ... your row data
metrics.output(rowOutputFormat);

POJO Output Format

Writes POJOs to Cassandra using DataStax object mapping.

public class CassandraPojoOutputFormat<OUT> extends RichOutputFormat<OUT> {
    public CassandraPojoOutputFormat(ClusterBuilder builder, Class<OUT> outputClass);
    public CassandraPojoOutputFormat(ClusterBuilder builder, Class<OUT> outputClass, MapperOptions mapperOptions);
    public void configure(Configuration parameters);
    public void open(int taskNumber, int numTasks);
    public void writeRecord(OUT record);
    public void close();
}

Usage Example:

import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;

// Create POJO output format
CassandraPojoOutputFormat<User> pojoOutputFormat = 
    new CassandraPojoOutputFormat<>(builder, User.class);

// With mapper options
MapperOptions writeOptions = new MapperOptions() {
    @Override
    public Mapper.Option[] getMapperOptions() {
        return new Mapper.Option[] {
            Mapper.Option.ttl(3600),  // TTL in seconds (1 hour)
            // Cassandra write timestamps are in MICROseconds, so convert from milliseconds.
            // NOTE(review): the options are presumably read once when the format is opened,
            // so every record written by a task would share this timestamp — confirm.
            Mapper.Option.timestamp(System.currentTimeMillis() * 1000L)
        };
    }
};

CassandraPojoOutputFormat<User> pojoWithOptions = 
    new CassandraPojoOutputFormat<>(builder, User.class, writeOptions);

DataSet<User> processedUsers = // ... your user processing
processedUsers.output(pojoWithOptions);

Advanced Usage Patterns

Parallel Processing with Input Splits

The Cassandra input formats are non-parallel inputs: the query runs once and is read through a single input split, while downstream operators use the configured parallelism:

// Configure parallelism for the job's operators
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);  // Downstream operators use 4 parallel tasks

// The query is executed once, exactly as written; no token-range values are bound,
// so it must not contain '?' placeholders.
CassandraInputFormat<Tuple2<String, Integer>> inputFormat = 
    new CassandraInputFormat<>(
        "SELECT id, value FROM example.data",
        builder
    );

// Rebalance so the records read by the single source task are spread over all tasks
DataSet<Tuple2<String, Integer>> parallelData = env.createInput(inputFormat).rebalance();

Custom Connection Configuration

Use advanced cluster configuration for production deployments:

ClusterBuilder productionBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoints("cassandra-1", "cassandra-2", "cassandra-3")
            .withPort(9042)
            .withCredentials("username", "password")
            .withSocketOptions(new SocketOptions()
                .setConnectTimeoutMillis(10000)
                .setReadTimeoutMillis(10000))
            .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
            .withReconnectionPolicy(new ConstantReconnectionPolicy(1000))
            .build();
    }
};

Error Handling in Batch Jobs

Override callback methods for custom error handling:

CassandraTupleOutputFormat<Tuple2<String, Integer>> customOutputFormat = 
    new CassandraTupleOutputFormat<Tuple2<String, Integer>>(
        "INSERT INTO example.data (id, value) VALUES (?, ?)",
        builder
    ) {
        @Override
        protected void onWriteFailure(Throwable t) {
            // Log error and continue, or re-throw to fail the job
            logger.error("Failed to write record", t);
            // super.onWriteFailure(t); // Uncomment to fail on error
        }
        
        @Override
        protected void onWriteSuccess(ResultSet result) {
            // Custom success handling
            logger.debug("Successfully wrote record");
        }
    };

Memory Management

For large datasets, consider memory-efficient processing:

// Enable object reuse to reduce allocation and GC pressure
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();

// Read User POJOs (POJO input format) and write them with the POJO output format;
// the element type of the formats must match the DataSet type.
DataSet<User> largeDataset = env.createInput(pojoInputFormat);
largeDataset
    .rebalance()  // Distribute records evenly across the parallel writer tasks
    .output(pojoOutputFormat);

Deprecated Components

CassandraOutputFormat (Deprecated)

@Deprecated
public class CassandraOutputFormat<OUT extends Tuple> extends CassandraTupleOutputFormat<OUT> {
    public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);
}

Note: Use CassandraTupleOutputFormat instead of the deprecated CassandraOutputFormat.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra

docs

batch-connectors.md

configuration.md

index.md

streaming-sinks.md

table-api.md

write-ahead-logging.md

tile.json