CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-java-2-11

Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities

Pending
Overview
Eval results
Files

docs/checkpointing.md

Checkpointing and Fault Tolerance

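Apache Flink provides fault tolerance through checkpointing, which periodically takes consistent snapshots of application state. After a failure, the job is restored from the latest completed checkpoint. This gives exactly-once guarantees for Flink-managed state; end-to-end exactly-once delivery additionally requires replayable sources and transactional or idempotent sinks.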

Capabilities

Checkpoint Configuration

Configure checkpointing behavior and fault tolerance settings.

/**
 * Turns on periodic checkpointing.
 *
 * @param interval time between checkpoints, in milliseconds
 * @return the execution environment
 */
StreamExecutionEnvironment enableCheckpointing(long interval);

/**
 * Turns on periodic checkpointing with an explicit processing guarantee.
 *
 * @param interval time between checkpoints, in milliseconds
 * @param mode {@code EXACTLY_ONCE} or {@code AT_LEAST_ONCE}
 * @return the execution environment
 */
StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode);

/**
 * Returns the checkpoint configuration, used for the advanced settings below.
 *
 * @return the checkpoint configuration of this environment
 */
CheckpointConfig getCheckpointConfig();

Usage Examples:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 5 seconds (mode not specified)
env.enableCheckpointing(5_000);

// The same interval, choosing the processing guarantee explicitly
env.enableCheckpointing(5_000, CheckpointingMode.EXACTLY_ONCE);

// Fine-grained settings live on the CheckpointConfig
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setMinPauseBetweenCheckpoints(500);      // ms
checkpointConfig.setCheckpointTimeout(60_000);            // ms
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Checkpoint Configuration Options

Fine-tune checkpoint behavior with various configuration options.

/**
 * Sets the processing guarantee used by checkpointing.
 *
 * @param checkpointingMode {@code EXACTLY_ONCE} or {@code AT_LEAST_ONCE}
 */
void setCheckpointingMode(CheckpointingMode checkpointingMode);

/**
 * Sets the minimum pause between checkpoints.
 *
 * @param minPauseBetweenCheckpoints minimum pause, in milliseconds
 */
void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints);

/**
 * Sets the checkpoint timeout; a checkpoint that does not complete in time fails.
 *
 * @param checkpointTimeout timeout, in milliseconds
 */
void setCheckpointTimeout(long checkpointTimeout);

/**
 * Sets how many checkpoints may be in progress at the same time.
 *
 * @param maxConcurrentCheckpoints maximum number of concurrent checkpoints
 */
void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints);

/**
 * Enables externalized (persisted) checkpoints.
 *
 * @param cleanupMode what happens to externalized checkpoints when the job is cancelled
 */
void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanupMode);

/**
 * Sets whether a checkpoint error fails the job.
 *
 * @param failOnCheckpointingErrors true to fail the job on checkpoint errors
 * @deprecated use {@link #setTolerableCheckpointFailureNumber(int)} instead. Once a tolerable
 *     failure number has been set, this flag has no effect, so do not combine the two.
 */
@Deprecated
void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors);

/**
 * Sets how many consecutive checkpoint failures are tolerated before the job fails.
 *
 * @param tolerableCheckpointFailureNumber number of tolerable consecutive failures (0 = none)
 */
void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber);

Usage Examples:

CheckpointConfig config = env.getCheckpointConfig();

// Set checkpointing mode
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Minimum pause between checkpoints, in ms (prevents checkpoints from running back to back)
config.setMinPauseBetweenCheckpoints(500);

// Checkpoint timeout, in ms (a checkpoint fails if it does not complete in time)
config.setCheckpointTimeout(60000);

// Maximum number of checkpoints that may be in progress at the same time
config.setMaxConcurrentCheckpoints(1);

// Externalized checkpoints (keep checkpoint data after the job is cancelled)
config.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// Tolerate up to 3 consecutive checkpoint failures before failing the job.
// Use this instead of the deprecated setFailOnCheckpointingErrors(boolean): once a
// tolerable failure number is set, that flag is ignored, so combining the two is misleading.
config.setTolerableCheckpointFailureNumber(3);

Stateful Functions

Implement stateful functions that participate in checkpointing.

/**
 * Implemented by functions whose state takes part in checkpointing.
 */
interface CheckpointedFunction {

    /**
     * Creates the function's state and, when recovering, restores it.
     *
     * @param context initialization context
     */
    void initializeState(FunctionInitializationContext context) throws Exception;

    /**
     * Captures the function's state for the checkpoint being taken.
     *
     * @param context snapshot context
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;
}

/**
 * Receives notifications about the outcome of checkpoints.
 */
interface CheckpointListener {

    /**
     * Invoked once the checkpoint with the given ID has completed.
     *
     * @param checkpointId ID of the completed checkpoint
     */
    void notifyCheckpointComplete(long checkpointId) throws Exception;

    /**
     * Invoked when the checkpoint with the given ID was aborted. Does nothing unless overridden.
     *
     * @param checkpointId ID of the aborted checkpoint
     */
    default void notifyCheckpointAborted(long checkpointId) throws Exception {
        // intentionally empty
    }
}

Usage Examples:

/**
 * Example map function that combines keyed state with checkpointed operator state.
 *
 * <p>{@code countState} is keyed state obtained from the runtime context, so this function must
 * run on a keyed stream (after {@code keyBy}). {@code bufferedElements} is operator list state
 * that receives a copy of {@code localBuffer} at every checkpoint.
 */
public class StatefulMapFunction extends RichMapFunction<String, String>
    implements CheckpointedFunction {

    // State handles are created on the task, not shipped with the function, so keep them transient.
    private transient ValueState<Integer> countState;
    private transient ListState<String> bufferedElements;

    // Working buffer. It is not serialized with the function, but its contents are copied into
    // bufferedElements on every checkpoint and read back from it on restore.
    // NOTE: this example never drains the buffer, so it grows without bound.
    private transient List<String> localBuffer;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Keyed state: one counter per key
        ValueStateDescriptor<Integer> countDescriptor =
            new ValueStateDescriptor<>("count", Integer.class);
        countState = getRuntimeContext().getState(countDescriptor);

        // Do NOT re-create localBuffer here: Flink calls initializeState() before open(),
        // so resetting it would discard the elements restored from the last checkpoint.
    }

    @Override
    public String map(String value) throws Exception {
        Integer count = countState.value();
        if (count == null) {
            count = 0;
        }

        countState.update(count + 1);
        localBuffer.add(value);

        // Emits the count as it was *before* this element was counted
        return value + "_" + count;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous checkpoint's contents with the current buffer
        bufferedElements.update(localBuffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> bufferDescriptor =
            new ListStateDescriptor<>("bufferedElements", String.class);
        bufferedElements = context.getOperatorStateStore().getListState(bufferDescriptor);

        // Rebuild the working buffer; this runs before open()
        localBuffer = new ArrayList<>();
        if (context.isRestored()) {
            for (String element : bufferedElements.get()) {
                localBuffer.add(element);
            }
        }
    }
}

// Usage with checkpoint listener
/**
 * Example function that records the latest checkpoint ID and reacts to checkpoint completion.
 *
 * <p>The ID is kept in operator (non-keyed) state. snapshotState() is not invoked in the context
 * of any particular key, so keyed state cannot be meaningfully written there.
 */
public class CheckpointAwareFunction extends RichMapFunction<String, String>
    implements CheckpointedFunction, CheckpointListener {

    private transient ListState<Long> lastCheckpointId;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
            new ListStateDescriptor<>("lastCheckpointId", Long.class);
        lastCheckpointId = context.getOperatorStateStore().getListState(descriptor);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Keep exactly one entry: the ID of the checkpoint being taken
        lastCheckpointId.clear();
        lastCheckpointId.add(context.getCheckpointId());
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Perform cleanup or external system commits once the checkpoint has completed
        System.out.println("Checkpoint " + checkpointId + " completed successfully");
    }

    @Override
    public String map(String value) throws Exception {
        // Locale.ROOT avoids locale-dependent results (e.g. the Turkish dotted/dotless i)
        return value.toUpperCase(java.util.Locale.ROOT);
    }
}

State Backends

Configure different state backends for storing checkpointed state.

// Configure state backend (typically done via configuration)
// MemoryStateBackend - for development/testing
// FsStateBackend - for production with file system storage  
// RocksDBStateBackend - for large state

// Configuration in flink-conf.yaml:
// state.backend: filesystem
// state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

Usage Examples:

// Programmatic configuration (not recommended for production; prefer flink-conf.yaml).
// The calls below are alternatives: each setStateBackend call replaces the previous one.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String checkpointDir = "hdfs://namenode:40010/flink/checkpoints";

// Memory state backend with a 10 MB size argument (testing only)
env.setStateBackend(new MemoryStateBackend(10 * 1024 * 1024));

// File system state backend writing to checkpointDir
env.setStateBackend(new FsStateBackend(checkpointDir));

// RocksDB state backend writing to checkpointDir (for large state)
env.setStateBackend(new RocksDBStateBackend(checkpointDir));

Restart Strategies

Configure how jobs should restart after failures.

// Three alternative restart strategies; each setRestartStrategy call replaces the previous one.

// Fixed delay: 3 restart attempts, 10 seconds apart
env.setRestartStrategy(
    RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

// Exponential backoff: 1 ms initial delay, 1000 ms max delay, 1.5x multiplier,
// 5-minute reset threshold, 0.1 jitter
env.setRestartStrategy(
    RestartStrategies.exponentialDelayRestart(
        Time.milliseconds(1),
        Time.milliseconds(1000),
        1.5,
        Time.minutes(5),
        0.1));

// Failure rate: at most 3 failures per 5-minute interval, 10 seconds between attempts
env.setRestartStrategy(
    RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));

Types

Checkpoint Configuration Types

/** Processing guarantee provided by checkpointing. */
enum CheckpointingMode {
    /** Exactly-once processing guarantees. */
    EXACTLY_ONCE,

    /** At-least-once processing guarantees. */
    AT_LEAST_ONCE
}

/**
 * What happens to externalized checkpoints when the job is cancelled.
 *
 * <p>NOTE(review): Flink's Javadoc states that checkpoints of a job ending in FAILED state are
 * kept in either mode; confirm for the Flink version in use.
 */
enum ExternalizedCheckpointCleanup {
    /** Keep checkpoints when the job is cancelled. */
    RETAIN_ON_CANCELLATION,

    /** Delete checkpoints when the job is cancelled. */
    DELETE_ON_CANCELLATION
}

// Checkpoint configuration (times are in milliseconds)
class CheckpointConfig {
    void setCheckpointingMode(CheckpointingMode mode);
    void setMinPauseBetweenCheckpoints(long minPause);
    void setCheckpointTimeout(long timeout);
    void setMaxConcurrentCheckpoints(int maxConcurrent);
    void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanup);

    /**
     * @deprecated use {@link #setTolerableCheckpointFailureNumber(int)}; once a tolerable
     *     failure number is set, this flag has no effect.
     */
    @Deprecated
    void setFailOnCheckpointingErrors(boolean failOnErrors);

    // Number of consecutive checkpoint failures tolerated before the job fails
    void setTolerableCheckpointFailureNumber(int tolerableFailures);
}

Stateful Function Interfaces

/** Functions whose state participates in checkpointing. */
interface CheckpointedFunction {
    /** Initializes, or restores after a failure, the function's state. */
    void initializeState(FunctionInitializationContext context) throws Exception;

    /** Snapshots the function's state for the current checkpoint. */
    void snapshotState(FunctionSnapshotContext context) throws Exception;
}

/** Callback for checkpoint completion and abortion events. */
interface CheckpointListener {
    /** Called when the checkpoint with the given ID completed. */
    void notifyCheckpointComplete(long checkpointId) throws Exception;

    /** Called when the checkpoint with the given ID was aborted; no-op by default. */
    default void notifyCheckpointAborted(long checkpointId) throws Exception {
        // no-op
    }
}

/** Describes the checkpoint being taken; passed to {@code snapshotState}. */
interface FunctionSnapshotContext {
    /** Returns the ID of the checkpoint being taken. */
    long getCheckpointId();

    /** Returns the timestamp of the checkpoint being taken. */
    long getCheckpointTimestamp();
}

// Function initialization context (passed to initializeState)
interface FunctionInitializationContext {
    /** Returns true when state is being restored from a previous checkpoint. */
    boolean isRestored();

    /** Returns the store for operator (non-keyed) state. */
    OperatorStateStore getOperatorStateStore();

    /** Returns the store for keyed state; only usable when the function runs on a keyed stream. */
    KeyedStateStore getKeyedStateStore();
}

State Store Interfaces

// Operator state store for non-keyed state
interface OperatorStateStore {
    /** List state whose elements are split across parallel instances on restore. */
    <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    /** List state whose full union is given to every parallel instance on restore. */
    <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    /** Broadcast (map-shaped) state; the type parameters match the descriptor's key/value types. */
    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;
}

/** Store for state scoped to the current key. */
interface KeyedStateStore {
    /** A single value per key. */
    <T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor) throws Exception;

    /** A list of values per key. */
    <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;

    /** A map of user keys to user values per key. */
    <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateDescriptor) throws Exception;

    /** A reduced value per key. */
    <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateDescriptor) throws Exception;

    /** An aggregate per key: takes IN values, keeps an ACC accumulator, exposes OUT results. */
    <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
        AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) throws Exception;
}

Restart Strategy Types

/** Factory methods for restart strategy configurations. */
class RestartStrategies {
    /** Do not restart the job. */
    static RestartStrategyConfiguration noRestart();

    /** Presumably defers to the cluster-level restart strategy; confirm against the Flink docs. */
    static RestartStrategyConfiguration fallBackRestart();

    /** Restart up to {@code restartAttempts} times, waiting {@code delayBetweenAttempts} each time. */
    static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayBetweenAttempts);

    /** Restart with a delay that grows by {@code backoffMultiplier} up to {@code maxBackoff}. */
    static RestartStrategyConfiguration exponentialDelayRestart(
        Time initialBackoff,
        Time maxBackoff,
        double backoffMultiplier,
        Time resetBackoffThreshold,
        double jitter);

    /** Restart unless more than {@code failureRate} failures occur within {@code failureInterval}. */
    static RestartStrategyConfiguration failureRateRestart(int failureRate, Time failureInterval, Time delayInterval);
}

/** Duration value used by the restart strategy factories. */
class Time {
    /** A duration of {@code size} in the given unit. */
    static Time of(long size, TimeUnit unit);

    /** A duration in milliseconds. */
    static Time milliseconds(long milliseconds);

    /** A duration in seconds. */
    static Time seconds(long seconds);

    /** A duration in minutes. */
    static Time minutes(long minutes);

    /** A duration in hours. */
    static Time hours(long hours);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11

docs

async-io.md

checkpointing.md

datastream-transformations.md

execution-environment.md

index.md

keyed-streams-state.md

process-functions.md

sources-sinks.md

time-watermarks.md

windowing.md

tile.json