Apache Flink connector for Apache Cassandra that provides both streaming and batch data integration capabilities
—
Input and output formats for batch processing jobs using Apache Flink's DataSet API. These implementations provide efficient reading from and writing to Cassandra databases in batch processing scenarios.
Input format for reading tuple data from Cassandra in batch processing jobs.
/**
* InputFormat for reading tuple data from Cassandra using CQL queries
* Implements NonParallelInput; runs as a single parallel instance
*/
public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
/**
* Creates input format with CQL SELECT query and cluster configuration
* @param query CQL SELECT statement for data retrieval
* @param builder ClusterBuilder for connection configuration
* @throws IllegalArgumentException if query is null/empty or builder is null
*/
public CassandraInputFormat(String query, ClusterBuilder builder);
/**
* Configures cluster connection (called by Flink framework)
* @param parameters configuration parameters
*/
@Override
public void configure(Configuration parameters);
/**
* Returns cached statistics for query optimization
* @param cachedStatistics previously cached statistics
* @return cached statistics (no new statistics computed)
*/
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;
/**
* Opens session and executes query to initialize result set
* @param ignored input split (ignored for non-parallel input)
* @throws IOException if connection or query execution fails
*/
@Override
public void open(InputSplit ignored) throws IOException;
/**
* Checks if all results have been consumed
* @return true if result set is exhausted
*/
@Override
public boolean reachedEnd() throws IOException;
/**
* Returns next record from result set, populating reusable tuple
* @param reuse tuple instance to populate with data
* @return populated tuple with next record data
* @throws IOException if record retrieval fails
*/
@Override
public OUT nextRecord(OUT reuse) throws IOException;
/**
* Creates single input split (non-parallel processing)
* @param minNumSplits minimum splits requested (ignored)
* @return array with single GenericInputSplit
*/
@Override
public InputSplit[] createInputSplits(int minNumSplits) throws IOException;
/**
* Returns default input split assigner
* @param inputSplits array of input splits
* @return DefaultInputSplitAssigner for sequential processing
*/
@Override
public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);
/**
* Closes session and cluster connections
* @throws IOException if connection cleanup fails
*/
@Override
public void close() throws IOException;
}

Usage Examples:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;
// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("cassandra.example.com")
.withPort(9042)
.build();
}
};
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read user data from Cassandra
CassandraInputFormat<Tuple3<String, Integer, String>> inputFormat =
new CassandraInputFormat<>(
"SELECT name, age, email FROM users WHERE age > 18",
builder
);
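// the query runs as a single task because CassandraInputFormat implements NonParallelInput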
DataSet<Tuple3<String, Integer, String>> users = env.createInput(inputFormat);
// Process the data
DataSet<Tuple3<String, Integer, String>> processedUsers = users
    .filter(user -> user.f1 < 65) // age < 65
    .map(user -> new Tuple3<>(user.f0.toUpperCase(), user.f1, user.f2))
    // Java lambdas erase tuple type parameters, so declare the result type explicitly
    .returns(new TypeHint<Tuple3<String, Integer, String>>() {});
processedUsers.print();

Output format for writing tuple data to Cassandra in batch processing jobs.
/**
* OutputFormat for writing tuple data to Cassandra using CQL INSERT statements
*/
public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
/**
* Creates output format with CQL INSERT query and cluster configuration
* @param insertQuery CQL INSERT statement with parameter placeholders
* @param builder ClusterBuilder for connection configuration
* @throws IllegalArgumentException if insertQuery is null/empty or builder is null
*/
public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);
/**
* Configures cluster connection (called by Flink framework)
* @param parameters configuration parameters
*/
@Override
public void configure(Configuration parameters);
/**
* Opens session and prepares INSERT statement
* @param taskNumber parallel instance number
* @param numTasks total number of parallel instances
* @throws IOException if connection setup or statement preparation fails
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException;
/**
* Writes single record to Cassandra using prepared statement
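* Tuple fields are bound positionally to the prepared statement's ? placeholders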
* @param record tuple record to write
* @throws IOException if write operation fails
*/
@Override
public void writeRecord(OUT record) throws IOException;
/**
* Closes session and cluster connections
* @throws IOException if connection cleanup fails
*/
@Override
public void close() throws IOException;
}

Usage Examples:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.api.java.tuple.Tuple4;
// Process and transform data
DataSet<Tuple4<String, Integer, Double, String>> processedOrders = ...; // your processed data
// Write results to Cassandra
CassandraOutputFormat<Tuple4<String, Integer, Double, String>> outputFormat =
new CassandraOutputFormat<>(
"INSERT INTO analytics.order_summary (order_id, item_count, total_value, status) VALUES (?, ?, ?, ?)",
builder // reuse the ClusterBuilder defined in the input example above
);
processedOrders.output(outputFormat);
env.execute("Batch Processing Job");// Complete ETL pipeline using batch processing
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Extract: Read raw data from Cassandra
CassandraInputFormat<Tuple3<String, String, Long>> rawDataFormat =
new CassandraInputFormat<>(
"SELECT user_id, event_type, timestamp FROM events WHERE date = '2023-01-01'",
sourceBuilder
);
DataSet<Tuple3<String, String, Long>> rawEvents = env.createInput(rawDataFormat);
// Transform: Aggregate and clean data
DataSet<Tuple3<String, Integer, Double>> aggregatedData = rawEvents
.groupBy(0) // group by user_id
.reduceGroup(new RichGroupReduceFunction<Tuple3<String, String, Long>, Tuple3<String, Integer, Double>>() {
@Override
public void reduce(Iterable<Tuple3<String, String, Long>> events,
Collector<Tuple3<String, Integer, Double>> out) throws Exception {
String userId = null;
int eventCount = 0;
double avgTimestamp = 0.0;
long totalTimestamp = 0;
for (Tuple3<String, String, Long> event : events) {
userId = event.f0;
eventCount++;
totalTimestamp += event.f2;
}
if (eventCount > 0) {
avgTimestamp = (double) totalTimestamp / eventCount;
out.collect(new Tuple3<>(userId, eventCount, avgTimestamp));
}
}
});
// Load: Write aggregated results to Cassandra
CassandraOutputFormat<Tuple3<String, Integer, Double>> targetFormat =
new CassandraOutputFormat<>(
"INSERT INTO analytics.user_daily_stats (user_id, event_count, avg_timestamp) VALUES (?, ?, ?)",
targetBuilder
);
aggregatedData.output(targetFormat);
env.execute("Daily Analytics ETL");// Migrate data between different Cassandra clusters or keyspaces
CassandraInputFormat<Tuple5<String, String, Integer, Boolean, Long>> sourceFormat =
new CassandraInputFormat<>(
"SELECT id, name, age, active, created_at FROM legacy.users",
sourceClusterBuilder
);
CassandraOutputFormat<Tuple5<String, String, Integer, Boolean, Long>> targetFormat =
new CassandraOutputFormat<>(
"INSERT INTO new_schema.user_profiles (user_id, full_name, age, is_active, registration_date) VALUES (?, ?, ?, ?, ?)",
targetClusterBuilder
);
DataSet<Tuple5<String, String, Integer, Boolean, Long>> userData = env.createInput(sourceFormat);
// Apply any transformations needed during migration
DataSet<Tuple5<String, String, Integer, Boolean, Long>> migratedData = userData
    .filter(user -> user.f3) // only active users
    .map(user -> {
        // Transform data format if needed
        return new Tuple5<>(
            "user_" + user.f0, // prefix user ID
            user.f1.trim(),    // clean name
            user.f2,           // age unchanged
            user.f3,           // active status
            user.f4            // timestamp unchanged
        );
    })
    // Java lambdas erase tuple type parameters, so declare the result type explicitly
    .returns(new TypeHint<Tuple5<String, String, Integer, Boolean, Long>>() {});
migratedData.output(targetFormat);
env.execute("Data Migration Job");Both input and output formats provide synchronous error handling:
- IOException during open() if the cluster connection fails
- IOException during open() if the SELECT query is invalid
- IOException during writeRecord() if an INSERT fails
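These IOExceptions propagate through the Flink runtime and surface from env.execute(), so a driver program can handle failures at submission time. A minimal sketch, reusing env, inputFormat, and builder from the examples above (the analytics.adult_users table is a hypothetical target):

try {
    DataSet<Tuple3<String, Integer, String>> users = env.createInput(inputFormat);
    users.output(new CassandraOutputFormat<>(
            "INSERT INTO analytics.adult_users (name, age, email) VALUES (?, ?, ?)",
            builder));
    env.execute("Cassandra Batch Job");
} catch (Exception e) {
    // the IOException thrown by open() or writeRecord() is in the cause chain
    System.err.println("Cassandra batch job failed: " + e.getMessage());
    throw e;
}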
Both formats properly manage Cassandra connections:

- Sessions and cluster connections are opened in the open() method
- Sessions and cluster connections are closed in the close() method
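All connection setup flows through the ClusterBuilder passed to the formats, so cluster-level options such as authentication and connection pooling belong there. A minimal sketch, assuming the cluster requires password authentication (host, credentials, and pool sizes are placeholders):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

ClusterBuilder authenticatedBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        // placeholder pool sizes; tune for the job's parallelism
        PoolingOptions pooling = new PoolingOptions()
            .setCoreConnectionsPerHost(HostDistance.LOCAL, 1)
            .setMaxConnectionsPerHost(HostDistance.LOCAL, 2);
        return builder
            .addContactPoint("cassandra.example.com") // placeholder host
            .withPort(9042)
            .withCredentials("flink_batch", "secret") // placeholder credentials
            .withPoolingOptions(pooling)
            .build();
    }
};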
Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra-2-10