CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-state-processor-api-2-12

Apache Flink State Processor API for reading and writing savepoint state data offline

Pending
Overview
Eval results
Files

docs/savepoint-management.md

Savepoint Management

Savepoint management provides the core functionality for loading existing savepoints and creating new ones. This includes the main entry points and base classes for savepoint operations.

Entry Point API

Savepoint Class

The main entry point for all savepoint operations.

/**
 * Main entry point for all savepoint operations: loading an existing
 * savepoint for reading/modification, or creating a new empty one.
 */
public final class Savepoint {
    /**
     * Loads an existing savepoint so its state can be read and modified.
     *
     * @param env          batch execution environment that will run the read/write jobs
     * @param path         file-system path of the savepoint to load
     * @param stateBackend state backend used to access the state
     *                     (presumably must match the backend that produced the savepoint — confirm)
     * @throws IOException if the savepoint cannot be read from {@code path}
     */
    public static ExistingSavepoint load(
        ExecutionEnvironment env, 
        String path, 
        StateBackend stateBackend
    ) throws IOException;
    
    /**
     * Creates a new, empty savepoint.
     *
     * @param stateBackend   state backend used to store the state being written
     * @param maxParallelism maximum parallelism (key-group count); must be in
     *                       [1, 32768] and cannot be changed after creation
     */
    public static NewSavepoint create(
        StateBackend stateBackend, 
        int maxParallelism
    );
}

Loading Existing Savepoints

// Batch environment and backend used by the read/write jobs
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new HashMapStateBackend();

// Load savepoint from file system
ExistingSavepoint savepoint = Savepoint.load(env, "/path/to/savepoint", stateBackend);

Creating New Savepoints

// Backend that will store the state written into the new savepoint
StateBackend stateBackend = new HashMapStateBackend();

// Create new empty savepoint with max parallelism of 128
NewSavepoint savepoint = Savepoint.create(stateBackend, 128);

Savepoint Types

ExistingSavepoint

Represents a savepoint loaded from disk that can be read from and modified.

/**
 * A savepoint loaded from disk that can be read from and, via
 * {@code WritableSavepoint}, modified and written back out.
 * Read methods return {@code DataSource}s for batch processing.
 */
public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
    // State reading methods

    /** Reads operator list state registered under {@code name} for the operator {@code uid}. */
    public <T> DataSource<T> readListState(
        String uid, 
        String name, 
        TypeInformation<T> typeInfo
    ) throws IOException;
    
    /**
     * Overload of {@link #readListState} that additionally takes an explicit
     * {@code TypeSerializer} — presumably for state written with a custom
     * serializer; confirm against the Flink State Processor API docs.
     */
    public <T> DataSource<T> readListState(
        String uid, 
        String name, 
        TypeInformation<T> typeInfo, 
        TypeSerializer<T> serializer
    ) throws IOException;
    
    /** Reads operator union state registered under {@code name} for the operator {@code uid}. */
    public <T> DataSource<T> readUnionState(
        String uid, 
        String name, 
        TypeInformation<T> typeInfo
    ) throws IOException;
    
    /** Overload of {@link #readUnionState} with an explicit serializer (see note on readListState). */
    public <T> DataSource<T> readUnionState(
        String uid, 
        String name, 
        TypeInformation<T> typeInfo, 
        TypeSerializer<T> serializer
    ) throws IOException;
    
    /** Reads broadcast state as key/value pairs for the operator {@code uid}. */
    public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
        String uid,
        String name,
        TypeInformation<K> keyTypeInfo,
        TypeInformation<V> valueTypeInfo
    ) throws IOException;
    
    /** Overload of {@link #readBroadcastState} with explicit key/value serializers. */
    public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
        String uid,
        String name,
        TypeInformation<K> keyTypeInfo,
        TypeInformation<V> valueTypeInfo,
        TypeSerializer<K> keySerializer,
        TypeSerializer<V> valueSerializer
    ) throws IOException;
    
    /** Reads keyed state for the operator {@code uid}, one output per key, via the reader function. */
    public <K, OUT> DataSource<OUT> readKeyedState(
        String uid, 
        KeyedStateReaderFunction<K, OUT> function
    ) throws IOException;
    
    /** Overload of {@link #readKeyedState} with explicit key and output type information. */
    public <K, OUT> DataSource<OUT> readKeyedState(
        String uid,
        KeyedStateReaderFunction<K, OUT> function,
        TypeInformation<K> keyTypeInfo,
        TypeInformation<OUT> outTypeInfo
    ) throws IOException;
    
    /** Returns a reader for window state created by the given window assigner. */
    public <W extends Window> WindowReader<W> window(
        WindowAssigner<?, W> assigner
    );
    
    /** Returns a reader for window state using an explicit window serializer. */
    public <W extends Window> WindowReader<W> window(
        TypeSerializer<W> windowSerializer
    );
}

NewSavepoint

Represents a new empty savepoint that can be written to.

/**
 * A new, empty savepoint. It only supports the write-side operations
 * inherited from {@code WritableSavepoint}; there is no state to read.
 */
public class NewSavepoint extends WritableSavepoint<NewSavepoint> {
    // Inherits all methods from WritableSavepoint
}

WritableSavepoint

Base class for savepoints that can be modified and written.

/**
 * Base class for savepoints that can be modified and written.
 *
 * <p>{@code F} is the concrete subtype so mutator methods can return it for
 * fluent chaining.
 * NOTE(review): the bound uses the raw type ({@code F extends WritableSavepoint}
 * rather than {@code WritableSavepoint<F>}); this appears to mirror the upstream
 * Flink declaration — confirm before "fixing" it here.
 */
public abstract class WritableSavepoint<F extends WritableSavepoint> {
    /** Removes the operator with the given uid from the savepoint. */
    public F removeOperator(String uid);
    
    /** Registers bootstrapped state for the operator with the given uid. */
    public <T> F withOperator(
        String uid, 
        BootstrapTransformation<T> transformation
    );
    
    /** Sets a configuration option on the savepoint. */
    public <T> F withConfiguration(
        ConfigOption<T> option, 
        T value
    );
    
    /**
     * Writes the savepoint to the given path. Per the usage examples below,
     * the actual work runs when {@code env.execute(...)} is called afterwards.
     */
    public void write(String path);
}

Usage Examples

Complete Savepoint Workflow

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.ExistingSavepoint;

/**
 * End-to-end example: load a savepoint, read state from one operator, then
 * replace that operator with freshly bootstrapped state and write the result
 * out as a new savepoint.
 */
public class SavepointExample {
    public void processSavepoint() throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        StateBackend backend = new HashMapStateBackend();

        // Load existing savepoint
        ExistingSavepoint loaded =
            Savepoint.load(batchEnv, "/path/to/input/savepoint", backend);

        // Read some state
        DataSource<MyState> listState =
            loaded.readListState(
                "operator-1",
                "list-state",
                TypeInformation.of(MyState.class));

        // Create bootstrap transformation for new state
        DataSet<NewState> freshRecords = batchEnv.fromCollection(getNewStateData());
        BootstrapTransformation<NewState> bootstrap =
            OperatorTransformation.bootstrapWith(freshRecords)
                .keyBy(NewState::getKey)
                .transform(new MyBootstrapFunction());

        // Remove old operator and add new one, then write the modified savepoint
        loaded
            .removeOperator("operator-1")
            .withOperator("operator-2", bootstrap)
            .write("/path/to/output/savepoint");

        batchEnv.execute("Savepoint Processing");
    }
}

Creating Savepoint from Scratch

/**
 * Example of building a savepoint entirely from scratch: bootstrap data is
 * read from a text file, keyed, transformed into operator state, and written
 * out as a brand-new savepoint with max parallelism 256.
 */
public class NewSavepointExample {
    public void createSavepoint() throws Exception {
        StateBackend backend = new HashMapStateBackend();

        // Create new savepoint
        NewSavepoint freshSavepoint = Savepoint.create(backend, 256);

        // Prepare bootstrap data
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<InitialData> seedRecords =
            batchEnv.readTextFile("/path/to/data").map(raw -> parseInitialData(raw));

        // Create transformation
        BootstrapTransformation<InitialData> bootstrap =
            OperatorTransformation.bootstrapWith(seedRecords)
                .keyBy(InitialData::getPartitionKey)
                .transform(new InitialStateBootstrapFunction());

        // Add operator and write
        freshSavepoint
            .withOperator("main-operator", bootstrap)
            .write("/path/to/new/savepoint");

        batchEnv.execute("Create New Savepoint");
    }
}

Configuration Options

Max Parallelism

When creating new savepoints, the max parallelism must be specified:

  • Must be between 1 and 32768 (UPPER_BOUND_MAX_PARALLELISM)
  • Determines the maximum number of key groups for keyed state
  • Cannot be changed after savepoint creation
  • Should be set higher than expected parallelism for flexibility

State Backend Configuration

The state backend determines how state is stored and accessed:

// Use HashMapStateBackend for in-memory state
StateBackend hashMapBackend = new HashMapStateBackend();

// Use EmbeddedRocksDBStateBackend for disk-based state  
StateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();

// Use custom configuration (here: local working directory for RocksDB files)
Configuration config = new Configuration();
config.setString("state.backend.rocksdb.localdir", "/tmp/rocksdb");
StateBackend configuredBackend = new EmbeddedRocksDBStateBackend().configure(config);

Error Handling

Common exceptions and their meanings:

IOException

  • Savepoint path not found or inaccessible
  • File system permission issues
  • Corrupted savepoint files

RuntimeException

  • Invalid max parallelism values
  • Savepoint contains no operator states
  • Operator state validation failures

Example Error Handling

try {
    ExistingSavepoint savepoint = Savepoint.load(env, savepointPath, stateBackend);
    // Process savepoint...
} catch (IOException e) {
    // Throwable.getMessage() may return null, so guard before substring matching
    // to avoid masking the original failure with a NullPointerException.
    String message = e.getMessage();
    if (message != null && message.contains("does not exist")) {
        System.err.println("Savepoint path not found: " + savepointPath);
    } else {
        // Print the exception itself: includes the class name even when the message is null
        System.err.println("Failed to load savepoint: " + e);
    }
} catch (RuntimeException e) {
    System.err.println("Savepoint validation failed: " + e.getMessage());
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-state-processor-api-2-12

docs

function-interfaces.md

index.md

savepoint-management.md

state-reading.md

state-writing.md

window-operations.md

tile.json