Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases
—
Integration with Flink's Table API for declarative stream processing. Provides append-only table sinks with schema inference, SQL compatibility, and seamless integration with Flink's unified batch and streaming table processing.
Implementation of Flink's AppendStreamTableSink for writing table data to Cassandra.
public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
public CassandraAppendTableSink(ClusterBuilder builder, String cql);
public CassandraAppendTableSink(ClusterBuilder builder, String cql, Properties properties);
public TypeInformation<Row> getOutputType();
public String[] getFieldNames();
public TypeInformation<?>[] getFieldTypes();
public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream);
}

Simple Table Sink:
import org.apache.flink.streaming.connectors.cassandra.CassandraAppendTableSink;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Set up table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
};
// Create and register Cassandra table sink
CassandraAppendTableSink cassandraSink = new CassandraAppendTableSink(
builder,
"INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"
);
tableEnv.registerTableSink("cassandra_users", cassandraSink);

With Configuration Properties:
import java.util.Properties;
// Additional configuration properties
Properties properties = new Properties();
properties.setProperty("cassandra.connection.timeout", "10000");
properties.setProperty("cassandra.read.timeout", "5000");
CassandraAppendTableSink configuredSink = new CassandraAppendTableSink(
builder,
"INSERT INTO example.orders (order_id, customer_id, total, created_at) VALUES (?, ?, ?, ?)",
properties
);
tableEnv.registerTableSink("cassandra_orders", configuredSink);

Use SQL to write data to Cassandra:
// Create source table (from Kafka, file, etc.)
tableEnv.executeSql(
"CREATE TABLE source_users (" +
" id STRING," +
" name STRING," +
" age INT," +
" email STRING" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'user_events'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
// Register Cassandra sink
CassandraAppendTableSink cassandraSink = new CassandraAppendTableSink(
builder,
"INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"
);
tableEnv.registerTableSink("cassandra_users", cassandraSink);
// SQL query to transform and sink data
tableEnv.executeSql(
"INSERT INTO cassandra_users " +
"SELECT id, UPPER(name) as name, age, email " +
"FROM source_users " +
"WHERE age >= 18"
);

Configure field names and types explicitly:
// Define schema
String[] fieldNames = {"user_id", "full_name", "user_age", "email_address"};
TypeInformation<?>[] fieldTypes = {
Types.STRING,
Types.STRING,
Types.INT,
Types.STRING
};
// Configure sink with schema
CassandraAppendTableSink schemaConfiguredSink = new CassandraAppendTableSink(
builder,
"INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"
).configure(fieldNames, fieldTypes);
tableEnv.registerTableSink("typed_cassandra_users", schemaConfiguredSink);

Leverage the Table API for complex transformations before writing to Cassandra:
import org.apache.flink.table.api.Table;
// Create source table with complex schema
tableEnv.executeSql(
"CREATE TABLE event_stream (" +
" event_id STRING," +
" user_id STRING," +
" event_type STRING," +
" event_time TIMESTAMP(3)," +
" properties MAP<STRING, STRING>," +
" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'events'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
// Process data with windowing and aggregation
Table processedData = tableEnv.sqlQuery(
"SELECT " +
" user_id," +
" event_type," +
" COUNT(*) as event_count," +
" MAX(event_time) as last_event_time," +
" TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start " +
"FROM event_stream " +
"GROUP BY " +
" user_id, " +
" event_type, " +
" TUMBLE(event_time, INTERVAL '1' HOUR)"
);
// Register Cassandra sink for aggregated data
CassandraAppendTableSink aggregationSink = new CassandraAppendTableSink(
builder,
"INSERT INTO example.user_hourly_stats (user_id, event_type, event_count, last_event_time, window_start) VALUES (?, ?, ?, ?, ?)"
);
String[] aggFieldNames = {"user_id", "event_type", "event_count", "last_event_time", "window_start"};
TypeInformation<?>[] aggFieldTypes = {
Types.STRING,
Types.STRING,
Types.LONG,
Types.SQL_TIMESTAMP,
Types.SQL_TIMESTAMP
};
tableEnv.registerTableSink("user_stats", aggregationSink.configure(aggFieldNames, aggFieldTypes));
// Insert processed data
processedData.executeInsert("user_stats");

Register Cassandra tables in Flink's catalog system:
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.api.Schema;
// Create catalog
GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("cassandra_catalog");
tableEnv.registerCatalog("cassandra_catalog", catalog);
tableEnv.useCatalog("cassandra_catalog");
// Define table schema
Schema schema = Schema.newBuilder()
.column("id", DataTypes.STRING())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("created_at", DataTypes.TIMESTAMP(3))
.build();
// Create table descriptor
Map<String, String> properties = new HashMap<>();
properties.put("connector", "cassandra");
properties.put("cassandra.hosts", "127.0.0.1");
properties.put("cassandra.keyspace", "example");
properties.put("cassandra.table", "users");
CatalogTable catalogTable = CatalogTableImpl.of(
schema,
"Cassandra users table",
new ArrayList<>(),
properties
);
// Register table in catalog
catalog.createTable(
new ObjectPath("default", "users"),
catalogTable,
false
);

Handle changing schemas with dynamic table concepts:
// Source with evolving schema
tableEnv.executeSql(
"CREATE TABLE dynamic_events (" +
" event_id STRING," +
" event_data ROW<" +
" user_id STRING," +
" action STRING," +
" metadata MAP<STRING, STRING>" +
" >," +
" event_time TIMESTAMP(3)" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'dynamic_events'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
// Flatten and transform
Table flattenedEvents = tableEnv.sqlQuery(
"SELECT " +
" event_id," +
" event_data.user_id," +
" event_data.action," +
" event_data.metadata['source'] as source," +
" event_time " +
"FROM dynamic_events"
);
// Register sink for flattened data
CassandraAppendTableSink dynamicSink = new CassandraAppendTableSink(
builder,
"INSERT INTO example.flattened_events (event_id, user_id, action, source, event_time) VALUES (?, ?, ?, ?, ?)"
);
tableEnv.registerTableSink("flattened_events", dynamicSink);
flattenedEvents.executeInsert("flattened_events");

Work with time-based data processing:
// Create temporal table with event time
tableEnv.executeSql(
"CREATE TABLE temporal_data (" +
" key_id STRING," +
" value_data STRING," +
" process_time AS PROCTIME()," +
" event_time TIMESTAMP(3)," +
" WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'temporal_stream'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
// Temporal join example (requires reference table)
Table temporalResult = tableEnv.sqlQuery(
"SELECT " +
" t1.key_id," +
" t1.value_data," +
" t1.event_time " +
"FROM temporal_data t1 " +
"WHERE t1.event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR"
);
// Sink recent data to Cassandra
CassandraAppendTableSink temporalSink = new CassandraAppendTableSink(
builder,
"INSERT INTO example.recent_data (key_id, value_data, event_time) VALUES (?, ?, ?)"
);
tableEnv.registerTableSink("recent_data", temporalSink);
temporalResult.executeInsert("recent_data");

Handle errors at the table level:
// Custom sink with error handling
public class ErrorHandlingCassandraAppendTableSink extends CassandraAppendTableSink {
private final CassandraFailureHandler failureHandler;
public ErrorHandlingCassandraAppendTableSink(
ClusterBuilder builder,
String cql,
CassandraFailureHandler failureHandler) {
super(builder, cql);
this.failureHandler = failureHandler;
}
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
// Create sink with custom failure handler
CassandraRowSink sink = new CassandraRowSink(
dataStream.getType().getArity(),
getCql(),
getBuilder(),
CassandraSinkBaseConfig.newBuilder().build(),
failureHandler
);
return dataStream.addSink(sink)
.setParallelism(dataStream.getParallelism())
.name("Cassandra Table Sink");
}
}

Monitor table sink performance:
// Enable metrics collection
env.getConfig().setLatencyTrackingInterval(1000);
// Create sink with monitoring
CassandraAppendTableSink monitoredSink = new CassandraAppendTableSink(
builder,
"INSERT INTO example.monitored_data (id, value, timestamp) VALUES (?, ?, ?)"
) {
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
return super.consumeDataStream(dataStream)
.name("Monitored Cassandra Sink")
.setParallelism(4); // Explicit parallelism for monitoring
}
};
tableEnv.registerTableSink("monitored_sink", monitoredSink);

The Cassandra table sink only supports append operations:
// Supported: INSERT operations
table.executeInsert("cassandra_sink"); // ✓
// NOT supported: UPDATE/DELETE operations
// Table API UPDATE/DELETE queries will fail with CassandraAppendTableSink

Handle schema changes carefully:
// Define flexible schema for evolution
String[] flexibleFieldNames = {"id", "data", "metadata", "timestamp"};
TypeInformation<?>[] flexibleFieldTypes = {
Types.STRING,
Types.STRING, // JSON string for flexible data
Types.MAP(Types.STRING, Types.STRING), // Key-value metadata
Types.SQL_TIMESTAMP
};
CassandraAppendTableSink flexibleSink = new CassandraAppendTableSink(
builder,
"INSERT INTO example.flexible_data (id, data, metadata, timestamp) VALUES (?, ?, ?, ?)"
).configure(flexibleFieldNames, flexibleFieldTypes);

The Table API adds processing overhead:
// For high-throughput scenarios, consider direct DataStream API
// Table API is better for complex transformations and SQL compatibility
// Direct DataStream (higher performance)
stream.addSink(cassandraDirectSink);
// Table API (more functionality, slightly lower performance)
Table table = tableEnv.fromDataStream(stream);
table.executeInsert("cassandra_table_sink");

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra