tessl/maven-org-apache-flink--flink-state-processor-api-2-12

Apache Flink State Processor API for reading and writing savepoint state data offline

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-state-processor-api_2.12@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-state-processor-api-2-12@1.14.0


Apache Flink State Processor API

The Apache Flink State Processor API reads and writes Flink savepoint state outside of a running Flink application. It treats streaming application state as batch data, so developers can bootstrap new savepoints with initial state, query and analyze existing state, modify operator state, and transform state using standard Flink DataSet operations.

Package Information

  • Package Name: org.apache.flink:flink-state-processor-api_2.12
  • Package Type: maven
  • Language: Java
  • Installation: Add dependency in pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.12</artifactId>
    <version>1.14.6</version>
</dependency>

Core Imports

import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.NewSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.StateBackend;

Basic Usage

Loading and Reading an Existing Savepoint

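import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;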

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new HashMapStateBackend();

// Load existing savepoint
ExistingSavepoint savepoint = Savepoint.load(env, "/path/to/savepoint", stateBackend);

// Read operator state
DataSource<MyState> states = savepoint.readListState(
    "my-operator-uid", 
    "my-state-name", 
    TypeInformation.of(MyState.class)
);

// print() triggers execution for DataSet programs, so no separate env.execute() call is needed
states.print();

Creating a New Savepoint with Bootstrap Data

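import org.apache.flink.api.java.DataSet;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.NewSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;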

// Create new savepoint
NewSavepoint savepoint = Savepoint.create(stateBackend, 128);

// Bootstrap data from DataSet
DataSet<MyInput> inputData = env.fromCollection(myDataList);

BootstrapTransformation<MyInput> transformation = OperatorTransformation
    .bootstrapWith(inputData)
    .keyBy(input -> input.getKey())
    .transform(new MyKeyedStateBootstrapFunction());

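// Add operator and define the savepoint output
savepoint.withOperator("my-operator-uid", transformation)
         .write("/path/to/new-savepoint");

// write() only adds a sink to the batch job; execute it to produce the savepoint
env.execute("bootstrap savepoint");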
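The example above references MyKeyedStateBootstrapFunction without defining it. The following is a minimal sketch of what such a function could look like, not the package's own implementation. It assumes a hypothetical MyInput POJO with a String key and a long amount, and a hypothetical state name "total"; none of these names come from the API itself. It needs the Flink 1.14 dependencies on the classpath.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

class BootstrapSketch {

    /** Hypothetical input type standing in for MyInput; not part of the API. */
    public static class MyInput {
        public String key;
        public long amount;

        public MyInput() {}

        public MyInput(String key, long amount) {
            this.key = key;
            this.amount = amount;
        }

        public String getKey() {
            return key;
        }
    }

    /** Accumulates a per-key running total into a ValueState<Long>. */
    public static class MyKeyedStateBootstrapFunction
            extends KeyedStateBootstrapFunction<String, MyInput> {

        // Hypothetical state name; the streaming job that restores from the
        // savepoint would have to use the same name and type in its descriptor.
        public static final String STATE_NAME = "total";

        private transient ValueState<Long> total;

        @Override
        public void open(Configuration parameters) {
            // Register the state descriptor once, before any element is processed.
            total = getRuntimeContext().getState(
                    new ValueStateDescriptor<>(STATE_NAME, Types.LONG));
        }

        @Override
        public void processElement(MyInput value, Context ctx) throws Exception {
            // State access is scoped to the key of the current element.
            Long current = total.value();
            total.update(current == null ? value.amount : current + value.amount);
        }
    }
}
```

With this in place, the keyBy(input -> input.getKey()) call in the example groups the records so that each key's total ends up in its own state entry.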

Architecture

The State Processor API consists of several key components:

  • Savepoint Management: Entry points for loading existing and creating new savepoints
  • State Reading: APIs for reading different types of state (keyed, list, union, broadcast, window)
  • State Writing: Bootstrap transformations for writing new state data
  • Function Interfaces: User-defined functions for processing state data
  • Type System: Integration with Flink's type system for serialization

Capabilities

Savepoint Management

Core functionality for loading, creating, and managing savepoints.

// Entry point class
public final class Savepoint {
    public static ExistingSavepoint load(
        ExecutionEnvironment env, 
        String path, 
        StateBackend stateBackend
    ) throws IOException;
    
    public static NewSavepoint create(
        StateBackend stateBackend, 
        int maxParallelism
    );
}

See savepoint-management.md for details.

State Reading

Read various types of state from existing savepoints including keyed state, operator state, and window state.

// Reading different state types
public <T> DataSource<T> readListState(
    String uid, 
    String name, 
    TypeInformation<T> typeInfo
) throws IOException;

public <K, OUT> DataSource<OUT> readKeyedState(
    String uid, 
    KeyedStateReaderFunction<K, OUT> function
) throws IOException;

public <W extends Window> WindowReader<W> window(
    WindowAssigner<?, W> assigner
);
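readKeyedState takes a user-defined KeyedStateReaderFunction. The sketch below shows one possible reader, assuming the operator is keyed by String and holds a ValueState<Long> registered under the hypothetical name "total". The class name, state name, and output type are illustrative only, and the code needs the Flink 1.14 dependencies on the classpath.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

/** Hypothetical reader that emits one (key, total) pair per key found in the savepoint. */
class TotalReaderFunction
        extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {

    private transient ValueState<Long> total;

    @Override
    public void open(Configuration parameters) {
        // The descriptor name and type are assumed to match what the
        // streaming operator registered when the savepoint was taken.
        total = getRuntimeContext().getState(
                new ValueStateDescriptor<>("total", Types.LONG));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        // Called once per key; state reads are scoped to that key.
        out.collect(Tuple2.of(key, total.value()));
    }
}
```

Passing an instance as savepoint.readKeyedState("my-operator-uid", new TotalReaderFunction()) should then produce a DataSource<Tuple2<String, Long>> that can be processed like any other DataSet.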

See state-reading.md for details.

State Writing

Bootstrap new state data into savepoints using DataSet transformations.

// Bootstrap transformation creation
public static <T> OneInputOperatorTransformation<T> bootstrapWith(
    DataSet<T> dataSet
);

// Keyed state bootstrap
public BootstrapTransformation<T> transform(
    KeyedStateBootstrapFunction<K, T> processFunction
);

See state-writing.md for details.

Function Interfaces

User-defined functions for reading and writing state data.

// Base bootstrap function
public abstract class KeyedStateBootstrapFunction<K, IN> extends AbstractRichFunction {
    public abstract void processElement(IN value, Context ctx) throws Exception;
}

// Base reader function  
public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {
    public abstract void readKey(K key, Context ctx, Collector<OUT> out) throws Exception;
}

See function-interfaces.md for details.

Window State Operations

Specialized operations for reading and writing window state data.

// Window reader for different aggregation types
public <T, K> DataSource<T> reduce(
    String uid,
    ReduceFunction<T> function,
    TypeInformation<K> keyType,
    TypeInformation<T> reduceType
) throws IOException;

public <K, T, ACC, R> DataSource<R> aggregate(
    String uid,
    AggregateFunction<T, ACC, R> aggregateFunction,
    TypeInformation<K> keyType,
    TypeInformation<ACC> accType,
    TypeInformation<R> outputType
) throws IOException;
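To illustrate how the window reader signatures above fit together, here is a hedged sketch that reads the results of an aggregating window operator. The operator uid "my-window-uid", the one-minute tumbling window, and the CountAggregate function are assumptions made for the example; they would have to match the window operator of the job that produced the savepoint. It needs the Flink 1.14 dependencies, including flink-streaming-java, on the classpath.

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

class WindowReadSketch {

    /** Hypothetical aggregate: counts the elements of each key and window. */
    public static class CountAggregate implements AggregateFunction<String, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(String value, Long acc) {
            return acc + 1;
        }

        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint =
                Savepoint.load(env, "/path/to/savepoint", new HashMapStateBackend());

        // Key type, accumulator type, and output type, in that order.
        DataSource<Long> counts = savepoint
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .aggregate("my-window-uid", new CountAggregate(),
                        Types.STRING, Types.LONG, Types.LONG);

        // print() triggers execution of the batch job.
        counts.print();
    }
}
```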

See window-operations.md for details.

Common Types

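// Context passed to KeyedStateBootstrapFunction.processElement.
// Declared as an inner class of KeyedStateBootstrapFunction<K, IN>, so K is the function's key type.
public abstract class Context {
    public abstract TimerService timerService();
    public abstract K getCurrentKey();
}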

// Bootstrap transformation
public class BootstrapTransformation<T> {
    public DataSet<OperatorState> writeOperatorState(
        OperatorID operatorID,
        StateBackend stateBackend,
        Configuration config,
        int globalMaxParallelism,
        Path savepointPath
    );
}

// Writable savepoint base
public abstract class WritableSavepoint<F extends WritableSavepoint> {
    public F removeOperator(String uid);
    public <T> F withOperator(String uid, BootstrapTransformation<T> transformation);
    public <T> F withConfiguration(ConfigOption<T> option, T value);
    public void write(String path);
}
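Because ExistingSavepoint also exposes the WritableSavepoint methods, an existing savepoint can serve as the basis for a new one. The helper below is a sketch of that pattern: it replaces the state of one operator and writes the result to a new path. The class and method names are hypothetical. The sketch also assumes that a uid already present in the savepoint has to be removed before it can be added again, and that write() only defines the batch job, so the environment is executed afterwards.

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.Savepoint;

/** Hypothetical helper; not part of the State Processor API. */
final class SavepointRewrite {

    private SavepointRewrite() {}

    static <T> void replaceOperator(
            ExecutionEnvironment env,
            String oldPath,
            String newPath,
            String uid,
            BootstrapTransformation<T> transformation) throws Exception {

        Savepoint.load(env, oldPath, new HashMapStateBackend())
                // Drop the old state registered under this uid ...
                .removeOperator(uid)
                // ... and register the freshly bootstrapped state in its place.
                .withOperator(uid, transformation)
                .write(newPath);

        // Run the batch job that actually writes the new savepoint.
        env.execute("rewrite savepoint");
    }
}
```

Operators that are not removed are carried over from the original savepoint, so only the operators being changed need a BootstrapTransformation.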

Error Handling

The API throws standard Java and Flink exceptions:

  • IOException - For savepoint path and file system operations
  • InvalidProgramException (org.apache.flink.api.common) - For type inference failures
  • RuntimeException - For validation and state access errors

Common error scenarios:

  • Savepoint path not found or inaccessible
  • Operator UID not found in savepoint
  • Type serialization/deserialization issues
  • Invalid max parallelism values
  • State descriptor registration errors
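Some of the scenarios above can be caught before a job is submitted. The following plain-Java sketch is a hypothetical pre-flight check, not part of the API. It assumes the savepoint lives on the local file system (remote schemes such as hdfs:// or s3:// would need Flink's own FileSystem classes instead) and that max parallelism must lie between 1 and 32768, which is my understanding of Flink's limit.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that collects obvious configuration problems up front. */
final class SavepointPreflight {

    // Assumed upper bound for max parallelism (1 << 15 = 32768).
    static final int MAX_PARALLELISM_UPPER_BOUND = 1 << 15;

    private SavepointPreflight() {}

    /** Returns a list of human-readable problems; an empty list means no problem was found. */
    static List<String> check(String localSavepointPath, String operatorUid, int maxParallelism) {
        List<String> problems = new ArrayList<>();

        // Savepoint path not found or inaccessible (local paths only).
        if (localSavepointPath == null || !Files.exists(Paths.get(localSavepointPath))) {
            problems.add("savepoint path not found: " + localSavepointPath);
        }

        // A blank uid can never match an operator in the savepoint.
        if (operatorUid == null || operatorUid.trim().isEmpty()) {
            problems.add("operator uid must not be empty");
        }

        // Invalid max parallelism values.
        if (maxParallelism < 1 || maxParallelism > MAX_PARALLELISM_UPPER_BOUND) {
            problems.add("max parallelism must be between 1 and "
                    + MAX_PARALLELISM_UPPER_BOUND + ", found: " + maxParallelism);
        }

        return problems;
    }
}
```

A caller could run check(...) before Savepoint.load or Savepoint.create and fail fast with a clear message. Errors that depend on the savepoint contents, such as a uid missing from the metadata or a serializer mismatch, still surface only when the job runs.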