
tessl/maven-org-apache-flink--flink-connector-cassandra-2-10

Apache Flink connector for Apache Cassandra that provides both streaming and batch data integration capabilities


Batch Data Processing

Input and output formats for batch jobs using Apache Flink's DataSet API. These implementations provide efficient reading from and writing to Cassandra in batch processing scenarios.

Capabilities

CassandraInputFormat

Input format for reading tuple data from Cassandra in batch processing jobs.

/**
 * InputFormat for reading tuple data from Cassandra using CQL queries
 * Implements NonParallelInput, so it runs as a single parallel instance
 */
public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
    /**
     * Creates input format with CQL SELECT query and cluster configuration
     * @param query CQL SELECT statement for data retrieval
     * @param builder ClusterBuilder for connection configuration
     * @throws IllegalArgumentException if query is null/empty or builder is null
     */
    public CassandraInputFormat(String query, ClusterBuilder builder);
    
    /**
     * Configures cluster connection (called by Flink framework)
     * @param parameters configuration parameters
     */
    @Override
    public void configure(Configuration parameters);
    
    /**
     * Returns cached statistics for query optimization
     * @param cachedStatistics previously cached statistics
     * @return cached statistics (no new statistics computed)
     */
    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;
    
    /**
     * Opens session and executes query to initialize result set
     * @param ignored input split (ignored for non-parallel input)
     * @throws IOException if connection or query execution fails
     */
    @Override
    public void open(InputSplit ignored) throws IOException;
    
    /**
     * Checks if all results have been consumed
     * @return true if result set is exhausted
     */
    @Override
    public boolean reachedEnd() throws IOException;
    
    /**
     * Returns next record from result set, populating reusable tuple
     * @param reuse tuple instance to populate with data
     * @return populated tuple with next record data
     * @throws IOException if record retrieval fails
     */
    @Override
    public OUT nextRecord(OUT reuse) throws IOException;
    
    /**
     * Creates single input split (non-parallel processing)
     * @param minNumSplits minimum splits requested (ignored)
     * @return array with single GenericInputSplit
     */
    @Override
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException;
    
    /**
     * Returns default input split assigner
     * @param inputSplits array of input splits
     * @return DefaultInputSplitAssigner for sequential processing
     */
    @Override
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);
    
    /**
     * Closes session and cluster connections
     * @throws IOException if connection cleanup fails
     */
    @Override
    public void close() throws IOException;
}

Usage Examples:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;

// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra.example.com")
            .withPort(9042)
            .build();
    }
};

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read user data from Cassandra
CassandraInputFormat<Tuple3<String, Integer, String>> inputFormat = 
    new CassandraInputFormat<>(
        "SELECT name, age, email FROM users WHERE age > 18", 
        builder
    );

DataSet<Tuple3<String, Integer, String>> users = env.createInput(inputFormat);

// Process the data
DataSet<Tuple3<String, Integer, String>> processedUsers = users
    .filter(user -> user.f1 < 65) // age < 65
    .map(user -> new Tuple3<>(user.f0.toUpperCase(), user.f1, user.f2));

processedUsers.print();

CassandraOutputFormat

Output format for writing tuple data to Cassandra in batch processing jobs.

/**
 * OutputFormat for writing tuple data to Cassandra using CQL INSERT statements
 */
public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
    /**
     * Creates output format with CQL INSERT query and cluster configuration
     * @param insertQuery CQL INSERT statement with parameter placeholders
     * @param builder ClusterBuilder for connection configuration
     * @throws IllegalArgumentException if insertQuery is null/empty or builder is null
     */
    public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);
    
    /**
     * Configures cluster connection (called by Flink framework)
     * @param parameters configuration parameters
     */
    @Override
    public void configure(Configuration parameters);
    
    /**
     * Opens session and prepares INSERT statement
     * @param taskNumber parallel instance number
     * @param numTasks total number of parallel instances
     * @throws IOException if connection setup or statement preparation fails
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException;
    
    /**
     * Writes single record to Cassandra using prepared statement
     * @param record tuple record to write
     * @throws IOException if write operation fails
     */
    @Override
    public void writeRecord(OUT record) throws IOException;
    
    /**
     * Closes session and cluster connections
     * @throws IOException if connection cleanup fails
     */
    @Override
    public void close() throws IOException;
}

Usage Examples:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.api.java.tuple.Tuple4;

// builder and env refer to the ClusterBuilder and ExecutionEnvironment from the input example above

// Process and transform data
DataSet<Tuple4<String, Integer, Double, String>> processedOrders = ...; // your processed data

// Write results to Cassandra
CassandraOutputFormat<Tuple4<String, Integer, Double, String>> outputFormat = 
    new CassandraOutputFormat<>(
        "INSERT INTO analytics.order_summary (order_id, item_count, total_value, status) VALUES (?, ?, ?, ?)",
        builder
    );

processedOrders.output(outputFormat);

env.execute("Batch Processing Job");

Batch Processing Patterns

ETL Pipeline Example

import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.util.Collector;

// Complete ETL pipeline using batch processing
// (sourceBuilder and targetBuilder are ClusterBuilders configured as in the examples above)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Extract: Read raw data from Cassandra
CassandraInputFormat<Tuple3<String, String, Long>> rawDataFormat = 
    new CassandraInputFormat<>(
        "SELECT user_id, event_type, timestamp FROM events WHERE date = '2023-01-01'", 
        sourceBuilder
    );
DataSet<Tuple3<String, String, Long>> rawEvents = env.createInput(rawDataFormat);

// Transform: Aggregate and clean data
DataSet<Tuple3<String, Integer, Double>> aggregatedData = rawEvents
    .groupBy(0) // group by user_id
    .reduceGroup(new RichGroupReduceFunction<Tuple3<String, String, Long>, Tuple3<String, Integer, Double>>() {
        @Override
        public void reduce(Iterable<Tuple3<String, String, Long>> events, 
                          Collector<Tuple3<String, Integer, Double>> out) throws Exception {
            String userId = null;
            int eventCount = 0;
            long totalTimestamp = 0;
            
            for (Tuple3<String, String, Long> event : events) {
                userId = event.f0;
                eventCount++;
                totalTimestamp += event.f2;
            }
            
            if (eventCount > 0) {
                double avgTimestamp = (double) totalTimestamp / eventCount;
                out.collect(new Tuple3<>(userId, eventCount, avgTimestamp));
            }
        }
    });

// Load: Write aggregated results to Cassandra
CassandraOutputFormat<Tuple3<String, Integer, Double>> targetFormat = 
    new CassandraOutputFormat<>(
        "INSERT INTO analytics.user_daily_stats (user_id, event_count, avg_timestamp) VALUES (?, ?, ?)",
        targetBuilder
    );

aggregatedData.output(targetFormat);
env.execute("Daily Analytics ETL");

Data Migration Example

// Migrate data between different Cassandra clusters or keyspaces
// (sourceClusterBuilder and targetClusterBuilder point at the legacy and new clusters)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
CassandraInputFormat<Tuple5<String, String, Integer, Boolean, Long>> sourceFormat = 
    new CassandraInputFormat<>(
        "SELECT id, name, age, active, created_at FROM legacy.users", 
        sourceClusterBuilder
    );

CassandraOutputFormat<Tuple5<String, String, Integer, Boolean, Long>> targetFormat = 
    new CassandraOutputFormat<>(
        "INSERT INTO new_schema.user_profiles (user_id, full_name, age, is_active, registration_date) VALUES (?, ?, ?, ?, ?)",
        targetClusterBuilder
    );

DataSet<Tuple5<String, String, Integer, Boolean, Long>> userData = env.createInput(sourceFormat);

// Apply any transformations needed during migration
DataSet<Tuple5<String, String, Integer, Boolean, Long>> migratedData = userData
    .filter(user -> user.f3) // only active users
    .map(user -> {
        // Transform data format if needed
        return new Tuple5<>(
            "user_" + user.f0, // prefix user ID
            user.f1.trim(),    // clean name
            user.f2,           // age unchanged
            user.f3,           // active status
            user.f4            // timestamp unchanged
        );
    });

migratedData.output(targetFormat);
env.execute("Data Migration Job");

Configuration Notes

Performance Considerations

  • CassandraInputFormat: Runs as non-parallel input by design to avoid overwhelming Cassandra with concurrent queries. For large datasets, consider partitioning queries by date ranges or other criteria.
  • CassandraOutputFormat: Supports parallelism and can write concurrently to Cassandra. Monitor cluster performance and adjust parallelism accordingly.
  • Query Optimization: Use appropriate WHERE clauses and LIMIT statements in input queries to control data volume.
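
The date-range partitioning suggested above amounts to generating one SELECT per partition, backing each with its own CassandraInputFormat, and combining the resulting DataSets with union. A minimal sketch of the query-generation step (the table and column names follow the ETL example above; this helper is illustrative, not part of the connector):

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

public class PartitionedQueries {

    // Builds one CQL SELECT per day in [start, end); each query can back
    // its own CassandraInputFormat, and the resulting DataSets can then be
    // combined with DataSet.union().
    static List<String> dailyQueries(LocalDate start, LocalDate end) {
        List<String> queries = new ArrayList<>();
        for (LocalDate day = start; day.isBefore(end); day = day.plusDays(1)) {
            queries.add(String.format(
                "SELECT user_id, event_type, timestamp FROM events WHERE date = '%s'", day));
        }
        return queries;
    }

    public static void main(String[] args) {
        for (String query : dailyQueries(LocalDate.of(2023, 1, 1), LocalDate.of(2023, 1, 4))) {
            System.out.println(query);
        }
    }
}
```

Each format still runs as a non-parallel input, but splitting the scan keeps individual queries small and lets the partial reads be scheduled as separate source tasks.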

Error Handling

Both input and output formats provide synchronous error handling:

  • Connection Errors: Throw IOException during open() if cluster connection fails
  • Query Errors: Input format throws IOException during open() if SELECT query is invalid
  • Write Errors: Output format throws IOException during writeRecord() if INSERT fails
  • Schema Errors: Runtime errors if tuple arity doesn't match query parameter count
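
The arity mismatch in the last bullet only surfaces at runtime, but it can be caught before submitting the job by comparing the tuple arity against the number of ? bind markers in the statement. A hedged sketch (this helper is not part of the connector's API, and the naive count would miscount a ? inside a quoted literal):

```java
public class ArityCheck {

    // Counts '?' bind markers in a CQL statement.
    // Naive: a '?' inside a quoted string literal would be miscounted.
    static int placeholderCount(String cql) {
        int count = 0;
        for (int i = 0; i < cql.length(); i++) {
            if (cql.charAt(i) == '?') count++;
        }
        return count;
    }

    // Fails fast if the INSERT does not have one placeholder per tuple field.
    static void checkArity(String insertQuery, int tupleArity) {
        int placeholders = placeholderCount(insertQuery);
        if (placeholders != tupleArity) {
            throw new IllegalArgumentException("INSERT has " + placeholders
                + " placeholders but tuple arity is " + tupleArity);
        }
    }

    public static void main(String[] args) {
        // Matches the Tuple4 example above: four placeholders, arity 4.
        checkArity("INSERT INTO analytics.order_summary (order_id, item_count, total_value, status)"
            + " VALUES (?, ?, ?, ?)", 4);
        System.out.println("arity check passed");
    }
}
```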

Resource Management

Both formats properly manage Cassandra connections:

  • Open connections during open() method
  • Close sessions and clusters during close() method
  • Handle exceptions during cleanup to prevent resource leaks
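
The cleanup discipline above (release the session, then the cluster, guarding each step so a failure in one does not leak the other resource) can be sketched generically; closeAll is a hypothetical helper for illustration, not part of the connector:

```java
public class SafeClose {

    // Closes the session first, then the cluster, guarding each close so a
    // failure in one does not prevent the other from being released.
    // Returns the number of close failures encountered.
    static int closeAll(AutoCloseable session, AutoCloseable cluster) {
        int failures = 0;
        if (session != null) {
            try { session.close(); } catch (Exception e) { failures++; }
        }
        if (cluster != null) {
            try { cluster.close(); } catch (Exception e) { failures++; }
        }
        return failures;
    }

    public static void main(String[] args) {
        // A failing session close must not stop the cluster from closing.
        final boolean[] clusterClosed = { false };
        AutoCloseable badSession = () -> { throw new IllegalStateException("close failed"); };
        AutoCloseable cluster = () -> clusterClosed[0] = true;
        int failures = closeAll(badSession, cluster);
        System.out.println("failures=" + failures + ", clusterClosed=" + clusterClosed[0]);
    }
}
```

In the DataStax driver versions this connector targets, both Session and Cluster expose close(), so the same ordering and guarding applies directly.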

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra-2-10
