CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-state-processor-api-2-12

Apache Flink State Processor API for reading and writing savepoint state data offline

Pending
Overview
Eval results
Files

state-reading.mddocs/

State Reading

State reading functionality allows you to extract and process state data from existing savepoints. The API supports reading different types of Flink state including operator state (list, union, broadcast) and keyed state.

Operator State Reading

List State

Read operator list state as individual elements.

/**
 * Reads operator list state as individual elements.
 *
 * @param uid      uid of the operator whose state is read
 * @param name     name of the list state
 * @param typeInfo type information for the state's elements
 * @return a {@code DataSource} over the elements of the state
 * @throws IOException declared by the API; presumably raised when the savepoint
 *                     cannot be read (confirm against the Flink Javadoc)
 */
public <T> DataSource<T> readListState(
    String uid, 
    String name, 
    TypeInformation<T> typeInfo
) throws IOException;

/**
 * Reads operator list state as individual elements, using a caller-supplied
 * serializer for the elements.
 *
 * @param uid        uid of the operator whose state is read
 * @param name       name of the list state
 * @param typeInfo   type information for the state's elements
 * @param serializer custom serializer for the state's elements
 * @return a {@code DataSource} over the elements of the state
 * @throws IOException declared by the API; presumably raised when the savepoint
 *                     cannot be read (confirm against the Flink Javadoc)
 */
public <T> DataSource<T> readListState(
    String uid, 
    String name, 
    TypeInformation<T> typeInfo, 
    TypeSerializer<T> serializer
) throws IOException;

Usage Example:

// List state described by type information only
DataSource<String> messages =
    savepoint.readListState("kafka-source", "buffered-messages", Types.STRING);

// List state read through an explicitly supplied serializer
TypeSerializer<MyCustomType> serializer = new MyCustomTypeSerializer();
DataSource<MyCustomType> customData =
    savepoint.readListState(
        "custom-operator",
        "custom-state",
        TypeInformation.of(MyCustomType.class),
        serializer);

// Prefix every message and print the result
messages
    .map(message -> "Processed: " + message)
    .print();

Union State

Read operator union state. Union state is stored like list state, but it is redistributed differently on restore: every parallel subtask receives the complete list of elements instead of a share of it.

/**
 * Reads operator union state.
 *
 * @param uid      uid of the operator whose state is read
 * @param name     name of the union state
 * @param typeInfo type information for the state's elements
 * @return a {@code DataSource} over the elements of the state
 * @throws IOException declared by the API; presumably raised when the savepoint
 *                     cannot be read (confirm against the Flink Javadoc)
 */
public <T> DataSource<T> readUnionState(
    String uid, 
    String name, 
    TypeInformation<T> typeInfo
) throws IOException;

/**
 * Reads operator union state, using a caller-supplied serializer for the elements.
 *
 * @param uid        uid of the operator whose state is read
 * @param name       name of the union state
 * @param typeInfo   type information for the state's elements
 * @param serializer custom serializer for the state's elements
 * @return a {@code DataSource} over the elements of the state
 * @throws IOException declared by the API; presumably raised when the savepoint
 *                     cannot be read (confirm against the Flink Javadoc)
 */
public <T> DataSource<T> readUnionState(
    String uid, 
    String name, 
    TypeInformation<T> typeInfo, 
    TypeSerializer<T> serializer
) throws IOException;

Usage Example:

// Union state whose elements are Configuration objects
DataSource<Configuration> configs =
    savepoint.readUnionState(
        "config-broadcaster",
        "broadcast-config",
        TypeInformation.of(Configuration.class));

// Run each configuration through processConfiguration and gather the results
configs
    .map(cfg -> processConfiguration(cfg))
    .collect();

Broadcast State

Read broadcast state as key-value pairs.

/**
 * Reads broadcast state as key-value pairs.
 *
 * @param uid           uid of the operator whose state is read
 * @param name          name of the broadcast state
 * @param keyTypeInfo   type information for the keys
 * @param valueTypeInfo type information for the values
 * @return a {@code DataSource} of {@code Tuple2} pairs ({@code f0} = key, {@code f1} = value)
 * @throws IOException declared by the API; presumably raised when the savepoint
 *                     cannot be read (confirm against the Flink Javadoc)
 */
public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
    String uid,
    String name,
    TypeInformation<K> keyTypeInfo,
    TypeInformation<V> valueTypeInfo
) throws IOException;

/**
 * Reads broadcast state as key-value pairs, using caller-supplied serializers
 * for the keys and the values.
 *
 * @param uid             uid of the operator whose state is read
 * @param name            name of the broadcast state
 * @param keyTypeInfo     type information for the keys
 * @param valueTypeInfo   type information for the values
 * @param keySerializer   custom serializer for the keys
 * @param valueSerializer custom serializer for the values
 * @return a {@code DataSource} of {@code Tuple2} pairs ({@code f0} = key, {@code f1} = value)
 * @throws IOException declared by the API; presumably raised when the savepoint
 *                     cannot be read (confirm against the Flink Javadoc)
 */
public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
    String uid,
    String name,
    TypeInformation<K> keyTypeInfo,
    TypeInformation<V> valueTypeInfo,
    TypeSerializer<K> keySerializer,
    TypeSerializer<V> valueSerializer
) throws IOException;

Usage Example:

// Broadcast state comes back as (key, value) tuples
DataSource<Tuple2<String, Rule>> rules =
    savepoint.readBroadcastState(
        "rule-processor",
        "rules-broadcast-state",
        Types.STRING,
        TypeInformation.of(Rule.class));

// Render each rule as "Rule <key>: <description>" and print it
rules
    .map(entry -> "Rule " + entry.f0 + ": " + entry.f1.getDescription())
    .print();

Keyed State Reading

Read keyed state using custom reader functions that process each key individually.

/**
 * Reads keyed state with a custom reader function that processes each key individually.
 *
 * @param uid      uid of the operator whose state is read
 * @param function reader function invoked for the keys of the operator
 * @return a {@code DataSource} of the records emitted by {@code function}
 * @throws IOException declared by the API; presumably raised when the savepoint
 *                     cannot be read (confirm against the Flink Javadoc)
 */
public <K, OUT> DataSource<OUT> readKeyedState(
    String uid, 
    KeyedStateReaderFunction<K, OUT> function
) throws IOException;

/**
 * Reads keyed state with a custom reader function, with the key and output
 * types given explicitly.
 *
 * @param uid         uid of the operator whose state is read
 * @param function    reader function invoked for the keys of the operator
 * @param keyTypeInfo type information for the key
 * @param outTypeInfo type information for the records emitted by {@code function}
 * @return a {@code DataSource} of the records emitted by {@code function}
 * @throws IOException declared by the API; presumably raised when the savepoint
 *                     cannot be read (confirm against the Flink Javadoc)
 */
public <K, OUT> DataSource<OUT> readKeyedState(
    String uid,
    KeyedStateReaderFunction<K, OUT> function,
    TypeInformation<K> keyTypeInfo,
    TypeInformation<OUT> outTypeInfo
) throws IOException;

KeyedStateReaderFunction Interface

/**
 * Base class for functions that read keyed state; {@code readKey} is invoked
 * per key and emits results through a {@code Collector}.
 *
 * @param <K>   type of the key
 * @param <OUT> type of the records emitted by the function
 */
public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {
    /**
     * Initialisation hook. Implementations register their state descriptors here
     * so that the state handles are available in {@code readKey}.
     */
    public abstract void open(Configuration parameters) throws Exception;
    
    /**
     * Processes a single key.
     *
     * @param key the key being read
     * @param ctx context giving access to timer information for the key
     * @param out collector for the records to emit
     */
    public abstract void readKey(
        K key, 
        Context ctx, 
        Collector<OUT> out
    ) throws Exception;
    
    /** Timer information made available to {@code readKey}. */
    public interface Context {
        /** Registered event-time timers (presumably as timestamps; confirm against the Flink Javadoc). */
        Set<Long> registeredEventTimeTimers() throws Exception;
        /** Registered processing-time timers (presumably as timestamps; confirm against the Flink Javadoc). */
        Set<Long> registeredProcessingTimeTimers() throws Exception;
    }
}

Usage Example:

/**
 * Reader that emits one {@code UserStats} per key, built from the "count" and
 * "average" value states. Keys missing either value are skipped.
 */
public class MyKeyedStateReader extends KeyedStateReaderFunction<String, UserStats> {
    private ValueState<Long> countState;
    private ValueState<Double> avgState;

    /** Registers the descriptors for both value states. */
    @Override
    public void open(Configuration parameters) throws Exception {
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class));
        avgState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("average", Double.class));
    }

    /** Emits the statistics for {@code key} when both states hold a value. */
    @Override
    public void readKey(String key, Context ctx, Collector<UserStats> out) throws Exception {
        final Long count = countState.value();
        final Double average = avgState.value();

        final boolean complete = count != null && average != null;
        if (complete) {
            out.collect(new UserStats(key, count, average));
        }

        // Timer information is available through the context if it is needed
        Set<Long> eventTimers = ctx.registeredEventTimeTimers();
        Set<Long> processingTimers = ctx.registeredProcessingTimeTimers();
    }
}

// Run the reader against the "user-processor" operator
DataSource<UserStats> userStats =
    savepoint.readKeyedState("user-processor", new MyKeyedStateReader());

// Print every emitted record
userStats.print();

Reading Different Keyed State Types

Value State:

/** Emits a (key, value) pair for every key whose "value" state is set. */
public class ValueStateReader extends KeyedStateReaderFunction<String, Tuple2<String, String>> {
    private ValueState<String> valueState;

    /** Registers the descriptor for the "value" state. */
    @Override
    public void open(Configuration parameters) throws Exception {
        valueState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("value", String.class));
    }

    /** Emits the pair for {@code key}; keys without a value produce no output. */
    @Override
    public void readKey(String key, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
        final String current = valueState.value();
        if (current == null) {
            return;
        }
        out.collect(Tuple2.of(key, current));
    }
}

List State:

/** Emits, per key, the full content of the "list" state as one {@code List}. */
public class ListStateReader extends KeyedStateReaderFunction<String, Tuple2<String, List<String>>> {
    private ListState<String> listState;

    /** Registers the descriptor for the "list" state. */
    @Override
    public void open(Configuration parameters) throws Exception {
        listState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("list", String.class));
    }

    /** Copies the state's elements into a list and emits it unless it is empty. */
    @Override
    public void readKey(String key, Context ctx, Collector<Tuple2<String, List<String>>> out) throws Exception {
        final List<String> collected = new ArrayList<>();
        listState.get().forEach(collected::add);

        if (collected.isEmpty()) {
            return;
        }
        out.collect(Tuple2.of(key, collected));
    }
}

Map State:

/** Flattens the "map" state into (key, mapKey, mapValue) triples. */
public class MapStateReader extends KeyedStateReaderFunction<String, Tuple3<String, String, Integer>> {
    private MapState<String, Integer> mapState;

    /** Registers the descriptor for the "map" state. */
    @Override
    public void open(Configuration parameters) throws Exception {
        mapState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("map", String.class, Integer.class));
    }

    /** Emits one triple per entry of the map state for {@code key}. */
    @Override
    public void readKey(String key, Context ctx, Collector<Tuple3<String, String, Integer>> out) throws Exception {
        mapState.entries().forEach(
            entry -> out.collect(Tuple3.of(key, entry.getKey(), entry.getValue())));
    }
}

Advanced Reading Patterns

Filtering State Data

/**
 * Emits a {@code FilteredData} record for every key whose "data" value state
 * is set and accepted by the supplied filter.
 */
public class FilteringStateReader extends KeyedStateReaderFunction<String, FilteredData> {

    /**
     * A {@link Predicate} that is also {@code Serializable}.
     *
     * <p>The reader function is serialized together with its fields when the job
     * is shipped, so the filter must be serializable as well. A lambda only
     * becomes serializable when its target type is; a plain {@code Predicate}
     * lambda would fail at that point.
     */
    @FunctionalInterface
    public interface SerializablePredicate<T> extends Predicate<T>, java.io.Serializable {
    }

    private ValueState<MyData> dataState;
    private final SerializablePredicate<MyData> filter;

    /**
     * @param filter decides which state values are emitted; lambdas and method
     *               references can be passed directly
     */
    public FilteringStateReader(SerializablePredicate<MyData> filter) {
        this.filter = filter;
    }

    /** Registers the descriptor for the "data" state. */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<MyData> desc = new ValueStateDescriptor<>("data", MyData.class);
        dataState = getRuntimeContext().getState(desc);
    }

    /** Emits the value for {@code key} when it exists and passes the filter. */
    @Override
    public void readKey(String key, Context ctx, Collector<FilteredData> out) throws Exception {
        MyData data = dataState.value();
        if (data != null && filter.test(data)) {
            out.collect(new FilteredData(key, data));
        }
    }
}

// Usage: keep only the entries whose score is above 0.8
DataSource<FilteredData> filtered =
    savepoint.readKeyedState(
        "data-processor",
        new FilteringStateReader(candidate -> candidate.getScore() > 0.8));

Combining Multiple States Per Key

/**
 * Combines the "value" and "tags" states of a key into a single
 * {@code KeyAggregate}. Keys without a "value" produce no output.
 */
public class AggregatingStateReader extends KeyedStateReaderFunction<String, KeyAggregate> {
    private ValueState<Double> valueState;
    private ListState<String> tagState;

    /** Registers the descriptors for the value state and the tag list state. */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Double> valueDesc = new ValueStateDescriptor<>("value", Double.class);
        valueState = getRuntimeContext().getState(valueDesc);

        ListStateDescriptor<String> tagDesc = new ListStateDescriptor<>("tags", String.class);
        tagState = getRuntimeContext().getListState(tagDesc);
    }

    /** Reads both states for {@code key} and emits the aggregate when a value is present. */
    @Override
    public void readKey(String key, Context ctx, Collector<KeyAggregate> out) throws Exception {
        final Double value = valueState.value();

        final List<String> tags = new ArrayList<>();
        for (String tag : tagState.get()) {
            tags.add(tag);
        }

        if (value == null) {
            return;
        }
        out.collect(new KeyAggregate(key, value, tags));
    }
}

Error Handling

Common Reading Errors

try {
    DataSource<MyData> data = savepoint.readListState(
        "operator-uid",
        "state-name", 
        TypeInformation.of(MyData.class)
    );
    
    data.collect();
    
} catch (IOException e) {
    if (e.getMessage().contains("does not exist")) {
        System.err.println("Operator UID not found in savepoint");
    } else if (e.getMessage().contains("state")) {
        System.err.println("State descriptor not found");
    } else {
        System.err.println("Failed to read state: " + e.getMessage());
    }
}

State Descriptor Registration Errors

/**
 * Reader that wraps descriptor registration failures in a {@code RuntimeException}
 * and reports per-key read failures on stderr instead of propagating them.
 */
public class SafeStateReader extends KeyedStateReaderFunction<String, MyOutput> {
    private ValueState<String> state;

    /** Registers the "myState" descriptor; any failure is rethrown with its cause attached. */
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("myState", String.class));
        } catch (Exception e) {
            throw new RuntimeException("Failed to register state descriptor", e);
        }
    }

    /** Emits the value for {@code key} if present; read failures are logged and the key is skipped. */
    @Override
    public void readKey(String key, Context ctx, Collector<MyOutput> out) throws Exception {
        try {
            final String value = state.value();
            if (value == null) {
                return;
            }
            out.collect(new MyOutput(key, value));
        } catch (Exception e) {
            // Deliberate best-effort: report and continue (an error record could be collected instead)
            System.err.println("Failed to read state for key " + key + ": " + e.getMessage());
        }
    }
}

Performance Considerations

Parallelism Settings

// Set the environment's parallelism before the read is defined
env.setParallelism(8);

// Define the keyed-state read for the "operator" uid
DataSource<MyData> data = savepoint.readKeyedState("operator", readerFunction);

// Parallelism can also be overridden per operation; note that setParallelism(4)
// is invoked on `data` itself here, ahead of the map() call
data.setParallelism(4).map(processData).print();

Memory Management

// For large state, emit each key's list in bounded batches rather than as one large list
public class BatchingStateReader extends KeyedStateReaderFunction<String, List<MyData>> {
    /** Maximum number of elements in an emitted batch. */
    private static final int MAX_BATCH_SIZE = 1000;

    private ListState<MyData> listState;

    /**
     * Registers the list state descriptor. {@code open} is abstract in
     * {@code KeyedStateReaderFunction}, and without it {@code listState} would
     * still be null when {@code readKey} runs.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // NOTE: "list" is a placeholder; use the state name the original job registered.
        listState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("list", MyData.class));
    }

    /**
     * Emits the elements stored for {@code key} in batches of at most
     * {@code MAX_BATCH_SIZE}; a final, smaller batch carries the remainder.
     */
    @Override
    public void readKey(String key, Context ctx, Collector<List<MyData>> out) throws Exception {
        List<MyData> batch = new ArrayList<>();

        for (MyData item : listState.get()) {
            batch.add(item);

            // Hand the full batch downstream and start a fresh list, so the
            // emitted list is never mutated afterwards and no copy is needed
            if (batch.size() >= MAX_BATCH_SIZE) {
                out.collect(batch);
                batch = new ArrayList<>();
            }
        }

        // Emit remaining items
        if (!batch.isEmpty()) {
            out.collect(batch);
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-state-processor-api-2-12

docs

function-interfaces.md

index.md

savepoint-management.md

state-reading.md

state-writing.md

window-operations.md

tile.json