Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities
—
Apache Flink provides fault tolerance through checkpointing, which creates consistent snapshots of application state. This enables exactly-once processing guarantees and recovery from failures.
Configure checkpointing behavior and fault tolerance settings.
/**
* Enable checkpointing with specified interval
* @param interval - checkpoint interval in milliseconds
*/
StreamExecutionEnvironment enableCheckpointing(long interval);
/**
* Enable checkpointing with interval and mode
* @param interval - checkpoint interval in milliseconds
* @param mode - checkpointing mode (EXACTLY_ONCE or AT_LEAST_ONCE)
*/
StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode);
/**
* Get checkpoint configuration for advanced settings
*/
CheckpointConfig getCheckpointConfig();

Usage Examples:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing every 5 seconds
env.enableCheckpointing(5000);
// Enable with specific mode
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// Advanced configuration
CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(500);
config.setCheckpointTimeout(60000);
config.setMaxConcurrentCheckpoints(1);
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Fine-tune checkpoint behavior with various configuration options.
/**
* Set checkpointing mode
* @param checkpointingMode - EXACTLY_ONCE or AT_LEAST_ONCE
*/
void setCheckpointingMode(CheckpointingMode checkpointingMode);
/**
* Set minimum pause between checkpoints
* @param minPauseBetweenCheckpoints - minimum pause in milliseconds
*/
void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints);
/**
* Set checkpoint timeout
* @param checkpointTimeout - timeout in milliseconds
*/
void setCheckpointTimeout(long checkpointTimeout);
/**
* Set maximum concurrent checkpoints
* @param maxConcurrentCheckpoints - maximum number of concurrent checkpoints
*/
void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints);
/**
* Enable externalized checkpoints
* @param cleanupMode - cleanup mode for externalized checkpoints
*/
void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanupMode);
/**
* Set whether to fail on checkpointing errors
* @param failOnCheckpointingErrors - true to fail job on checkpoint errors
*/
void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors);
/**
* Set tolerable checkpoint failure number
* @param tolerableCheckpointFailureNumber - number of tolerable failures
*/
void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber);

Usage Examples:
CheckpointConfig config = env.getCheckpointConfig();
// Set checkpointing mode
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Minimum pause between checkpoints (prevents too frequent checkpoints)
config.setMinPauseBetweenCheckpoints(500);
// Checkpoint timeout (checkpoint fails if not completed in time)
config.setCheckpointTimeout(60000);
// Maximum concurrent checkpoints (usually 1 for exactly-once)
config.setMaxConcurrentCheckpoints(1);
// Externalized checkpoints (persist checkpoints for recovery)
config.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// Continue job execution even if checkpoint fails
config.setFailOnCheckpointingErrors(false);
// Allow up to 3 consecutive checkpoint failures
config.setTolerableCheckpointFailureNumber(3);

Implement stateful functions that participate in checkpointing.
/**
* Interface for functions that need to checkpoint state
*/
interface CheckpointedFunction {
/**
* Take a snapshot of the function's state
* @param context - snapshot context
*/
void snapshotState(FunctionSnapshotContext context) throws Exception;
/**
* Initialize or restore function state
* @param context - initialization context
*/
void initializeState(FunctionInitializationContext context) throws Exception;
}
/**
* Listener for checkpoint events
*/
interface CheckpointListener {
/**
* Notified when checkpoint is completed
* @param checkpointId - ID of completed checkpoint
*/
void notifyCheckpointComplete(long checkpointId) throws Exception;
/**
* Notified when checkpoint is aborted
* @param checkpointId - ID of aborted checkpoint
*/
default void notifyCheckpointAborted(long checkpointId) throws Exception {}
}

Usage Examples:
/**
 * Example map function that combines keyed state (a per-key counter) with
 * operator state (a buffer of seen elements that is written on every checkpoint).
 *
 * <p>Because it uses keyed state obtained from the runtime context, it must be
 * applied to a keyed stream.
 */
public class StatefulMapFunction extends RichMapFunction<String, String>
        implements CheckpointedFunction {

    // State handles are created at runtime, so they are marked transient and
    // are never serialized together with the function instance.
    private transient ValueState<Integer> countState;
    private transient ListState<String> bufferedElements;

    // In-memory working copy of the buffer; persisted through bufferedElements
    // in snapshotState() and rebuilt in initializeState().
    private transient List<String> localBuffer;

    /**
     * Registers the keyed counter state.
     *
     * @param parameters configuration passed by the runtime
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<Integer> countDescriptor =
                new ValueStateDescriptor<>("count", Integer.class);
        countState = getRuntimeContext().getState(countDescriptor);
        // Do NOT re-create localBuffer here: initializeState() runs before open(),
        // so assigning a fresh list would throw away the restored elements.
    }

    /**
     * Appends the number of elements previously seen for the current key to the
     * value, increments the counter, and remembers the value in the local buffer.
     *
     * @param value incoming element
     * @return {@code value + "_" + <count before this element>}
     */
    @Override
    public String map(String value) throws Exception {
        Integer count = countState.value();
        if (count == null) {
            // First element for this key.
            count = 0;
        }
        countState.update(count + 1);
        localBuffer.add(value);
        return value + "_" + count;
    }

    /**
     * Replaces the checkpointed buffer with the current contents of the local buffer.
     *
     * @param context snapshot context
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // update() replaces the previous contents in a single call.
        bufferedElements.update(localBuffer);
    }

    /**
     * Registers the operator list state and, when recovering from a checkpoint,
     * refills the local buffer from it.
     *
     * @param context initialization context
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> bufferDescriptor =
                new ListStateDescriptor<>("bufferedElements", String.class);
        bufferedElements = context.getOperatorStateStore().getListState(bufferDescriptor);

        localBuffer = new ArrayList<>();
        if (context.isRestored()) {
            for (String element : bufferedElements.get()) {
                localBuffer.add(element);
            }
        }
    }
}
// Usage with checkpoint listener
/**
 * Example function that records the ID of each checkpoint it takes part in and
 * reacts to checkpoint-completion notifications.
 *
 * <p>The checkpoint ID is kept in operator (non-keyed) state: snapshotState() is
 * not invoked in the context of a key, so keyed state cannot be updated there.
 */
public class CheckpointAwareFunction extends RichMapFunction<String, String>
        implements CheckpointedFunction, CheckpointListener {

    // Operator list state holding at most one entry: the most recent checkpoint ID.
    private transient ListState<Long> lastCheckpointId;

    /**
     * Registers the operator state that stores the last checkpoint ID.
     *
     * @param context initialization context
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
                new ListStateDescriptor<>("lastCheckpointId", Long.class);
        lastCheckpointId = context.getOperatorStateStore().getListState(descriptor);
    }

    /**
     * Stores the ID of the checkpoint currently being taken.
     *
     * @param context snapshot context providing the checkpoint ID
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Keep only the latest ID.
        lastCheckpointId.clear();
        lastCheckpointId.add(context.getCheckpointId());
    }

    /**
     * Called once the given checkpoint has completed.
     *
     * @param checkpointId ID of the completed checkpoint
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Perform cleanup or external system commits
        System.out.println("Checkpoint " + checkpointId + " completed successfully");
    }

    /**
     * Upper-cases the incoming value.
     *
     * @param value incoming element
     * @return the upper-cased value
     */
    @Override
    public String map(String value) throws Exception {
        return value.toUpperCase();
    }
}

Configure different state backends for storing checkpointed state.
// Configure state backend (typically done via configuration)
// MemoryStateBackend - for development/testing
// FsStateBackend - for production with file system storage
// RocksDBStateBackend - for large state
// Configuration in flink-conf.yaml:
// state.backend: filesystem
// state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

Usage Examples:
// Configure state backend programmatically (not recommended for production)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Memory state backend (for testing only)
env.setStateBackend(new MemoryStateBackend(10 * 1024 * 1024)); // 10MB
// File system state backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// RocksDB state backend (for large state)
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

Configure how jobs should restart after failures.
// Configure restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay between attempts
));
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
Time.milliseconds(1), // initial delay
Time.milliseconds(1000), // max delay
1.5, // backoff multiplier
Time.minutes(5), // reset time
0.1 // jitter
));
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per interval
Time.of(5, TimeUnit.MINUTES), // failure rate interval
Time.of(10, TimeUnit.SECONDS) // delay between attempts
));

// Checkpointing mode
enum CheckpointingMode {
EXACTLY_ONCE, // Exactly-once processing guarantees
AT_LEAST_ONCE // At-least-once processing guarantees
}
// Externalized checkpoint cleanup
enum ExternalizedCheckpointCleanup {
RETAIN_ON_CANCELLATION, // Keep checkpoints when job is cancelled
DELETE_ON_CANCELLATION // Delete checkpoints when job is cancelled
}
// Checkpoint configuration
class CheckpointConfig {
void setCheckpointingMode(CheckpointingMode mode);
void setMinPauseBetweenCheckpoints(long minPause);
void setCheckpointTimeout(long timeout);
void setMaxConcurrentCheckpoints(int maxConcurrent);
void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanup);
void setFailOnCheckpointingErrors(boolean failOnErrors);
void setTolerableCheckpointFailureNumber(int tolerableFailures);
}

// Checkpointed function interface
interface CheckpointedFunction {
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
}
// Checkpoint listener interface
interface CheckpointListener {
void notifyCheckpointComplete(long checkpointId) throws Exception;
default void notifyCheckpointAborted(long checkpointId) throws Exception {}
}
// Function snapshot context
interface FunctionSnapshotContext {
long getCheckpointId();
long getCheckpointTimestamp();
}
// Function initialization context
interface FunctionInitializationContext {
    // True when the function is being initialized from previously checkpointed state.
    boolean isRestored();
    // Access to operator (non-keyed) state.
    OperatorStateStore getOperatorStateStore();
    // Access to keyed state.
    KeyedStateStore getKeyedStateStore();
}

// Operator state store for non-keyed state
interface OperatorStateStore {
    // List state registered under the given descriptor.
    <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
    // Union list state registered under the given descriptor.
    <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
    // Broadcast state; the type parameters are the key and value types of the map descriptor.
    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;
}
// Keyed state store for keyed state
interface KeyedStateStore {
<T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor) throws Exception;
<T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateDescriptor) throws Exception;
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateDescriptor) throws Exception;
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) throws Exception;
}

// Restart strategies
class RestartStrategies {
static RestartStrategyConfiguration noRestart();
static RestartStrategyConfiguration fallBackRestart();
static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayBetweenAttempts);
static RestartStrategyConfiguration exponentialDelayRestart(Time initialBackoff, Time maxBackoff, double backoffMultiplier, Time resetBackoffThreshold, double jitter);
static RestartStrategyConfiguration failureRateRestart(int failureRate, Time failureInterval, Time delayInterval);
}
// Time utility for restart strategies
class Time {
static Time of(long size, TimeUnit unit);
static Time milliseconds(long milliseconds);
static Time seconds(long seconds);
static Time minutes(long minutes);
static Time hours(long hours);
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11