Apache Flink streaming core library providing fundamental building blocks for scalable stream data processing.
—
Flink's checkpointing mechanism provides fault tolerance by creating consistent snapshots of streaming application state. This enables exactly-once processing semantics and recovery from failures.
Interface for functions that need to participate in checkpointing.
/**
 * Contract for functions that need to participate in checkpointing: the function
 * hands out a snapshot of its state and can later be given such a snapshot back.
 *
 * @param <T> type of the state object exchanged through snapshot and restore
 */
public interface Checkpointed<T> extends Serializable {
/**
 * Produces the state object to be stored for a checkpoint.
 *
 * @param checkpointId presumably the identifier of the checkpoint being taken (confirm)
 * @param checkpointTimestamp presumably the time the checkpoint was triggered; the unit is not shown here (confirm)
 * @return the state object for this checkpoint
 * @throws Exception if the snapshot cannot be produced
 */
T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
/**
 * Hands a previously snapshotted state object back to the function.
 *
 * @param state the state to restore from
 * @throws Exception if the state cannot be restored
 */
void restoreState(T state) throws Exception;
}Usage Example:
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
/**
 * Map function that appends a running count to every element and participates in
 * checkpointing by exposing that count as its state.
 */
public class StatefulMapFunction implements MapFunction<String, String>, Checkpointed<Integer> {

    /** Number of elements mapped so far; this is the value that gets checkpointed. */
    private int counter = 0;

    /** Increments the count and returns the input suffixed with "_" and the new count. */
    @Override
    public String map(String value) throws Exception {
        final int updated = ++counter;
        return value + "_" + updated;
    }

    /** Returns the current count as the checkpointed state. */
    @Override
    public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return Integer.valueOf(counter);
    }

    /** Replaces the current count with the restored one. */
    @Override
    public void restoreState(Integer state) throws Exception {
        this.counter = state.intValue();
    }
}
Interface for functions that can perform asynchronous checkpointing to avoid blocking stream processing.
/**
 * Variant of Checkpointed for functions whose checkpointing is meant to happen
 * asynchronously so that stream processing is not blocked. It declares no
 * methods of its own.
 *
 * @param <T> type of the state object exchanged through snapshot and restore
 */
public interface CheckpointedAsynchronously<T> extends Checkpointed<T> {
// Inherits snapshotState and restoreState from Checkpointed; nothing is added here.
// NOTE(review): this comment previously said snapshotState itself "can be called
// asynchronously". Upstream Javadoc appears to describe asynchronous storing of the
// returned snapshot instead (the function must be able to keep mutating its state
// without mutating the returned snapshot). Confirm against the Flink Javadoc.
}Usage Example:
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
/**
 * Sink that counts how often each value has been seen and checkpoints those
 * counts through CheckpointedAsynchronously.
 *
 * <p>The live map is never shared with the outside: snapshots are copies, and
 * restored state is copied in rather than adopted.
 */
public class AsyncCheckpointedSink implements SinkFunction<String>, CheckpointedAsynchronously<Map<String, Integer>> {

    /** Occurrence count per value; owned exclusively by this sink instance. */
    private final Map<String, Integer> state = new HashMap<>();

    /** Increments the occurrence count of {@code value}, starting at 1 for a new value. */
    @Override
    public void invoke(String value) throws Exception {
        state.merge(value, 1, Integer::sum);
    }

    /**
     * Returns a copy of the current counts.
     *
     * @return an independent map, so later calls to {@code invoke} do not change the snapshot
     */
    @Override
    public Map<String, Integer> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        // Can be called asynchronously - return a copy of the state
        return new HashMap<>(state);
    }

    /**
     * Replaces the current counts with the restored ones.
     *
     * @param restoredState counts to restore; {@code null} is treated as empty
     */
    @Override
    public void restoreState(Map<String, Integer> restoredState) throws Exception {
        // Copy instead of adopting the caller's map: it may be unmodifiable or shared,
        // and invoke() mutates the live state on every record.
        state.clear();
        if (restoredState != null) {
            state.putAll(restoredState);
        }
    }
}
Interface for committing checkpoints to external systems for additional durability guarantees.
/**
 * Contract for recording checkpoints in an external system and for asking
 * whether a given checkpoint has been recorded there.
 */
public interface CheckpointCommitter extends Serializable {
/**
 * Records the given checkpoint as committed.
 *
 * @param checkpointId identifier of the checkpoint to commit
 * @throws Exception if the commit cannot be performed
 */
void commitCheckpoint(long checkpointId) throws Exception;
/**
 * Reports whether the given checkpoint has been committed.
 *
 * @param checkpointId identifier of the checkpoint to look up
 * @return {@code true} if the checkpoint is committed
 * @throws Exception if the lookup cannot be performed
 */
boolean isCheckpointCommitted(long checkpointId) throws Exception;
}Usage Example:
import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
/**
 * CheckpointCommitter sketch that keeps committed checkpoint ids in a database
 * table named {@code checkpoints}. The two database helpers are placeholders.
 */
public class DatabaseCheckpointCommitter implements CheckpointCommitter {

    /** Statement that records a checkpoint id together with the commit time. */
    private static final String INSERT_CHECKPOINT_SQL =
            "INSERT INTO checkpoints (id, timestamp) VALUES (?, ?)";

    /** Query that yields a row when the checkpoint id has been recorded. */
    private static final String FIND_CHECKPOINT_SQL = "SELECT 1 FROM checkpoints WHERE id = ?";

    private final String connectionUrl;

    public DatabaseCheckpointCommitter(String connectionUrl) {
        this.connectionUrl = connectionUrl;
    }

    /** Writes the checkpoint id and the current wall-clock time to the external database. */
    @Override
    public void commitCheckpoint(long checkpointId) throws Exception {
        final long committedAt = System.currentTimeMillis();
        executeUpdate(INSERT_CHECKPOINT_SQL, checkpointId, committedAt);
    }

    /** Looks the checkpoint id up in the external database. */
    @Override
    public boolean isCheckpointCommitted(long checkpointId) throws Exception {
        final boolean committed = checkExists(FIND_CHECKPOINT_SQL, checkpointId);
        return committed;
    }

    /** Placeholder for running a parameterized update statement. */
    private void executeUpdate(String statement, Object... arguments) throws Exception {
        // Database update implementation
    }

    /** Placeholder for running a parameterized existence query; currently always {@code false}. */
    private boolean checkExists(String query, Object... arguments) throws Exception {
        // Database query implementation
        return false;
    }
}
Interface for providing state handle storage and retrieval mechanisms.
/**
 * Contract for turning state into a state handle and for reading the state back
 * out of such a handle.
 *
 * @param <T> concrete StateHandle type produced and consumed by this provider
 */
public interface StateHandleProvider<T extends StateHandle> extends Serializable {
/**
 * Creates a handle for the given state.
 *
 * @param state the state, passed as a String
 * @return a handle referring to that state
 * @throws Exception if the handle cannot be created
 */
T createStateHandle(String state) throws Exception;
/**
 * Reads the state referred to by a handle.
 *
 * @param stateHandle the handle to read from
 * @return the state as a String
 * @throws Exception if the state cannot be retrieved
 */
String getStateFromHandle(T stateHandle) throws Exception;
}Checkpointing is configured at the execution environment level:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Enable checkpointing with 5-second interval
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Interval in milliseconds: checkpoint every 5 seconds
// Configure checkpointing behavior
// NOTE(review): CheckpointingMode is used below, but this snippet shows no import for it.
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// The next values are presumably milliseconds, like the interval above (confirm).
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// Allow only one checkpoint to be in progress at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setCheckpointTimeout(60000);Rich functions can access operator state through the RuntimeContext:
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
/**
 * Rich map function that keeps a counter in a ValueState obtained from the
 * runtime context and appends the updated counter to every element.
 */
public class StatefulRichMapFunction extends RichMapFunction<String, String> {

    /** Handle to the counter state; assigned in {@link #open(Configuration)}. */
    private ValueState<Integer> counterState;

    /** Registers the "counter" state (Integer, default 0) and keeps the handle. */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Integer> counterDescriptor =
                new ValueStateDescriptor<>("counter", Integer.class, 0);
        this.counterState = getRuntimeContext().getState(counterDescriptor);
    }

    /** Reads the counter, stores it incremented by one, and returns the input suffixed with "_" and the new count. */
    @Override
    public String map(String value) throws Exception {
        final int next = counterState.value() + 1;
        counterState.update(next);
        return value + "_" + next;
    }
}
For keyed streams, state is automatically partitioned by key:
import org.apache.flink.streaming.api.datastream.DataStream;
DataStream<String> input = env.socketTextStream("localhost", 9999);
DataStream<String> result = input
.groupBy(value -> value.split(" ")[0]) // Group by first word
.map(new StatefulRichMapFunction()); // State is keyed automaticallyFlink provides exactly-once processing guarantees through:
// Example of exactly-once sink with checkpointing
/**
 * Sink that buffers incoming records and writes the buffer to the external
 * system whenever a snapshot is taken.
 *
 * <p>NOTE(review): {@code snapshotState} flushes the buffered records and then
 * returns those same records as the checkpointed state. After a restore they are
 * buffered again and will be flushed a second time at the next snapshot. Avoiding
 * that presumably requires writing only once a checkpoint is confirmed complete
 * (or an idempotent external write). Confirm the intended design before relying
 * on this example for exactly-once delivery.
 */
public class ExactlyOnceSink implements SinkFunction<String>, CheckpointedAsynchronously<List<String>> {

    /** Records received since the last snapshot; owned exclusively by this sink instance. */
    private final List<String> pendingRecords = new ArrayList<>();

    /** Buffers the record until the next snapshot. */
    @Override
    public void invoke(String value) throws Exception {
        // Buffer records until checkpoint
        pendingRecords.add(value);
    }

    /**
     * Flushes the buffered records, then hands them out as the snapshot and empties the buffer.
     *
     * @return an independent copy of the records that were buffered when the snapshot was taken
     * @throws Exception if flushing to the external system fails; the buffer is left untouched in that case
     */
    @Override
    public List<String> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        // Flush all pending records to external system
        flushToExternalSystem(pendingRecords);
        // Return state for recovery
        List<String> snapshot = new ArrayList<>(pendingRecords);
        pendingRecords.clear();
        return snapshot;
    }

    /**
     * Replaces the buffer contents with the restored records.
     *
     * @param state records to restore; {@code null} is treated as empty
     */
    @Override
    public void restoreState(List<String> state) throws Exception {
        // Copy instead of adopting the caller's list: it may be unmodifiable or shared,
        // and invoke() appends to the buffer on every record.
        pendingRecords.clear();
        if (state != null) {
            pendingRecords.addAll(state);
        }
    }

    /** Placeholder for writing the given records to the external system. */
    private void flushToExternalSystem(List<String> records) throws Exception {
        // Implementation to write records to external system
    }
}
// State handle interface
/**
 * Handle to a piece of stored state; it can report the state's size and discard it.
 */
public interface StateHandle extends Serializable {
/**
 * Discards the state this handle refers to.
 *
 * @throws Exception if the state cannot be discarded
 */
void discardState() throws Exception;
/**
 * Reports the size of the state this handle refers to.
 *
 * @return the state size; presumably in bytes (confirm)
 * @throws Exception if the size cannot be determined
 */
long getStateSize() throws Exception;
}
// Checkpoint configuration
// Signature listing only (no method bodies). The configuration example on this page
// obtains an instance via env.getCheckpointConfig().
public class CheckpointConfig {
/** Selects the checkpointing mode; see the CheckpointingMode values. */
public void setCheckpointingMode(CheckpointingMode mode);
/** Sets the minimum pause between checkpoints; presumably in milliseconds (confirm). */
public void setMinPauseBetweenCheckpoints(long minPause);
/** Sets how many checkpoints may be in progress at the same time. */
public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints);
/** Sets the checkpoint timeout; presumably in milliseconds (confirm). */
public void setCheckpointTimeout(long checkpointTimeout);
/** Enables externalized checkpoints with the given cleanup behavior; see ExternalizedCheckpointCleanup. */
public void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanup);
}
// Checkpointing modes
/** Processing guarantee selected through CheckpointConfig.setCheckpointingMode. */
public enum CheckpointingMode {
EXACTLY_ONCE, // Exactly-once processing semantics
AT_LEAST_ONCE // At-least-once processing semantics
}
// External checkpoint cleanup modes
/** What happens to externalized checkpoints when the job is cancelled; passed to CheckpointConfig.enableExternalizedCheckpoints. */
public enum ExternalizedCheckpointCleanup {
RETAIN_ON_CANCELLATION, // Keep checkpoints when job is cancelled
DELETE_ON_CANCELLATION // Delete checkpoints when job is cancelled
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-core