tessl/maven-org-apache-flink--flink-connector-cassandra-2-10

Apache Flink connector for Apache Cassandra that provides both streaming and batch data integration capabilities


Fault Tolerance & Write-Ahead Logging

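The Flink Cassandra connector supports exactly-once processing for streaming applications by combining Flink checkpoints with a write-ahead log (WAL): records are buffered in Flink state and written to Cassandra only after the checkpoint that contains them completes. This keeps Cassandra consistent with Flink's checkpointed state and prevents data loss across failures.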

Capabilities

CassandraCommitter

CheckpointCommitter implementation that stores checkpoint information in a dedicated Cassandra table for coordination between Flink checkpoints and Cassandra writes.

/**
 * CheckpointCommitter that saves completed checkpoint information in Cassandra
 * Creates entries in format: |operator_id | subtask_id | last_completed_checkpoint|
 */
public class CassandraCommitter extends CheckpointCommitter {
    /**
     * Creates committer with default keyspace 'flink_auxiliary'
     * @param builder ClusterBuilder for Cassandra connection configuration
     */
    public CassandraCommitter(ClusterBuilder builder);
    
    /**
     * Creates committer with custom keyspace for checkpoint storage
     * @param builder ClusterBuilder for Cassandra connection configuration  
     * @param keySpace custom keyspace name for checkpoint table storage
     */
    public CassandraCommitter(ClusterBuilder builder, String keySpace);
    
    /**
     * Sets job ID for checkpoint table naming (called internally by Flink)
     * @param id unique job identifier
     * @throws Exception if setup fails
     */
    public void setJobId(String id) throws Exception;
    
    /**
     * Creates necessary keyspace and checkpoint table if they don't exist
     * @throws Exception if table/keyspace creation fails
     */
    @Override
    public void createResource() throws Exception;
    
    /**
     * Opens connection for checkpoint operations
     * @throws Exception if connection setup fails
     */
    @Override
    public void open() throws Exception;
    
    /**
     * Closes connections and clears checkpoint cache
     * @throws Exception if cleanup fails
     */
    @Override
    public void close() throws Exception;
    
    /**
     * Records checkpoint completion in Cassandra
     * @param subtaskIdx subtask index that completed checkpoint
     * @param checkpointId checkpoint ID that was completed
     */
    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId);
    
    /**
     * Checks if a specific checkpoint has been committed
     * Uses local cache to minimize Cassandra queries
     * @param subtaskIdx subtask index to check
     * @param checkpointId checkpoint ID to verify
     * @return true if checkpoint has been committed
     */
    @Override
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}
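
The bookkeeping behind commitCheckpoint and isCheckpointCommitted can be illustrated with a small in-memory model. This is a sketch, not the connector's code: InMemoryCommitter is a hypothetical class, and it assumes the committer keeps only the latest committed checkpoint ID per subtask and that checkpoints are committed in ascending order.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the checkpoint table: one row per (sinkId, subtask). Hypothetical. */
class InMemoryCommitter {
    // Key "sinkId/subtaskIdx" -> last committed checkpoint ID (models the table rows)
    private final Map<String, Long> table = new HashMap<>();
    private final String sinkId;

    InMemoryCommitter(String sinkId) {
        this.sinkId = sinkId;
    }

    /** Overwrites the row for this subtask, like an upsert keyed on (sink, subtask). */
    void commitCheckpoint(int subtaskIdx, long checkpointId) {
        table.put(sinkId + "/" + subtaskIdx, checkpointId);
    }

    /** A checkpoint counts as committed if its ID is not newer than the stored one. */
    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        Long last = table.get(sinkId + "/" + subtaskIdx);
        return last != null && checkpointId <= last;
    }

    public static void main(String[] args) {
        InMemoryCommitter committer = new InMemoryCommitter("sink-a");
        committer.commitCheckpoint(0, 7L);
        System.out.println(committer.isCheckpointCommitted(0, 5L)); // true
        System.out.println(committer.isCheckpointCommitted(0, 8L)); // false
        System.out.println(committer.isCheckpointCommitted(1, 5L)); // false
    }
}
```

Storing one row per subtask keeps the table small, at the cost of relying on ordered commits to answer questions about older checkpoints.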

Write-Ahead Logging Configuration

Enable exactly-once processing by configuring write-ahead logging in sink builders.

Basic WAL Configuration:

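import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;

// Enable WAL with default committer.
// Checkpointing must be enabled on the execution environment, otherwise the WAL sink fails when it opens.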
CassandraSink<Tuple3<String, Integer, String>> walSink = CassandraSink
    .addSink(stream)
    .setQuery("INSERT INTO users (id, age, name) VALUES (?, ?, ?)")
    .setHost("localhost")
    .enableWriteAheadLog() // Uses CassandraCommitter with default keyspace
    .build();

Custom WAL Configuration:

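import com.datastax.driver.core.Cluster;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// Create custom committer with specific keyspace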
ClusterBuilder committerClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra-checkpoint.example.com")
            .withPort(9042)
            .withCredentials("checkpoint_user", "checkpoint_password")
            .build();
    }
};

CassandraCommitter customCommitter = new CassandraCommitter(
    committerClusterBuilder, 
    "custom_checkpoint_keyspace"
);

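// Enable WAL with custom committer
// (dataClusterBuilder is a second ClusterBuilder, defined elsewhere, that points at the data cluster)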
CassandraSink<Tuple4<String, Long, Double, Boolean>> customWalSink = CassandraSink
    .addSink(complexStream)
    .setQuery("INSERT INTO transactions (tx_id, timestamp, amount, processed) VALUES (?, ?, ?, ?)")
    .setClusterBuilder(dataClusterBuilder)
    .enableWriteAheadLog(customCommitter)
    .build();

CassandraTupleWriteAheadSink Implementation

The underlying write-ahead logging sink implementation that provides exactly-once guarantees.

/**
 * Write-ahead logging sink that stores incoming records in Flink's state backend
 * and commits them to Cassandra only when checkpoint completes successfully
 */
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
    /**
     * Creates WAL sink with checkpoint coordination
     * @param insertQuery CQL INSERT statement with parameter placeholders
     * @param serializer TypeSerializer for storing records in state backend
     * @param builder ClusterBuilder for Cassandra connection
     * @param committer CheckpointCommitter for tracking checkpoint completion
     * @throws Exception if initialization fails
     */
    protected CassandraTupleWriteAheadSink(
        String insertQuery, 
        TypeSerializer<IN> serializer, 
        ClusterBuilder builder, 
        CheckpointCommitter committer
    ) throws Exception;
    
    /**
     * Opens connections and validates checkpointing is enabled
     * @throws Exception if checkpointing is disabled or connection fails
     */
    public void open() throws Exception;
    
    /**
     * Closes connections and cleans up resources
     * @throws Exception if cleanup fails
     */
    @Override
    public void close() throws Exception;
    
    /**
     * Sends batch of values to Cassandra with checkpoint coordination
     * @param values batch of tuples to write
     * @param checkpointId checkpoint ID for coordination
     * @param timestamp checkpoint timestamp
     * @return true if all writes successful, false if any failed
     * @throws Exception if batch processing fails
     */
    @Override
    protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception;
}
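
To make the buffer-then-commit flow concrete, here is a toy model of a write-ahead sink. It is a sketch under stated assumptions: WriteAheadBuffer and its methods are hypothetical names, Flink state is replaced by in-memory collections, the committer by a single field, and stopping at the first failed batch is a choice made for this sketch rather than documented connector behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Toy write-ahead buffer: records are held per checkpoint and sent only after completion. */
class WriteAheadBuffer<T> {
    private final List<T> current = new ArrayList<>();               // records since the last snapshot
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();  // snapshotted, not yet sent
    private long lastCommitted = -1L;                                // stands in for the committer

    /** Buffers a record instead of writing it to the external store right away. */
    void invoke(T record) {
        current.add(record);
    }

    /** On snapshot, the buffered records become the pending batch of this checkpoint. */
    void snapshot(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>(current));
        current.clear();
    }

    /**
     * On checkpoint completion, sends pending batches in ascending checkpoint order.
     * A batch whose send fails stays pending and is retried on a later completion.
     */
    void notifyCheckpointComplete(long checkpointId, Predicate<List<T>> sendValues) {
        List<Long> ids = new ArrayList<>(pending.headMap(checkpointId, true).keySet());
        for (Long id : ids) {
            if (id <= lastCommitted) {
                pending.remove(id);   // already written earlier: skip the duplicate
            } else if (sendValues.test(pending.get(id))) {
                lastCommitted = id;   // record the commit, then drop the batch
                pending.remove(id);
            } else {
                break;                // keep ordering: stop at the first failure
            }
        }
    }

    int pendingCheckpoints() {
        return pending.size();
    }

    long lastCommitted() {
        return lastCommitted;
    }
}
```

In this model the sendValues predicate plays the role of the method of the same name above: returning false leaves the batch in place, so nothing is marked as committed until the write has succeeded.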

Fault Tolerance Patterns

Exactly-Once Processing Example

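Complete example of an exactly-once setup: checkpointing in exactly-once mode, a durable state backend, separate cluster builders for data and checkpoint storage, and a WAL-enabled sink.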

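import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;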

// Configure execution environment for exactly-once processing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing with exactly-once mode
env.enableCheckpointing(10000); // checkpoint every 10 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// Configure state backend for durability (replace namenode:port with your HDFS address)
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints"));

// Data source (CriticalEventSource is a user-defined SourceFunction, not part of the connector)
DataStream<Tuple3<String, Long, String>> criticalEvents = env
    .addSource(new CriticalEventSource())
    .name("Critical Events Source");

// Create WAL-enabled sink for exactly-once guarantees
ClusterBuilder sinkClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra-primary.example.com")
            .addContactPoint("cassandra-secondary.example.com")
            .withPort(9042)
            .withCredentials("sink_user", "sink_password")
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 10000))
            .build();
    }
};

ClusterBuilder checkpointClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra-checkpoint.example.com")
            .withPort(9042) 
            .withCredentials("checkpoint_user", "checkpoint_password")
            .build();
    }
};

CassandraCommitter committer = new CassandraCommitter(
    checkpointClusterBuilder, 
    "flink_checkpoints"
);

CassandraSink<Tuple3<String, Long, String>> exactlyOnceSink = CassandraSink
    .addSink(criticalEvents)
    .setQuery("INSERT INTO critical_events (event_id, timestamp, data) VALUES (?, ?, ?)")
    .setClusterBuilder(sinkClusterBuilder)
    .enableWriteAheadLog(committer)
    .build();

exactlyOnceSink
    .name("Critical Events Cassandra Sink")
    .uid("critical-events-sink") // Important for savepoint compatibility
    .setParallelism(4);

env.execute("Critical Events Processing");

Recovery and Restart Behavior

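The restart strategy controls how often and how quickly Flink restarts a failed job. On each restart, the connector uses the committer to avoid writing already-committed checkpoints a second time.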

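import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

// Option 1: fixed-delay restart
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3, // number of restart attempts
    Time.of(10, TimeUnit.SECONDS) // delay between restarts
));

// On failure and restart:
// 1. Flink restores operator state from the last successful checkpoint
// 2. CassandraCommitter reports the last checkpoint committed for each subtask
// 3. The WAL sink re-sends only the records of checkpoints that were not yet committed
// 4. Processing continues from the restored checkpoint

// Option 2: failure-rate restart, often a better fit for production.
// A later setRestartStrategy call replaces an earlier one, so configure only one of the two.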
env.setRestartStrategy(RestartStrategies.failureRateRestart(
    5, // max failures per interval
    Time.of(5, TimeUnit.MINUTES), // time interval
    Time.of(30, TimeUnit.SECONDS) // delay between restarts
));
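
The replay decision made during recovery can be sketched as a small function. RecoveryPlanner and checkpointsToReplay are hypothetical names, and the model assumes the committer reports a single last-committed checkpoint ID per subtask (or -1 if nothing was committed).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Decides which restored checkpoint batches still need to be written after a restart. */
class RecoveryPlanner {
    /**
     * @param restoredPending batches restored from Flink state, keyed by checkpoint ID
     * @param lastCommittedId last checkpoint ID reported as committed for this subtask,
     *                        or -1 if nothing was committed yet
     * @return checkpoint IDs to replay, in ascending order
     */
    static List<Long> checkpointsToReplay(SortedMap<Long, List<String>> restoredPending,
                                          long lastCommittedId) {
        List<Long> replay = new ArrayList<>();
        for (Long id : restoredPending.keySet()) {
            if (id > lastCommittedId) {
                replay.add(id); // not committed before the failure: must be sent again
            }
        }
        return replay;
    }

    public static void main(String[] args) {
        SortedMap<Long, List<String>> restored = new TreeMap<>();
        restored.put(4L, Arrays.asList("e1"));
        restored.put(5L, Arrays.asList("e2"));
        restored.put(6L, Arrays.asList("e3"));
        // Checkpoints 4 and 5 were committed before the failure, so only 6 is replayed
        System.out.println(checkpointsToReplay(restored, 5L)); // [6]
    }
}
```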

Monitoring Fault Tolerance

Implement monitoring for checkpoint and recovery metrics.

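import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;

// Sketch of a committer that records commit counts and latency.
// The committer is serialized with the job, and Flink metric objects are not serializable,
// so the metric fields are transient and registered at runtime through registerMetrics,
// a hypothetical hook that the surrounding operator would have to call.
public class MonitoredCassandraCommitter extends CassandraCommitter {
    private transient Counter checkpointCommits;
    private transient Histogram checkpointLatency;

    public MonitoredCassandraCommitter(ClusterBuilder builder) {
        super(builder);
    }

    // MetricGroup.histogram expects a name and a Histogram implementation supplied by the caller
    public void registerMetrics(MetricGroup metricGroup, Histogram latencyHistogram) {
        this.checkpointCommits = metricGroup.counter("checkpoint_commits");
        this.checkpointLatency = metricGroup.histogram("checkpoint_latency", latencyHistogram);
    }

    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId) {
        long startTime = System.currentTimeMillis();
        super.commitCheckpoint(subtaskIdx, checkpointId);
        long latency = System.currentTimeMillis() - startTime;

        // Metrics are optional: skip them if registerMetrics has not been called
        if (checkpointCommits != null) {
            checkpointCommits.inc();
            checkpointLatency.update(latency);
        }
    }
}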

Checkpoint Storage Configuration

Default Checkpoint Table Schema

The CassandraCommitter creates the following table structure:

-- Default keyspace: flink_auxiliary
-- Default table name: checkpoints_{job_id}
CREATE TABLE IF NOT EXISTS flink_auxiliary.checkpoints_job123 (
    sink_id text,
    sub_id int,
    checkpoint_id bigint,
    PRIMARY KEY (sink_id, sub_id)
);
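
For orientation, the helper below builds CQL strings that would read and write one row of this table. It is an illustrative sketch: CheckpointTableCql is a hypothetical class, and the exact statements the connector issues may differ. Real code should prefer prepared statements over string formatting.

```java
import java.util.Locale;

/** Builds illustrative CQL strings for the checkpoint table schema shown above. */
class CheckpointTableCql {
    /** Table name follows the checkpoints_{job_id} convention. */
    static String tableName(String jobId) {
        return "checkpoints_" + jobId;
    }

    /** Writes the latest checkpoint ID for one (sink_id, sub_id) row. */
    static String upsert(String keyspace, String jobId, String sinkId,
                         int subtask, long checkpointId) {
        return String.format(Locale.ROOT,
            "UPDATE %s.%s SET checkpoint_id=%d WHERE sink_id='%s' AND sub_id=%d;",
            keyspace, tableName(jobId), checkpointId, sinkId, subtask);
    }

    /** Reads the last committed checkpoint ID for one (sink_id, sub_id) row. */
    static String lookup(String keyspace, String jobId, String sinkId, int subtask) {
        return String.format(Locale.ROOT,
            "SELECT checkpoint_id FROM %s.%s WHERE sink_id='%s' AND sub_id=%d;",
            keyspace, tableName(jobId), sinkId, subtask);
    }

    public static void main(String[] args) {
        System.out.println(upsert("flink_auxiliary", "job123", "sink-a", 0, 42L));
        System.out.println(lookup("flink_auxiliary", "job123", "sink-a", 0));
    }
}
```

Because the primary key is (sink_id, sub_id), each subtask of each sink owns exactly one row, and every commit overwrites the previous checkpoint ID.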

Custom Checkpoint Storage

Configure custom keyspace and table settings for checkpoint storage.

// Custom keyspace configuration
CassandraCommitter committer = new CassandraCommitter(builder, "production_checkpoints");

// The committer will create:
// - Keyspace: production_checkpoints  
// - Table: checkpoints_{job_id}
// - Replication: SimpleStrategy with replication_factor=1 (default)

// For production, consider creating the keyspace manually with appropriate replication:
/*
CREATE KEYSPACE production_checkpoints 
WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'datacenter1': 3,
    'datacenter2': 2
};
*/

Best Practices

Checkpointing Configuration

  • Checkpoint interval: 10-60 seconds depending on throughput and latency requirements
  • Exactly-once mode: Always use CheckpointingMode.EXACTLY_ONCE for critical data
  • Concurrent checkpoints: Set to 1 to avoid resource contention
  • Checkpoint timeout: Set based on cluster size and network latency
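
How the checkpoint interval and the minimum pause interact can be worked through with a little arithmetic. The sketch below is a simplified model, assuming a single concurrent checkpoint and ignoring scheduling jitter; CheckpointTiming and nextStartMillis are hypothetical names.

```java
/** Estimates when the next checkpoint can start, given the interval and minimum pause. */
class CheckpointTiming {
    /**
     * The next checkpoint starts no earlier than one interval after the previous start,
     * and no earlier than minPause after the previous checkpoint finished.
     */
    static long nextStartMillis(long prevStart, long prevDuration, long interval, long minPause) {
        long byInterval = prevStart + interval;
        long byPause = prevStart + prevDuration + minPause;
        return Math.max(byInterval, byPause);
    }

    public static void main(String[] args) {
        // 10 s interval and 5 s minimum pause, with a fast and a slow checkpoint
        System.out.println(nextStartMillis(0, 2_000, 10_000, 5_000)); // 10000
        System.out.println(nextStartMillis(0, 8_000, 10_000, 5_000)); // 13000
    }
}
```

With a WAL sink this matters for latency as well: records become visible in Cassandra only after their checkpoint completes, so slow checkpoints delay both the next checkpoint and the writes.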

Resource Management

  • Separate clusters: Consider using separate Cassandra clusters for data and checkpoints
  • Keyspace isolation: Use dedicated keyspace for checkpoint tables
  • Connection pooling: Configure appropriate connection pools for both data and checkpoint connections

Error Handling

  • Restart strategies: Configure appropriate restart strategies for your use case
  • Alerting: Monitor checkpoint failure rates and set up alerts
  • Manual intervention: Plan procedures for manual checkpoint recovery if needed

Performance Optimization

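  • Batch processing: The WAL sink buffers records in Flink state and writes them to Cassandra in batches only after a checkpoint completes, so write latency grows with the checkpoint interval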
  • Parallelism: Configure sink parallelism based on Cassandra cluster capacity
  • Network optimization: Co-locate Flink and Cassandra for reduced latency

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra-2-10
