CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-connector-cassandra

Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases

Pending
Overview
Eval results
Files

table-api.mddocs/

Table API Integration

Integration with Flink's Table API for declarative stream processing. Provides append-only table sinks with schema inference, SQL compatibility, and seamless integration with Flink's unified batch and streaming table processing.

Capabilities

Append Table Sink

Implementation of Flink's AppendStreamTableSink for writing table data to Cassandra.

/**
 * Append-only table sink that writes {@code Row} table data to Cassandra.
 *
 * <p>API summary only: method bodies are omitted.
 */
public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
    /** Creates a sink from a cluster builder and a CQL insert statement with {@code ?} placeholders. */
    public CassandraAppendTableSink(ClusterBuilder builder, String cql);
    /** Same as the two-argument constructor, additionally accepting configuration properties. */
    public CassandraAppendTableSink(ClusterBuilder builder, String cql, Properties properties);
    
    /** Returns the {@code Row} type information of this sink. */
    public TypeInformation<Row> getOutputType();
    /** Returns the field names of this sink's schema. */
    public String[] getFieldNames();
    /** Returns the field types of this sink's schema. */
    public TypeInformation<?>[] getFieldTypes();
    /** Returns a sink configured with the given field names and field types. */
    public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);
    /** Attaches this sink to the given stream of rows and returns the resulting {@code DataStreamSink}. */
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream);
}

Basic Usage

Simple Table Sink:

import com.datastax.driver.core.Cluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraAppendTableSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Set up table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create cluster builder; it tells the sink how to connect to the Cassandra cluster
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder clusterBuilder) {
        // Parameter is named differently from the outer `builder` variable to avoid shadowing
        return clusterBuilder.addContactPoint("127.0.0.1").build();
    }
};

// Create and register Cassandra table sink
CassandraAppendTableSink cassandraSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"
);

// The name registered here is the table name used in later INSERT INTO statements
tableEnv.registerTableSink("cassandra_users", cassandraSink);

With Configuration Properties:

import java.util.Properties;

// Additional configuration properties
// NOTE(review): these property keys are not defined anywhere in this document;
// confirm that the connector actually reads them before relying on them.
Properties properties = new Properties();
properties.setProperty("cassandra.connection.timeout", "10000");
properties.setProperty("cassandra.read.timeout", "5000");

// `builder` is the ClusterBuilder created in the "Simple Table Sink" example
CassandraAppendTableSink configuredSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.orders (order_id, customer_id, total, created_at) VALUES (?, ?, ?, ?)",
    properties
);

tableEnv.registerTableSink("cassandra_orders", configuredSink);

SQL Integration

Use SQL to write data to Cassandra:

// Create source table (from Kafka, file, etc.)
tableEnv.executeSql(
    "CREATE TABLE source_users (" +
    "  id STRING," +
    "  name STRING," +
    "  age INT," +
    "  email STRING" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user_events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Register Cassandra sink (`builder` is the ClusterBuilder from the "Simple Table Sink" example)
CassandraAppendTableSink cassandraSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"
);
tableEnv.registerTableSink("cassandra_users", cassandraSink);

// SQL query to transform and sink data.
// The SELECT list (id, name, age, email) is in the same order as the four
// placeholders of the CQL statement above.
tableEnv.executeSql(
    "INSERT INTO cassandra_users " +
    "SELECT id, UPPER(name) as name, age, email " +
    "FROM source_users " +
    "WHERE age >= 18"
);

Schema Configuration

Configure field names and types explicitly:

// Define schema: one name and one type per field, in the same order
String[] fieldNames = {"user_id", "full_name", "user_age", "email_address"};
TypeInformation<?>[] fieldTypes = {
    Types.STRING,
    Types.STRING, 
    Types.INT,
    Types.STRING
};

// Configure sink with schema.
// The field names above differ from the CQL column names (id, name, age, email);
// presumably values are bound to the `?` placeholders by position — TODO confirm.
CassandraAppendTableSink schemaConfiguredSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"
).configure(fieldNames, fieldTypes);

tableEnv.registerTableSink("typed_cassandra_users", schemaConfiguredSink);

Complex Data Processing

Leverage Table API for complex transformations before writing to Cassandra:

import org.apache.flink.table.api.Table;

// Create source table with complex schema
// (event_time carries a watermark that lags 5 seconds behind, enabling event-time windows)
tableEnv.executeSql(
    "CREATE TABLE event_stream (" +
    "  event_id STRING," +
    "  user_id STRING," +
    "  event_type STRING," +
    "  event_time TIMESTAMP(3)," +
    "  properties MAP<STRING, STRING>," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Process data with windowing and aggregation:
// one result row per (user_id, event_type) and one-hour tumbling window
Table processedData = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id," +
    "  event_type," +
    "  COUNT(*) as event_count," +
    "  MAX(event_time) as last_event_time," +
    "  TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start " +
    "FROM event_stream " +
    "GROUP BY " +
    "  user_id, " +
    "  event_type, " +
    "  TUMBLE(event_time, INTERVAL '1' HOUR)"
);

// Register Cassandra sink for aggregated data
CassandraAppendTableSink aggregationSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.user_hourly_stats (user_id, event_type, event_count, last_event_time, window_start) VALUES (?, ?, ?, ?, ?)"
);

// Sink schema: five fields, in the same order as the five SELECT columns
// and the five CQL placeholders above
String[] aggFieldNames = {"user_id", "event_type", "event_count", "last_event_time", "window_start"};
TypeInformation<?>[] aggFieldTypes = {
    Types.STRING,
    Types.STRING,
    Types.LONG,
    Types.SQL_TIMESTAMP,
    Types.SQL_TIMESTAMP
};

tableEnv.registerTableSink("user_stats", aggregationSink.configure(aggFieldNames, aggFieldTypes));

// Insert processed data
processedData.executeInsert("user_stats");

Catalog Integration

Register Cassandra tables in Flink's catalog system:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;

// Create catalog
GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("cassandra_catalog");
tableEnv.registerCatalog("cassandra_catalog", catalog);
tableEnv.useCatalog("cassandra_catalog");

// Define table schema
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.STRING())
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .column("created_at", DataTypes.TIMESTAMP(3))
    .build();

// Create table descriptor (connector options stored with the catalog table)
Map<String, String> properties = new HashMap<>();
properties.put("connector", "cassandra");
properties.put("cassandra.hosts", "127.0.0.1");
properties.put("cassandra.keyspace", "example");
properties.put("cassandra.table", "users");

// CatalogTable.of(...) is the factory that accepts an unresolved Schema;
// arguments are: schema, comment, partition keys (none here), options.
CatalogTable catalogTable = CatalogTable.of(
    schema,
    "Cassandra users table",
    new ArrayList<>(),
    properties
);

// Register table in catalog.
// "default" is assumed to be the catalog's default database — verify for your setup.
// The last argument is ignoreIfExists: false means an existing table causes an error.
// createTable declares checked exceptions, so real code must catch or propagate them.
catalog.createTable(
    new ObjectPath("default", "users"),
    catalogTable,
    false
);

Advanced Table Patterns

Dynamic Tables

Handle nested, semi-structured event payloads by flattening them into fixed columns before writing to Cassandra:

// Source whose payload is a nested ROW that itself contains a MAP
tableEnv.executeSql(
    "CREATE TABLE dynamic_events (" +
    "  event_id STRING," +
    "  event_data ROW<" +
    "    user_id STRING," +
    "    action STRING," +
    "    metadata MAP<STRING, STRING>" +
    "  >," +
    "  event_time TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'dynamic_events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Flatten and transform: pull the nested fields and one map entry ('source')
// up into top-level columns
Table flattenedEvents = tableEnv.sqlQuery(
    "SELECT " +
    "  event_id," +
    "  event_data.user_id," +
    "  event_data.action," +
    "  event_data.metadata['source'] as source," +
    "  event_time " +
    "FROM dynamic_events"
);

// Register sink for flattened data
// (five placeholders, matching the five selected columns in order)
CassandraAppendTableSink dynamicSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.flattened_events (event_id, user_id, action, source, event_time) VALUES (?, ?, ?, ?, ?)"
);

tableEnv.registerTableSink("flattened_events", dynamicSink);
flattenedEvents.executeInsert("flattened_events");

Temporal Tables

Work with time-based data processing:

// Create source table with a processing-time column and an event-time column
// (the event-time watermark lags 10 seconds behind)
tableEnv.executeSql(
    "CREATE TABLE temporal_data (" +
    "  key_id STRING," +
    "  value_data STRING," +
    "  process_time AS PROCTIME()," +
    "  event_time TIMESTAMP(3)," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'temporal_stream'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Time-based filter: keep only rows whose event_time falls within the last hour.
// Note: this is a plain filter, not a temporal join — no second table is involved.
Table temporalResult = tableEnv.sqlQuery(
    "SELECT " +
    "  t1.key_id," +
    "  t1.value_data," +
    "  t1.event_time " +
    "FROM temporal_data t1 " +
    "WHERE t1.event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR"
);

// Sink recent data to Cassandra
CassandraAppendTableSink temporalSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.recent_data (key_id, value_data, event_time) VALUES (?, ?, ?)"
);

tableEnv.registerTableSink("recent_data", temporalSink);
temporalResult.executeInsert("recent_data");

Error Handling and Monitoring

Sink Error Handling

Handle errors at the table level:

// Custom sink with error handling
/**
 * Custom table sink that writes rows to Cassandra using a caller-supplied failure handler.
 *
 * <p>The cluster builder and CQL statement are kept in this subclass's own fields because
 * the {@code CassandraAppendTableSink} API exposes no accessors for them.
 *
 * <p>NOTE(review): {@code configure(...)} is declared to return {@code CassandraAppendTableSink};
 * if it creates a new base-class instance, the custom failure handler is lost on that copy.
 * Verify before combining this class with {@code configure(...)}.
 */
public class ErrorHandlingCassandraAppendTableSink extends CassandraAppendTableSink {
    private final ClusterBuilder builder;
    private final String cql;
    private final CassandraFailureHandler failureHandler;

    /**
     * @param builder        cluster builder used to connect to Cassandra
     * @param cql            CQL insert statement with {@code ?} placeholders
     * @param failureHandler handler passed to the underlying row sink
     */
    public ErrorHandlingCassandraAppendTableSink(
            ClusterBuilder builder,
            String cql,
            CassandraFailureHandler failureHandler) {
        super(builder, cql);
        this.builder = builder;
        this.cql = cql;
        this.failureHandler = failureHandler;
    }

    /**
     * Attaches a {@code CassandraRowSink} configured with the custom failure handler
     * to the given stream.
     *
     * @param dataStream stream of rows to write
     * @return the sink operator, running with the same parallelism as {@code dataStream}
     */
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // Create sink with custom failure handler.
        // NOTE(review): confirm this five-argument CassandraRowSink constructor is
        // accessible from your package in the connector version you use.
        CassandraRowSink sink = new CassandraRowSink(
            dataStream.getType().getArity(),
            cql,
            builder,
            CassandraSinkBaseConfig.newBuilder().build(),
            failureHandler
        );

        return dataStream.addSink(sink)
            .setParallelism(dataStream.getParallelism())
            .name("Cassandra Table Sink");
    }
}

Performance Monitoring

Monitor table sink performance:

// Enable metrics collection
// (latency tracking interval; presumably in milliseconds — TODO confirm)
env.getConfig().setLatencyTrackingInterval(1000);

// Create sink with monitoring: an anonymous subclass that only changes the
// operator name and parallelism of the sink created by the base class
CassandraAppendTableSink monitoredSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.monitored_data (id, value, timestamp) VALUES (?, ?, ?)"
) {
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // A distinct operator name makes this sink easy to find among the job's operators
        return super.consumeDataStream(dataStream)
            .name("Monitored Cassandra Sink")
            .setParallelism(4); // Explicit parallelism for monitoring
    }
};

tableEnv.registerTableSink("monitored_sink", monitoredSink);

Limitations and Considerations

Append-Only Semantics

The Cassandra table sink only supports append operations:

// Supported: INSERT operations (`table` is any Table whose result is append-only)
table.executeInsert("cassandra_sink"); // ✓

// NOT supported: UPDATE/DELETE operations
// Table API UPDATE/DELETE queries will fail with CassandraAppendTableSink,
// because the sink only accepts appended (inserted) rows

Schema Evolution

Handle schema changes carefully:

// Define flexible schema for evolution: variable content goes into a JSON string
// and a string-to-string map, so new attributes do not require new columns
String[] flexibleFieldNames = {"id", "data", "metadata", "timestamp"};
TypeInformation<?>[] flexibleFieldTypes = {
    Types.STRING,
    Types.STRING,  // JSON string for flexible data
    Types.MAP(Types.STRING, Types.STRING),  // Key-value metadata
    Types.SQL_TIMESTAMP
};

// Four fields, matching the four CQL placeholders in order
CassandraAppendTableSink flexibleSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.flexible_data (id, data, metadata, timestamp) VALUES (?, ?, ?, ?)"
).configure(flexibleFieldNames, flexibleFieldTypes);

Performance Considerations

Table API adds processing overhead:

// For high-throughput scenarios, consider direct DataStream API
// Table API is better for complex transformations and SQL compatibility
// (`stream` and `cassandraDirectSink` are placeholders; they are not defined in this document)

// Direct DataStream (higher performance)
stream.addSink(cassandraDirectSink);

// Table API (more functionality, slightly lower performance)
// The target name must match a sink previously registered with the table environment
Table table = tableEnv.fromDataStream(stream);
table.executeInsert("cassandra_table_sink");

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra

docs

batch-connectors.md

configuration.md

index.md

streaming-sinks.md

table-api.md

write-ahead-logging.md

tile.json