
tessl/maven-org-apache-flink--flink-connector-cassandra

Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases


Write-Ahead Logging

Write-ahead logging (WAL) ties the Cassandra sink to Flink's checkpointing to give exactly-once results for idempotent writes. Records are stored in Flink's state backend and written to Cassandra only after the checkpoint that covers them completes, so the data in Cassandra stays consistent with Flink's checkpointed state even when the job fails and restarts.

Capabilities

Write-Ahead Log Concept

Write-ahead logging provides exactly-once processing semantics by:

  1. Buffering: Incoming records are stored in Flink's state backend instead of being written directly to Cassandra
  2. Checkpointing: Records are held until a Flink checkpoint successfully completes
  3. Batch Commit: On checkpoint completion, buffered records are written to Cassandra in batches
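  4. Failure Recovery: If a failure occurs before the checkpoint completes, the buffered records are discarded along with the rest of the uncheckpointed state and are reprocessed from the source after the restart

Each record therefore takes effect in Cassandra exactly once, provided the insert query is idempotent. A batch that was only partly written before a failure is sent again after recovery, and because a Cassandra INSERT overwrites the row with the same primary key, the repeated write leaves the same result.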
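The four steps above can be sketched as a small in-memory model. This is a simplified illustration in plain Java, not the connector's implementation: the class `WalSinkSketch`, its method names, and the `database` list standing in for a Cassandra table are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a write-ahead sink (hypothetical, not the Flink classes). */
class WalSinkSketch {
    // Records received since the last checkpoint snapshot.
    private List<String> current = new ArrayList<>();
    // Snapshotted records per checkpoint id, waiting for the checkpoint to complete.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Stand-in for the external Cassandra table.
    final List<String> database = new ArrayList<>();

    /** Step 1: buffer the record instead of writing it out. */
    void processElement(String record) {
        current.add(record);
    }

    /** Step 2: a checkpoint snapshot moves the buffer into pending state. */
    void snapshot(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Step 3: on completion, write every pending batch up to this checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> ready = pending.headMap(checkpointId, true);
        for (List<String> batch : ready.values()) {
            database.addAll(batch);
        }
        ready.clear(); // headMap is a view, so this removes the entries from pending
    }

    /** Step 4: a failure drops everything not covered by a completed checkpoint. */
    void failAndRestore() {
        current = new ArrayList<>();
        pending.clear();
    }
}
```

In this model a record reaches `database` only after `notifyCheckpointComplete` is called for a checkpoint that contains it. Records dropped by `failAndRestore` are expected to arrive again because the source replays them after the restart.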

Tuple Write-Ahead Sink

Write-ahead log implementation for Flink Tuple types.

public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
    public CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer);
    public void open();
    public void close();
    protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp);
}

Usage Example:

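import com.datastax.driver.core.Cluster;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleWriteAheadSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;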

// Enable checkpointing in the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds

// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").build();
    }
};

// Create write-ahead sink using the builder pattern
DataStream<Tuple3<String, Integer, Long>> stream = // ... your data stream

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.events (id, count, timestamp) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog()  // This creates the write-ahead sink internally
    .build();

Manual Construction:

// Manual construction (not typically needed; to supply a custom committer
// through the builder, call enableWriteAheadLog(committer) instead)
TypeSerializer<Tuple3<String, Integer, Long>> serializer = 
    stream.getType().createSerializer(env.getConfig());

CassandraCommitter committer = new CassandraCommitter(builder);

CassandraTupleWriteAheadSink<Tuple3<String, Integer, Long>> walSink = 
    new CassandraTupleWriteAheadSink<>(
        "INSERT INTO example.events (id, count, timestamp) VALUES (?, ?, ?);",
        serializer,
        builder,
        committer
    );

stream.transform("Cassandra WAL Sink", null, walSink);

Row Write-Ahead Sink

Write-ahead log implementation for Flink Row types.

public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row> {
    public CassandraRowWriteAheadSink(String insertQuery, TypeSerializer<Row> serializer, ClusterBuilder builder, CheckpointCommitter committer);
    public void open();
    public void close();
    protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp);
}

Usage Example:

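import org.apache.flink.streaming.connectors.cassandra.CassandraRowWriteAheadSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.types.Row;

// Enable checkpointing (env and builder are defined as in the Tuple example above)
env.enableCheckpointing(10000); // Checkpoint every 10 seconds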

DataStream<Row> rowStream = // ... your row stream

CassandraSink.addSink(rowStream)
    .setQuery("INSERT INTO example.metrics (id, value, timestamp) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog()
    .build();

Checkpoint Committer

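CassandraCommitter records in a Cassandra table the ID of the last checkpoint that each sink subtask has fully written. After a restart, the sink consults this table and skips checkpoints whose records are already in Cassandra.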

public class CassandraCommitter extends CheckpointCommitter {
    public CassandraCommitter(ClusterBuilder builder);
    public CassandraCommitter(ClusterBuilder builder, String keySpace);
    public void setJobId(String id);
    public void createResource();
    public void open();
    public void close();
    public void commitCheckpoint(int subtaskIdx, long checkpointId);
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}

Default Behavior:

// Default committer (keyspace flink_auxiliary, table checkpoints_<job_id>)
CassandraCommitter defaultCommitter = new CassandraCommitter(builder);

Custom Configuration:

// Custom keyspace for checkpoint metadata
CassandraCommitter customCommitter = new CassandraCommitter(builder, "checkpoint_ks");
// The job ID is appended to the table name, so use only characters
// that are valid in an unquoted CQL identifier (letters, digits, underscores)
customCommitter.setJobId("my_flink_job_v1");

// The committer will create the following schema:
// KEYSPACE checkpoint_ks
// TABLE checkpoint_ks.checkpoints_my_flink_job_v1 (
//     sink_id text,
//     sub_id int,
//     checkpoint_id bigint,
//     PRIMARY KEY (sink_id, sub_id)
// )

Manual Setup:

// Initialize the checkpoint storage manually
CassandraCommitter committer = new CassandraCommitter(builder, "checkpoints");
committer.setJobId("data-pipeline");
committer.createResource(); // Creates keyspace and table
committer.open();

// Use with write-ahead sink
CassandraTupleWriteAheadSink<Tuple2<String, Integer>> walSink = 
    new CassandraTupleWriteAheadSink<>(
        "INSERT INTO example.data (key, value) VALUES (?, ?);",
        serializer,
        builder,
        committer
    );
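How the committer prevents a checkpoint from being written twice can be sketched with an in-memory stand-in. The class `CommitterSketch` and the helper `writeOnce` are hypothetical names for this illustration; only the `commitCheckpoint` and `isCheckpointCommitted` method names mirror the API listed above, and the real committer stores its state in Cassandra rather than in a map.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for a checkpoint committer (a sketch, not CassandraCommitter). */
class CommitterSketch {
    // Highest committed checkpoint id per sink subtask.
    private final Map<Integer, Long> lastCommitted = new HashMap<>();

    void commitCheckpoint(int subtaskIdx, long checkpointId) {
        lastCommitted.merge(subtaskIdx, checkpointId, Long::max);
    }

    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        Long last = lastCommitted.get(subtaskIdx);
        return last != null && checkpointId <= last;
    }

    /**
     * Writes a batch unless the committer reports it as already written.
     * Returns true when this call sent the batch.
     */
    static boolean writeOnce(CommitterSketch committer, int subtaskIdx, long checkpointId,
                             List<String> batch, List<String> table) {
        if (committer.isCheckpointCommitted(subtaskIdx, checkpointId)) {
            return false; // records are already in the table, skip the replay
        }
        table.addAll(batch);                                  // 1. write the records
        committer.commitCheckpoint(subtaskIdx, checkpointId); // 2. then record the commit
        return true;
    }
}
```

Because the write happens before the commit is recorded, a crash between the two steps leads to the same batch being written again after recovery. That ordering is why the insert query should be idempotent.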

Advanced Usage Patterns

Checkpoint Configuration

Proper checkpoint configuration is critical for write-ahead logging:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing with appropriate interval
env.enableCheckpointing(5000); // 5 second intervals

// Configure checkpoint behavior
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(1000); // 1 second pause between checkpoints
config.setMaxConcurrentCheckpoints(1); // Only one concurrent checkpoint
config.setCheckpointTimeout(300000); // 5 minute timeout
config.setFailOnCheckpointingErrors(true); // Fail job on checkpoint errors

// Enable external checkpoints for recovery
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Performance Considerations

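Write-ahead logging trades latency and throughput for consistency. Longer checkpoint intervals produce fewer, larger batch writes but increase end-to-end latency and the amount of buffered state; shorter intervals do the opposite. The three profiles below are alternatives, so apply only one of them: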

// High-throughput configuration
env.enableCheckpointing(30000); // Longer checkpoint intervals
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(5000);

// Low-latency configuration  
env.enableCheckpointing(1000); // Frequent checkpoints
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(500);

// Balanced configuration
env.enableCheckpointing(10000); // 10 second intervals
config.setMaxConcurrentCheckpoints(1);
config.setMinPauseBetweenCheckpoints(2000);

Error Handling and Recovery

Handle failures gracefully with write-ahead logging:

// Configure restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3, // number of restart attempts
    org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delay
));

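// Custom failure handling
// Note: the failure handler is invoked by the regular (non-WAL) sinks. Whether it is
// consulted when enableWriteAheadLog() is set depends on the connector version,
// so verify this before relying on it.
// `logger` is assumed to be an SLF4J Logger defined in the enclosing class.
CassandraFailureHandler walFailureHandler = new CassandraFailureHandler() {
    @Override
    public void onFailure(Throwable failure) throws IOException {
        // For WAL sinks, be more conservative with error handling
        // since failures affect exactly-once guarantees
        logger.error("WAL sink failure - will cause job restart", failure);
        throw new IOException("WAL operation failed", failure);
    }
};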

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.critical_data (id, value) VALUES (?, ?);")
    .setClusterBuilder(builder)
    .setFailureHandler(walFailureHandler)
    .enableWriteAheadLog()
    .build();

Monitoring and Metrics

Monitor write-ahead log performance:

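// Track commit outcomes in a committer subclass.
// CheckpointCommitter is a plain Serializable class without a RuntimeContext,
// so this version counts locally and logs instead of registering Flink metrics.
// Assumes org.slf4j.Logger and org.slf4j.LoggerFactory are imported.
public class MonitoredCassandraCommitter extends CassandraCommitter {
    private static final Logger LOG = LoggerFactory.getLogger(MonitoredCassandraCommitter.class);

    // transient: the committer is serialized and shipped to the tasks
    private transient long checkpointCommits;
    private transient long checkpointFailures;

    public MonitoredCassandraCommitter(ClusterBuilder builder) {
        super(builder);
    }

    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId) {
        try {
            super.commitCheckpoint(subtaskIdx, checkpointId);
            checkpointCommits++;
            LOG.debug("Committed checkpoint {} for subtask {} ({} commits so far)",
                checkpointId, subtaskIdx, checkpointCommits);
        } catch (Exception e) {
            checkpointFailures++;
            LOG.error("Failed to commit checkpoint {} for subtask {} ({} failures so far)",
                checkpointId, subtaskIdx, checkpointFailures, e);
            throw new RuntimeException("Checkpoint commit failed", e);
        }
    }
}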

State Backend Configuration

Choose a state backend that can hold the records buffered between checkpoints. The three calls below are alternatives, so use only one:

// RocksDB state backend for large state (recommended for WAL)
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:port/path/to/checkpoints"));

// Memory state backend for small state (development only)
env.setStateBackend(new MemoryStateBackend());

// FileSystem state backend for medium state
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/path/to/checkpoints"));

Limitations and Considerations

POJO Limitations

Write-ahead logging is only supported for Tuple and Row types:

// Supported
DataStream<Tuple2<String, Integer>> tuples = // ...
CassandraSink.addSink(tuples)
    .setQuery("INSERT INTO example.data (key, value) VALUES (?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog()  // ✓ Supported
    .build();

DataStream<Row> rows = // ...
CassandraSink.addSink(rows)
    .setQuery("INSERT INTO example.data (key, value) VALUES (?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog()  // ✓ Supported
    .build();

// NOT supported
DataStream<MyPojo> pojos = // ...
CassandraSink.addSink(pojos)
    .setDefaultKeyspace("example")
    .setClusterBuilder(builder)
    .enableWriteAheadLog()  // ✗ Will throw IllegalArgumentException
    .build();

Performance Impact

Write-ahead logging introduces additional overhead:

  • Latency: Records are buffered until checkpoint completion
  • Memory: Records are stored in Flink's state backend
  • Throughput: Batch writes occur only at checkpoint intervals
  • Storage: Checkpoint metadata is stored in Cassandra
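A rough way to size this overhead is to multiply the input rate by the checkpoint interval. The helper below is a back-of-the-envelope sketch under simple assumptions (steady input rate, fixed average record size); the class `WalCostEstimate` and its methods are hypothetical and not part of the connector.

```java
/** Back-of-the-envelope estimates for WAL overhead (hypothetical helper). */
class WalCostEstimate {
    /** Records held in state between two checkpoints at a steady input rate. */
    static long bufferedRecords(long recordsPerSecond, long checkpointIntervalMs) {
        return recordsPerSecond * checkpointIntervalMs / 1000;
    }

    /** Approximate state size of those records, given an average serialized size. */
    static long bufferedBytes(long bufferedRecords, long avgSerializedBytes) {
        return bufferedRecords * avgSerializedBytes;
    }

    /**
     * Rough upper bound on the delay before a record is sent to Cassandra:
     * a record that arrives just after a checkpoint starts waits one full
     * interval for the next one, plus the time that checkpoint takes to complete.
     */
    static long worstCaseDelayMs(long checkpointIntervalMs, long checkpointDurationMs) {
        return checkpointIntervalMs + checkpointDurationMs;
    }
}
```

For example, at 2,000 records per second with a 5,000 ms interval, `bufferedRecords` gives 10,000 records per checkpoint across the job. At an assumed 200 bytes per record that is about 2,000,000 bytes of buffered state, and with an assumed 800 ms checkpoint duration the delay bound is 5,800 ms. The time for the batch write itself comes on top of that.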

Cassandra Requirements

Write-ahead logging creates additional tables in Cassandra:

-- Default checkpoint metadata keyspace and table
CREATE KEYSPACE IF NOT EXISTS flink_auxiliary
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS flink_auxiliary.checkpoints_<job_id> (
    sink_id text,
    sub_id int,
    checkpoint_id bigint,
    PRIMARY KEY (sink_id, sub_id)
);

Ensure your Cassandra cluster has sufficient resources for both data and checkpoint metadata.
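Because the checkpoint table name is derived from the job ID, and unquoted CQL identifiers may contain only letters, digits, and underscores, it can help to normalize the ID before passing it to the committer. The helper below is a hypothetical sketch, not part of the connector; `CheckpointTableNames`, `toCqlSafeId`, and `checkpointTable` are names invented for this example.

```java
import java.util.Locale;

/** Hypothetical helper for building CQL-safe checkpoint table names. */
class CheckpointTableNames {
    /** Lower-cases the id and replaces every character outside [a-z0-9_] with '_'. */
    static String toCqlSafeId(String jobId) {
        return jobId.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9_]", "_");
    }

    /** Fully qualified table name following the checkpoints_<job_id> pattern. */
    static String checkpointTable(String keyspace, String jobId) {
        return keyspace + ".checkpoints_" + toCqlSafeId(jobId);
    }
}
```

With this helper, `toCqlSafeId("my-flink-job-v1")` yields `my_flink_job_v1`, which can then be passed to `setJobId`.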
