CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-core

Apache Flink streaming core library providing fundamental building blocks for scalable stream data processing.

Pending
Overview
Eval results
Files

docs/checkpointing-state.md

Checkpointing and State

Flink's checkpointing mechanism provides fault tolerance by creating consistent snapshots of streaming application state. This enables exactly-once processing semantics and recovery from failures.

Core Checkpointing Interfaces

Checkpointed<T>

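Interface for user functions whose state must be included in checkpoints. `snapshotState` returns the state to store for a given checkpoint. `restoreState` hands that state back to the function when the job recovers from a failure.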

/**
 * Contract for user functions whose state is captured in checkpoints.
 *
 * @param <T> type of the state object produced and restored by the function
 */
public interface Checkpointed<T> extends Serializable {

    /**
     * Produces the state to store for the checkpoint being taken.
     *
     * @param checkpointId        id of the checkpoint being taken
     * @param checkpointTimestamp timestamp of that checkpoint
     * @return the state object to persist
     * @throws Exception if the state cannot be captured
     */
    T snapshotState(long checkpointId,
                    long checkpointTimestamp) throws Exception;

    /**
     * Reinstates a state object previously returned by {@link #snapshotState}.
     *
     * @param state the restored state object
     * @throws Exception if the state cannot be applied
     */
    void restoreState(T state) throws Exception;
}

Usage Example:

import org.apache.flink.streaming.api.checkpoint.Checkpointed;

public class StatefulMapFunction implements MapFunction<String, String>, Checkpointed<Integer> {
    // Number of records mapped so far by this function instance; this is the checkpointed state.
    private int counter = 0;

    /** Tags each incoming value with its running sequence number, e.g. "a" -> "a_1". */
    @Override
    public String map(String value) throws Exception {
        final int sequence = ++counter;
        return value + "_" + sequence;
    }

    /** Hands the current count over as the state for this checkpoint. */
    @Override
    public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return Integer.valueOf(counter);
    }

    /** Resumes counting from the value captured by an earlier snapshot. */
    @Override
    public void restoreState(Integer state) throws Exception {
        counter = state.intValue();
    }
}

CheckpointedAsynchronously<T>

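A variant of `Checkpointed` for functions whose snapshots can be persisted asynchronously, so stream processing is not blocked while state is written. The function keeps processing records, and mutating its own state, while the snapshot is being persisted. `snapshotState` must therefore return an object that is never modified afterwards, typically a copy of the state.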

/**
 * Variant of {@link Checkpointed} for functions whose snapshots may be persisted asynchronously.
 * It declares no methods of its own.
 *
 * <p>NOTE(review): per the Flink Javadoc, the object returned by {@code snapshotState} may be
 * written in the background while the function keeps processing and mutating its own state.
 * Implementations must therefore return an object they never modify afterwards, for example a
 * copy of their live state.
 */
public interface CheckpointedAsynchronously<T> extends Checkpointed<T> {
    // Inherits snapshotState and restoreState; adds no methods of its own.
}

Usage Example:

import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;

public class AsyncCheckpointedSink implements SinkFunction<String>, CheckpointedAsynchronously<Map<String, Integer>> {
    // Number of times each distinct value has been received.
    private Map<String, Integer> state = new HashMap<>();

    @Override
    public void invoke(String value) throws Exception {
        state.merge(value, 1, Integer::sum);
    }

    @Override
    public Map<String, Integer> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        // The returned snapshot may be persisted while invoke() keeps updating `state`,
        // so hand out an independent copy rather than the live map.
        return new HashMap<>(state);
    }

    @Override
    public void restoreState(Map<String, Integer> restoredState) throws Exception {
        // Copy into a fresh HashMap so that invoke() keeps working even if the restored
        // map is unmodifiable or shared with someone else.
        state = new HashMap<>(restoredState);
    }
}

Checkpoint Committing

CheckpointCommitter

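Interface for recording completed checkpoints in an external system. `commitCheckpoint` marks a checkpoint as committed. `isCheckpointCommitted` reports whether that has already happened, which helps avoid committing the same checkpoint twice after recovery.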

/**
 * Records which checkpoints have been committed to an external system.
 */
public interface CheckpointCommitter extends Serializable {

    /**
     * Marks the given checkpoint as committed.
     *
     * @param checkpointId id of the checkpoint to commit
     * @throws Exception if the commit cannot be recorded
     */
    void commitCheckpoint(long checkpointId)
            throws Exception;

    /**
     * Reports whether the given checkpoint has already been committed.
     *
     * @param checkpointId id of the checkpoint to look up
     * @return {@code true} if a commit for {@code checkpointId} has been recorded
     * @throws Exception if the lookup fails
     */
    boolean isCheckpointCommitted(long checkpointId)
            throws Exception;
}

Usage Example:

import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;

public class DatabaseCheckpointCommitter implements CheckpointCommitter {
    // Statement recording a committed checkpoint id together with the commit time.
    private static final String INSERT_CHECKPOINT_SQL =
        "INSERT INTO checkpoints (id, timestamp) VALUES (?, ?)";
    // Statement probing whether a row for a checkpoint id exists.
    private static final String CHECKPOINT_EXISTS_SQL =
        "SELECT 1 FROM checkpoints WHERE id = ?";

    // Location of the checkpoint database; the placeholder helpers below do not use it yet.
    private final String connectionUrl;

    public DatabaseCheckpointCommitter(String connectionUrl) {
        this.connectionUrl = connectionUrl;
    }

    /**
     * Inserts a row for {@code checkpointId}, stamped with the current wall-clock time,
     * into the checkpoints table.
     */
    @Override
    public void commitCheckpoint(long checkpointId) throws Exception {
        long committedAt = System.currentTimeMillis();
        executeUpdate(INSERT_CHECKPOINT_SQL, checkpointId, committedAt);
    }

    /**
     * Intended to check whether a row for {@code checkpointId} exists. With the placeholder
     * {@code checkExists} below, this currently always returns {@code false}.
     */
    @Override
    public boolean isCheckpointCommitted(long checkpointId) throws Exception {
        return checkExists(CHECKPOINT_EXISTS_SQL, checkpointId);
    }

    // Placeholder: should execute `sql` with `params` bound to its `?` placeholders.
    private void executeUpdate(String sql, Object... params) throws Exception {
        // Database update implementation
    }

    // Placeholder: should report whether `sql` yields at least one row; currently always false.
    private boolean checkExists(String sql, Object... params) throws Exception {
        // Database query implementation
        return false;
    }
}

State Handle Providers

StateHandleProvider<T>

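Interface for converting state to and from state handles. `createStateHandle` wraps a string-encoded state value in a handle of type `T`, and `getStateFromHandle` reads the value back out of such a handle.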

/**
 * Converts string-encoded state to and from a {@link StateHandle}.
 *
 * @param <T> concrete handle type produced and consumed by this provider
 */
public interface StateHandleProvider<T extends StateHandle> extends Serializable {

    /**
     * Wraps {@code state} in a handle.
     *
     * @param state the state value to store
     * @return a handle referring to {@code state}
     * @throws Exception if the handle cannot be created
     */
    T createStateHandle(String state) throws Exception;

    /**
     * Reads the state value back out of a handle.
     *
     * @param stateHandle a handle previously produced by this provider
     * @return the state value the handle refers to
     * @throws Exception if the state cannot be read
     */
    String getStateFromHandle(T stateHandle) throws Exception;
}

Checkpoint Configuration

Checkpointing is configured at the execution environment level:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Enable checkpointing with a 5-second interval
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // interval in milliseconds: checkpoint every 5 seconds

// Configure checkpointing behavior
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // at least 1 s between checkpoints
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);      // one checkpoint in flight at a time
env.getCheckpointConfig().setCheckpointTimeout(60000);         // abort checkpoints taking over 60 s

Integration with User Functions

Rich Functions with State

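Rich functions can access managed state through their RuntimeContext. A `ValueState` obtained with `getState` is keyed state, so the function must run on a keyed (grouped) stream, as shown in the Keyed State example below: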

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

public class StatefulRichMapFunction extends RichMapFunction<String, String> {
    // Handle to the counter managed by Flink; obtained in open() before any map() call.
    private ValueState<Integer> counterState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // State named "counter", holding an Integer, with default value 0.
        counterState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("counter", Integer.class, 0));
    }

    /** Increments the stored counter and appends the new value to the record, e.g. "a" -> "a_1". */
    @Override
    public String map(String value) throws Exception {
        int next = counterState.value() + 1;
        counterState.update(next);
        return value + "_" + next;
    }
}

Keyed State

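For keyed (grouped) streams, state is automatically partitioned by key. Each key sees its own independent value, so in the example below every distinct first word gets its own counter: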

import org.apache.flink.streaming.api.datastream.DataStream;

String host = "localhost";
int port = 9999;
DataStream<String> input = env.socketTextStream(host, port);

// Key each line by its first word; the mapper's counter is then tracked separately per key.
StatefulRichMapFunction perKeyCounter = new StatefulRichMapFunction();
DataStream<String> result = input
    .groupBy(line -> line.split(" ")[0])
    .map(perKeyCounter);

Fault Tolerance Guarantees

Exactly-Once Processing

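Flink provides exactly-once guarantees for operator state through:

  1. Consistent Checkpointing: all operators snapshot their state as of the same logical point in the input streams
  2. Checkpoint Barriers: special markers injected into the data streams that separate the records belonging to one checkpoint from those of the next; operators with several inputs align on the barriers before taking their snapshot
  3. State Recovery: on failure, all operators restore state from the last successful checkpoint and the sources replay records from that point

End-to-end exactly-once output additionally requires sinks that write idempotently, or that write records only after the checkpoint covering them has completed, as in the example below.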
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Example of a sink that writes records only after the checkpoint covering them has completed.
// Flushing inside snapshotState is NOT exactly-once. If that checkpoint later fails, the job
// restarts from the previous checkpoint and the already-written records are written again.
public class ExactlyOnceSink
        implements SinkFunction<String>, CheckpointedAsynchronously<List<String>>, CheckpointCommitter {

    // Records received since the most recent snapshot.
    private List<String> pendingRecords = new ArrayList<>();

    // Records sealed by a snapshot but not yet written, ordered by checkpoint id.
    private final TreeMap<Long, List<String>> recordsPerCheckpoint = new TreeMap<>();

    // Highest checkpoint id whose records have been written to the external system.
    private long lastCommittedCheckpointId = -1L;

    @Override
    public void invoke(String value) throws Exception {
        // Buffer records until the checkpoint that covers them is committed
        pendingRecords.add(value);
    }

    @Override
    public List<String> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        // Seal the current buffer under this checkpoint. It is written only in commitCheckpoint(),
        // because the checkpoint may still fail.
        recordsPerCheckpoint.put(checkpointId, pendingRecords);
        pendingRecords = new ArrayList<>();

        // Return every record not yet written, as a fresh list this sink never mutates afterwards
        // (the returned snapshot may be persisted asynchronously).
        List<String> snapshot = new ArrayList<>();
        for (List<String> records : recordsPerCheckpoint.values()) {
            snapshot.addAll(records);
        }
        return snapshot;
    }

    @Override
    public void commitCheckpoint(long checkpointId) throws Exception {
        // Write every sealed checkpoint up to and including checkpointId, oldest first, in case
        // commits for earlier checkpoints were never delivered.
        Iterator<Map.Entry<Long, List<String>>> it = recordsPerCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break;
            }
            flushToExternalSystem(entry.getValue());
            it.remove(); // forget the records only once they were written successfully
        }
        lastCommittedCheckpointId = Math.max(lastCommittedCheckpointId, checkpointId);
    }

    @Override
    public boolean isCheckpointCommitted(long checkpointId) throws Exception {
        return checkpointId <= lastCommittedCheckpointId;
    }

    @Override
    public void restoreState(List<String> state) throws Exception {
        // These records were captured by a checkpoint but may not have been written yet. Buffer them
        // again so the next committed checkpoint writes them. NOTE: if the failure struck after
        // the write but before the next checkpoint, they are written twice, so the external write
        // should be idempotent.
        recordsPerCheckpoint.clear();
        pendingRecords = new ArrayList<>(state);
    }

    private void flushToExternalSystem(List<String> records) throws Exception {
        // Implementation to write records to external system
    }
}

Types

/**
 * Handle referring to a piece of stored state.
 */
public interface StateHandle extends Serializable {

    /**
     * Discards the state this handle refers to.
     *
     * @throws Exception if the state cannot be discarded
     */
    void discardState() throws Exception;

    /**
     * Returns the size of the referenced state (units are not specified by this interface).
     *
     * @throws Exception if the size cannot be determined
     */
    long getStateSize() throws Exception;
}

// Checkpoint configuration (abridged signature listing)
// NOTE: method bodies are omitted here, so this is not compilable Java as written.
public class CheckpointConfig {
    // Guarantee level; one of the CheckpointingMode constants.
    public void setCheckpointingMode(CheckpointingMode mode);
    // Minimum pause between checkpoints; presumably milliseconds (the example passes 1000), confirm.
    public void setMinPauseBetweenCheckpoints(long minPause);
    // Upper bound on how many checkpoints may be in progress at the same time.
    public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints);
    // Time limit for a single checkpoint; presumably milliseconds (the example passes 60000), confirm.
    public void setCheckpointTimeout(long checkpointTimeout);
    // Enables externalized checkpoints; `cleanup` selects whether they are kept or deleted on job cancellation.
    public void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanup);
}

/** Processing guarantee selected for checkpointing. */
public enum CheckpointingMode {
    /** Exactly-once processing semantics. */
    EXACTLY_ONCE,

    /** At-least-once processing semantics. */
    AT_LEAST_ONCE
}

/** What happens to externalized checkpoints when the job is cancelled. */
public enum ExternalizedCheckpointCleanup {
    /** Keep checkpoints when the job is cancelled. */
    RETAIN_ON_CANCELLATION,

    /** Delete checkpoints when the job is cancelled. */
    DELETE_ON_CANCELLATION
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-core

docs

checkpointing-state.md

datastream-operations.md

execution-environment.md

index.md

sources-and-sinks.md

stream-operators.md

windowing.md

tile.json