Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases
—
Comprehensive batch input and output formats for reading from and writing to Cassandra in Flink batch processing jobs. Supports Tuples, Rows, and POJOs with configurable parallelism and connection management.
Common base class for all Cassandra input formats providing connection management and split handling.
public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> {
public CassandraInputFormatBase(String query, ClusterBuilder builder);
public void configure(Configuration parameters);
public BaseStatistics getStatistics(BaseStatistics cachedStatistics);
public InputSplit[] createInputSplits(int minNumSplits);
public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);
public void close();
}

Reads data from Cassandra and generates Flink Tuples.
public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT> {
public CassandraInputFormat(String query, ClusterBuilder builder);
public void open(InputSplit ignored);
public boolean reachedEnd();
public OUT nextRecord(OUT reuse);
}

Usage Example:
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import com.datastax.driver.core.Cluster;
// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
};
// Create input format
CassandraInputFormat<Tuple3<String, Integer, String>> inputFormat =
new CassandraInputFormat<>(
"SELECT word, count, description FROM example.words WHERE token(word) > ? AND token(word) <= ?",
builder
);
// Use in batch job
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String, Integer, String>> dataSet = env.createInput(inputFormat);

Reads data from Cassandra and generates POJOs using DataStax object mapping.
public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> {
public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass);
public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass, MapperOptions mapperOptions);
public void open(InputSplit split);
public boolean reachedEnd();
public OUT nextRecord(OUT reuse);
}

Usage Example:
// Define POJO with Cassandra annotations
@Table(keyspace = "example", name = "users")
public class User {
@PartitionKey
private String id;
@Column(name = "name")
private String name;
@Column(name = "age")
private Integer age;
// constructors, getters, setters...
}
// Create POJO input format
CassandraPojoInputFormat<User> pojoInputFormat =
new CassandraPojoInputFormat<>(
"SELECT * FROM example.users WHERE age > ?",
builder,
User.class
);
// Use with mapper options
MapperOptions options = new MapperOptions() {
@Override
public Mapper.Option[] getMapperOptions() {
return new Mapper.Option[] {
Mapper.Option.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
};
}
};
CassandraPojoInputFormat<User> pojoWithOptions =
new CassandraPojoInputFormat<>(
"SELECT * FROM example.users",
builder,
User.class,
options
);
DataSet<User> users = env.createInput(pojoWithOptions);

Common base class for all Cassandra output formats providing connection management and batch writing.
public abstract class CassandraOutputFormatBase<OUT> extends RichOutputFormat<OUT> {
public CassandraOutputFormatBase(String insertQuery, ClusterBuilder builder);
public void configure(Configuration parameters);
public void open(int taskNumber, int numTasks);
public void writeRecord(OUT record);
public void close();
protected abstract Object[] extractFields(OUT record);
protected void onWriteSuccess(ResultSet ignored);
protected void onWriteFailure(Throwable t);
}

Writes Flink Tuples to Cassandra using prepared statements.
public class CassandraTupleOutputFormat<OUT extends Tuple> extends CassandraOutputFormatBase<OUT> {
public CassandraTupleOutputFormat(String insertQuery, ClusterBuilder builder);
protected Object[] extractFields(OUT record);
}

Usage Example:
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
// Create output format
CassandraTupleOutputFormat<Tuple3<String, Integer, String>> outputFormat =
new CassandraTupleOutputFormat<>(
"INSERT INTO example.words (word, count, description) VALUES (?, ?, ?)",
builder
);
// Use in batch job
DataSet<Tuple3<String, Integer, String>> results = // ... your data processing
results.output(outputFormat);

Writes Flink Rows to Cassandra with schema-based field extraction.
public class CassandraRowOutputFormat extends CassandraOutputFormatBase<Row> {
public CassandraRowOutputFormat(String insertQuery, ClusterBuilder builder);
protected Object[] extractFields(Row record);
}

Usage Example:
import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat;
import org.apache.flink.types.Row;
// Create row output format
CassandraRowOutputFormat rowOutputFormat =
new CassandraRowOutputFormat(
"INSERT INTO example.metrics (id, timestamp, value) VALUES (?, ?, ?)",
builder
);
DataSet<Row> metrics = // ... your row data
metrics.output(rowOutputFormat);

Writes POJOs to Cassandra using DataStax object mapping.
public class CassandraPojoOutputFormat<OUT> extends RichOutputFormat<OUT> {
public CassandraPojoOutputFormat(ClusterBuilder builder, Class<OUT> outputClass);
public CassandraPojoOutputFormat(ClusterBuilder builder, Class<OUT> outputClass, MapperOptions mapperOptions);
public void configure(Configuration parameters);
public void open(int taskNumber, int numTasks);
public void writeRecord(OUT record);
public void close();
}

Usage Example:
import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
// Create POJO output format
CassandraPojoOutputFormat<User> pojoOutputFormat =
new CassandraPojoOutputFormat<>(builder, User.class);
// With mapper options
MapperOptions writeOptions = new MapperOptions() {
@Override
public Mapper.Option[] getMapperOptions() {
return new Mapper.Option[] {
Mapper.Option.ttl(3600), // 1 hour TTL
Mapper.Option.timestamp(System.currentTimeMillis() * 1000) // write timestamp is in MICROseconds since epoch
};
}
};
CassandraPojoOutputFormat<User> pojoWithOptions =
new CassandraPojoOutputFormat<>(builder, User.class, writeOptions);
DataSet<User> processedUsers = // ... your user processing
processedUsers.output(pojoWithOptions);

The input formats automatically handle parallelism by creating input splits:
// Configure parallelism
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // Use 4 parallel tasks
// Input format will create splits automatically
CassandraInputFormat<Tuple2<String, Integer>> inputFormat =
new CassandraInputFormat<>(
"SELECT id, value FROM example.data WHERE token(id) > ? AND token(id) <= ?",
builder
);
DataSet<Tuple2<String, Integer>> parallelData = env.createInput(inputFormat);

Use advanced cluster configuration for production deployments:
ClusterBuilder productionBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoints("cassandra-1", "cassandra-2", "cassandra-3")
.withPort(9042)
.withCredentials("username", "password")
.withSocketOptions(new SocketOptions()
.setConnectTimeoutMillis(10000)
.setReadTimeoutMillis(10000))
.withRetryPolicy(DefaultRetryPolicy.INSTANCE)
.withReconnectionPolicy(new ConstantReconnectionPolicy(1000))
.build();
}
};

Override callback methods for custom error handling:
CassandraTupleOutputFormat<Tuple2<String, Integer>> customOutputFormat =
new CassandraTupleOutputFormat<Tuple2<String, Integer>>(
"INSERT INTO example.data (id, value) VALUES (?, ?)",
builder
) {
@Override
protected void onWriteFailure(Throwable t) {
// Log error and continue, or re-throw to fail the job
logger.error("Failed to write record", t);
// super.onWriteFailure(t); // Uncomment to fail on error
}
@Override
protected void onWriteSuccess(ResultSet result) {
// Custom success handling
logger.debug("Successfully wrote record");
}
};

For large datasets, consider memory-efficient processing:
// Configure batch size and resource management
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse(); // Reuse objects to reduce GC pressure
// Process data in smaller batches
DataSet<User> largeDataset = env.createInput(inputFormat);
largeDataset
.rebalance() // Distribute data evenly
.output(outputFormat);

@Deprecated
public class CassandraOutputFormat<OUT extends Tuple> extends CassandraTupleOutputFormat<OUT> {
public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);
}

Note: Use CassandraTupleOutputFormat instead of the deprecated CassandraOutputFormat.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra