CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-state-processor-api-2-12

Apache Flink State Processor API for reading and writing savepoint state data offline

Pending
Overview
Eval results
Files

docs/state-writing.md

State Writing

State writing functionality allows you to bootstrap new state data into savepoints using DataSet transformations. This enables you to initialize operator state from batch data sources.

Bootstrap Transformation Overview

Bootstrap transformations define how to convert DataSet elements into operator state. They are created through the OperatorTransformation API and can be used with both keyed and non-keyed operators.

Creating Bootstrap Transformations

/**
 * Entry point for creating bootstrap transformations from batch data.
 */
public final class OperatorTransformation {
    /**
     * Starts a bootstrap transformation over the given data set.
     *
     * @param dataSet the elements that will be converted into operator state
     * @param <T> the element type of the data set
     * @return a non-keyed transformation that can be configured, keyed, or transformed
     */
    public static <T> OneInputOperatorTransformation<T> bootstrapWith(
        DataSet<T> dataSet
    );
}

Non-Keyed State Bootstrap

OneInputOperatorTransformation

For operators without keyed state partitioning.

/**
 * Non-keyed bootstrap transformation over elements of type {@code T}.
 *
 * <p>Configuration methods return this transformation type so calls can be chained;
 * {@code transform} produces the final {@code BootstrapTransformation}, and {@code keyBy}
 * switches to a {@code KeyedOperatorTransformation}.
 */
public class OneInputOperatorTransformation<T> {
    /** Sets the max parallelism for the operator being bootstrapped. */
    public OneInputOperatorTransformation<T> setMaxParallelism(int maxParallelism);
    
    /** Assigns a timestamp to each element using the given {@code Timestamper}. */
    public OneInputOperatorTransformation<T> assignTimestamps(Timestamper<T> timestamper);
    
    /** Assigns a timestamp to each element using the given {@code TimestampAssigner}. */
    public OneInputOperatorTransformation<T> assignTimestamps(TimestampAssigner<T> assigner);
    
    /** Bootstraps non-keyed state with the given function. */
    public BootstrapTransformation<T> transform(StateBootstrapFunction<T> processFunction);
    
    /** Bootstraps broadcast state with the given function. */
    public BootstrapTransformation<T> transform(
        BroadcastStateBootstrapFunction<T> processFunction
    );
    
    /** Bootstraps state with an operator created by the given factory. */
    public BootstrapTransformation<T> transform(SavepointWriterOperatorFactory factory);
    
    /** Keys the elements with the given selector, yielding a keyed transformation. */
    public <K> KeyedOperatorTransformation<K, T> keyBy(
        KeySelector<T, K> keySelector
    );
    
    /** Keys the elements with the given selector and explicit key type information. */
    public <K> KeyedOperatorTransformation<K, T> keyBy(
        KeySelector<T, K> keySelector, 
        TypeInformation<K> keyType
    );
    
    /** Keys the elements by the given fields (presumably tuple positions — confirm); the key type is {@code Tuple}. */
    public KeyedOperatorTransformation<Tuple, T> keyBy(int... fields);
    
    /** Keys the elements by the given named fields; the key type is {@code Tuple}. */
    public KeyedOperatorTransformation<Tuple, T> keyBy(String... fields);
}

Usage Example:

import org.apache.flink.state.api.functions.StateBootstrapFunction;

// Build the batch input whose elements will be turned into operator state
DataSet<String> messages = env.fromCollection(Arrays.asList(
    "message1", "message2", "message3"
));

// Define a non-keyed bootstrap: every element is handed to MessageBootstrapFunction
BootstrapTransformation<String> transformation = OperatorTransformation
    .bootstrapWith(messages)
    .transform(new MessageBootstrapFunction());

// Register the transformation on the savepoint under the name "message-buffer"
savepoint.withOperator("message-buffer", transformation);

StateBootstrapFunction

Base function for non-keyed state bootstrap.

/**
 * Base function for bootstrapping non-keyed (operator) state.
 *
 * <p>The function is a {@code CheckpointedFunction}: subclasses register their operator
 * state in {@code initializeState(FunctionInitializationContext)} and may finalize it in
 * {@code snapshotState(FunctionSnapshotContext)}. Keyed state is not available because
 * there is no current key.
 */
public abstract class StateBootstrapFunction<T> extends AbstractRichFunction
        implements CheckpointedFunction {
    /**
     * Called once for every element of the bootstrap data set.
     *
     * @param value the element to write into state
     * @param ctx additional information about the element being processed
     * @throws Exception if the element cannot be processed
     */
    public abstract void processElement(T value, Context ctx) throws Exception;
    
    /** Information available to the function while an element is processed. */
    public abstract class Context {
        /** Returns the current processing time. */
        public abstract long currentProcessingTime();
        // NOTE(review): the upstream Flink Context is believed to expose only
        // currentProcessingTime() — confirm that currentWatermark() exists for the
        // targeted Flink version before relying on it.
        public abstract long currentWatermark();
    }
}

Implementation Example:

/**
 * Bootstraps non-keyed (operator) state from a data set of messages.
 *
 * <p>A non-keyed bootstrap function has no current key, so keyed state obtained through
 * {@code getRuntimeContext().getState(...)} or {@code getListState(...)} cannot be used
 * here. State is registered against the operator state store in
 * {@code initializeState} instead, which {@code StateBootstrapFunction} supports by
 * implementing Flink's {@code CheckpointedFunction}. {@code FunctionInitializationContext}
 * and {@code FunctionSnapshotContext} come from {@code org.apache.flink.runtime.state}.
 */
public class MessageBootstrapFunction extends StateBootstrapFunction<String> {
    private ListState<String> messagesState;
    private ListState<Integer> countState;

    // The operator state store only offers list-style state, so the running count is
    // tracked in a plain field and written out when the state is snapshotted.
    private int count;

    /** Registers the operator state written by this function. */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        messagesState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("messages", String.class)
        );
        countState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("count", Integer.class)
        );
    }

    /**
     * Appends the message to the list state and increments the running count.
     *
     * @param message the element being bootstrapped
     * @param ctx access to the current processing time
     */
    @Override
    public void processElement(String message, Context ctx) throws Exception {
        messagesState.add(message);
        count++;

        // Access context information
        long processingTime = ctx.currentProcessingTime();
        System.out.println("Processed message at " + processingTime);
    }

    /** Writes the final count into operator state when the bootstrap state is snapshotted. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        countState.clear();
        countState.add(count);
    }
}

Keyed State Bootstrap

KeyedOperatorTransformation

For operators with keyed state partitioning.

/**
 * Keyed bootstrap transformation over elements of type {@code T} keyed by {@code K}.
 *
 * <p>Only keyed bootstrap functions and operator factories are accepted by
 * {@code transform}; {@code window} switches to a windowed transformation.
 */
public class KeyedOperatorTransformation<K, T> {
    /** Bootstraps keyed state with the given function. */
    public BootstrapTransformation<T> transform(
        KeyedStateBootstrapFunction<K, T> processFunction
    );
    
    /** Bootstraps state with an operator created by the given factory. */
    public BootstrapTransformation<T> transform(SavepointWriterOperatorFactory factory);
    
    /** Assigns the keyed elements to windows using the given assigner. */
    public <W extends Window> WindowedOperatorTransformation<T, K, W> window(
        WindowAssigner<? super T, W> assigner
    );
}

KeyedStateBootstrapFunction

Function for bootstrapping keyed state.

/**
 * Base function for bootstrapping keyed state.
 *
 * @param <K> the key type
 * @param <IN> the type of the bootstrap elements
 */
public abstract class KeyedStateBootstrapFunction<K, IN> extends AbstractRichFunction {
    /**
     * Called once for every element of the bootstrap data set.
     *
     * @param value the element to write into state
     * @param ctx access to the current key and the timer service
     * @throws Exception if the element cannot be processed
     */
    public abstract void processElement(IN value, Context ctx) throws Exception;
    
    /** Information available to the function while an element is processed. */
    public abstract class Context {
        /** Returns the service used to register timers. */
        public abstract TimerService timerService();
        /** Returns the key of the element currently being processed. */
        public abstract K getCurrentKey();
    }
}

Usage Example:

// Build the batch input; each UserAction carries the user ID it will be keyed by
DataSet<UserAction> userActions = env.fromCollection(getUserActions());

// Key the data set first, then hand each element to the keyed bootstrap function
BootstrapTransformation<UserAction> transformation = OperatorTransformation
    .bootstrapWith(userActions)
    .keyBy(UserAction::getUserId)  // Key by user ID
    .transform(new UserStateBootstrapFunction());

// Register the transformation on the savepoint under the name "user-processor"
savepoint.withOperator("user-processor", transformation);

KeyedStateBootstrapFunction Implementation:

/**
 * Seeds per-user keyed state (profile, activity log, last-seen timestamp) from
 * {@code UserAction} records and registers an event-time timer 30 days after each action.
 */
public class UserStateBootstrapFunction extends KeyedStateBootstrapFunction<String, UserAction> {
    private ValueState<UserProfile> profileState;
    private ListState<String> activityState;
    private ValueState<Long> lastSeenState;

    /** Obtains the three keyed state handles from the runtime context. */
    @Override
    public void open(Configuration parameters) throws Exception {
        profileState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("profile", UserProfile.class)
        );
        activityState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("activity", String.class)
        );
        lastSeenState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("lastSeen", Long.class)
        );
    }

    /**
     * Folds one action into the state of the user it is keyed by.
     *
     * @param action the action being bootstrapped
     * @param ctx access to the current key and the timer service
     */
    @Override
    public void processElement(UserAction action, Context ctx) throws Exception {
        // Profile: start from the stored one, or a fresh one for a user seen for the first time
        UserProfile userProfile = loadOrCreateProfile(ctx.getCurrentKey());
        userProfile.updateFromAction(action);
        profileState.update(userProfile);

        // Activity log entry in the form "<actionType>:<timestamp>"
        String logEntry = action.getActionType() + ":" + action.getTimestamp();
        activityState.add(logEntry);

        // Remember when the user was last seen
        lastSeenState.update(action.getTimestamp());

        // Cleanup timer (example): fires 30 days after the action's timestamp
        ctx.timerService().registerEventTimeTimer(
            action.getTimestamp() + Duration.ofDays(30).toMillis()
        );
    }

    /** Returns the stored profile for the current key, or a new one for the given user. */
    private UserProfile loadOrCreateProfile(String userId) throws Exception {
        UserProfile stored = profileState.value();
        return stored != null ? stored : new UserProfile(userId);
    }
}

Broadcast State Bootstrap

BroadcastStateBootstrapFunction

Function for bootstrapping broadcast state.

/**
 * Base function for bootstrapping broadcast state.
 *
 * <p>Broadcast state is obtained from the {@code Context} passed to
 * {@code processElement}, not from the runtime context.
 */
public abstract class BroadcastStateBootstrapFunction<T> extends AbstractRichFunction {
    /**
     * Called once for every element of the bootstrap data set.
     *
     * @param value the element to write into broadcast state
     * @param ctx access to broadcast state and the current processing time
     * @throws Exception if the element cannot be processed
     */
    public abstract void processElement(T value, Context ctx) throws Exception;
    
    /** Information and state access available while an element is processed. */
    public abstract class Context {
        /** Returns the current processing time. */
        public abstract long currentProcessingTime();
        // NOTE(review): the upstream Flink Context is believed not to expose a
        // watermark — confirm that currentWatermark() exists for the targeted version.
        public abstract long currentWatermark();
        /** Returns the writable broadcast state registered under the given descriptor. */
        public abstract <K, V> BroadcastState<K, V> getBroadcastState(
            MapStateDescriptor<K, V> stateDescriptor
        );
    }
}

Usage Example:

// Bootstrap broadcast state with configuration data
DataSet<ConfigEntry> configs = env.fromCollection(getConfigEntries());

// transform(BroadcastStateBootstrapFunction) is only offered by the non-keyed
// OneInputOperatorTransformation, so the data set must not be keyed first.
BootstrapTransformation<ConfigEntry> transformation = OperatorTransformation
    .bootstrapWith(configs)
    .transform(new ConfigBroadcastBootstrapFunction());

// Register the transformation on the savepoint under the name "config-broadcaster"
savepoint.withOperator("config-broadcaster", transformation);

Implementation:

/**
 * Bootstraps broadcast state with one map entry per {@code ConfigEntry}.
 */
public class ConfigBroadcastBootstrapFunction extends BroadcastStateBootstrapFunction<ConfigEntry> {
    private MapStateDescriptor<String, String> configDescriptor;

    /** Creates the descriptor that identifies the broadcast state to write. */
    @Override
    public void open(Configuration parameters) throws Exception {
        configDescriptor = new MapStateDescriptor<>(
            "config-broadcast", String.class, String.class
        );
    }

    /**
     * Stores the entry's key/value pair in broadcast state.
     *
     * @param entry the configuration entry being bootstrapped
     * @param ctx the bootstrap context, which provides access to broadcast state
     */
    @Override
    public void processElement(ConfigEntry entry, Context ctx) throws Exception {
        // Broadcast state is handed out by the bootstrap Context; the runtime context
        // does not provide a getBroadcastState accessor.
        BroadcastState<String, String> broadcastState =
            ctx.getBroadcastState(configDescriptor);

        broadcastState.put(entry.getKey(), entry.getValue());

        System.out.println("Bootstrapped config: " + entry.getKey() +
                          " = " + entry.getValue());
    }
}

Advanced Bootstrap Patterns

Conditional State Bootstrap

/**
 * Bootstraps status and score state only for records that are valid and score above 0.5,
 * and registers a 24-hour event-time timer for records of type {@code "TEMPORARY"}.
 */
public class ConditionalBootstrapFunction extends KeyedStateBootstrapFunction<String, DataRecord> {
    private ValueState<String> statusState;
    private ValueState<Double> scoreState;

    /** Obtains the keyed state handles from the runtime context. */
    @Override
    public void open(Configuration parameters) throws Exception {
        statusState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("status", String.class)
        );
        scoreState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("score", Double.class)
        );
    }

    /**
     * Writes state for the record if it passes the filter; otherwise leaves state untouched.
     *
     * @param record the record being bootstrapped
     * @param ctx access to the timer service
     */
    @Override
    public void processElement(DataRecord record, Context ctx) throws Exception {
        // Only bootstrap state for certain conditions
        if (record.isValid() && record.getScore() > 0.5) {
            statusState.update(record.getStatus());
            scoreState.update(record.getScore());

            // Set cleanup timer based on record type. The constant goes first so a
            // record without a type is simply treated as non-temporary instead of
            // failing with a NullPointerException.
            if ("TEMPORARY".equals(record.getType())) {
                ctx.timerService().registerEventTimeTimer(
                    record.getTimestamp() + Duration.ofHours(24).toMillis()
                );
            }
        }
    }
}

Batch Processing Bootstrap

/**
 * Accumulates payloads per key and processes them as a batch once the number of
 * accumulated elements reaches the batch size carried by the incoming element.
 */
public class BatchBootstrapFunction extends KeyedStateBootstrapFunction<String, BatchData> {
    private ListState<String> batchState;
    private ValueState<Integer> batchSizeState;

    /**
     * Obtains the keyed state handles. Without this step both fields stay {@code null}
     * and the first call to {@code processElement} fails.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        batchState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("batch", String.class)
        );
        batchSizeState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("batchSize", Integer.class)
        );
    }

    /**
     * Adds the element's payload to the current batch and flushes the batch when full.
     *
     * @param data the element being bootstrapped
     * @param ctx the bootstrap context (unused)
     */
    @Override
    public void processElement(BatchData data, Context ctx) throws Exception {
        // Accumulate batch data
        batchState.add(data.getPayload());

        // No stored size yet means this is the first element for the key
        Integer currentSize = batchSizeState.value();
        int newSize = (currentSize != null ? currentSize : 0) + 1;
        batchSizeState.update(newSize);

        // Process batch when it reaches target size, then start a new one
        if (newSize >= data.getBatchSize()) {
            processBatch();
            batchState.clear();
            batchSizeState.clear();
        }
    }

    /** Copies the accumulated payloads out of state and reports the batch size. */
    private void processBatch() throws Exception {
        List<String> items = new ArrayList<>();
        batchState.get().forEach(items::add);

        // Process accumulated batch
        System.out.println("Processing batch of " + items.size() + " items");
    }
}

Integration with Transformations

Chaining Transformations

// One data set per operator whose state is being bootstrapped
DataSet<UserData> userData = env.fromCollection(users);
DataSet<ConfigData> configData = env.fromCollection(configs);

// Keyed bootstrap: user records are keyed by ID before being transformed
BootstrapTransformation<UserData> userTransform = OperatorTransformation
    .bootstrapWith(userData)
    .keyBy(UserData::getId)
    .transform(new UserBootstrapFunction());

// Non-keyed bootstrap: config records are transformed without keying
BootstrapTransformation<ConfigData> configTransform = OperatorTransformation
    .bootstrapWith(configData)
    .transform(new ConfigBootstrapFunction());

// Register both operators on the same savepoint, then write it out
savepoint.withOperator("user-state", userTransform)
         .withOperator("config-state", configTransform)
         .write("/path/to/savepoint");

Using Custom Key Selectors

// Custom key selector, declared together with the type information for its key
KeySelector<MyData, String> keySelector = data -> data.getPartitionKey();
TypeInformation<String> keyType = Types.STRING;

// Pass both to keyBy so the key type is given explicitly instead of being inferred
BootstrapTransformation<MyData> transformation = OperatorTransformation
    .bootstrapWith(dataSet)
    .keyBy(keySelector, keyType)  // Explicit key type
    .transform(new MyBootstrapFunction());

Configuration and Optimization

Setting Max Parallelism

// Set operator-specific max parallelism.
// setMaxParallelism is defined on OneInputOperatorTransformation, so it has to be
// called before keyBy/transform rather than on the resulting BootstrapTransformation.
BootstrapTransformation<MyData> transformation = OperatorTransformation
    .bootstrapWith(dataSet)
    .setMaxParallelism(256)  // Override global max parallelism
    .keyBy(MyData::getKey)
    .transform(new MyBootstrapFunction());

Memory and Performance Tuning

// Configure memory for bootstrap operations
// NOTE(review): the option keys are plain strings, so an unsupported key is not caught
// at compile time — verify both keys against the Flink configuration reference.
Configuration config = new Configuration();
config.setString("taskmanager.memory.process.size", "2g");
// NOTE(review): "taskmanager.memory.flink.fraction" does not look like a documented
// Flink option — TODO confirm or replace with the intended fraction option.
config.setDouble("taskmanager.memory.flink.fraction", 0.8);

// NOTE(review): confirm that the savepoint type in use offers
// withConfiguration(Configuration) in the targeted Flink version.
savepoint.withConfiguration(config)
         .withOperator("data-processor", transformation)
         .write("/path/to/savepoint");

Error Handling

Bootstrap Function Error Handling

/**
 * Keyed bootstrap function that skips invalid records and logs, rather than propagates,
 * failures for individual elements.
 */
public class RobustBootstrapFunction extends KeyedStateBootstrapFunction<String, MyData> {
    private ValueState<MyData> dataState;

    /**
     * Obtains the keyed state handle. Without this step {@code dataState} stays
     * {@code null} and every update would fail, only to be hidden by the catch block below.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        dataState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("data", MyData.class)
        );
    }

    /**
     * Stores the record in state for its key, skipping records without an ID.
     *
     * @param data the element being bootstrapped
     * @param ctx access to the current key, used for error reporting
     */
    @Override
    public void processElement(MyData data, Context ctx) throws Exception {
        try {
            // Validate input data
            if (data == null || data.getId() == null) {
                System.err.println("Invalid data received, skipping");
                return;
            }

            // Process data
            dataState.update(data);

        } catch (Exception e) {
            // Deliberate best-effort handling: report the failing key and continue with
            // the next element. To abort the bootstrap instead, rethrow with the cause
            // preserved, e.g. new RuntimeException("Bootstrap failed", e).
            System.err.println("Failed to process data for key " +
                              ctx.getCurrentKey() + ": " + e.getMessage());
        }
    }
}

Transformation Validation

// Build and register the transformation inside a try block so that problems raised
// while the transformation is being defined can be reported.
try {
    BootstrapTransformation<MyData> transformation = OperatorTransformation
        .bootstrapWith(dataSet)
        .keyBy(MyData::getKey)
        .transform(new MyBootstrapFunction());
    
    savepoint.withOperator("my-operator", transformation);
    
} catch (InvalidProgramException e) {
    // Reported separately from other runtime failures as a type inference problem
    System.err.println("Type inference failed: " + e.getMessage());
} catch (RuntimeException e) {
    // Any other runtime failure while defining or registering the transformation
    System.err.println("Transformation validation failed: " + e.getMessage());
}

Windowed State Bootstrap

WindowedOperatorTransformation

For windowed operators that process data within time or count-based windows.

/**
 * Windowed bootstrap transformation over elements of type {@code T}, keyed by {@code K}
 * and assigned to windows of type {@code W}.
 *
 * <p>{@code trigger} and {@code evictor} return this transformation type so calls can be
 * chained; the remaining methods produce the final {@code BootstrapTransformation}.
 */
public class WindowedOperatorTransformation<T, K, W extends Window> {
    /** Sets the trigger for the windows. */
    public WindowedOperatorTransformation<T, K, W> trigger(
        Trigger<? super T, ? super W> trigger
    );
    
    /** Sets the evictor for the windows. */
    public WindowedOperatorTransformation<T, K, W> evictor(
        Evictor<? super T, ? super W> evictor
    );
    
    /** Bootstraps window state using the given reduce function. */
    public BootstrapTransformation<T> reduce(ReduceFunction<T> function);
    
    /** Bootstraps window state using the given reduce function together with a window function. */
    public <R> BootstrapTransformation<T> reduce(
        ReduceFunction<T> reduceFunction,
        WindowFunction<T, R, K, W> windowFunction
    );
    
    /** Bootstraps window state using the given aggregate function. */
    public <ACC, R> BootstrapTransformation<T> aggregate(
        AggregateFunction<T, ACC, R> function
    );
    
    /** Bootstraps window state using the given aggregate function together with a window function. */
    public <ACC, V, R> BootstrapTransformation<T> aggregate(
        AggregateFunction<T, ACC, V> aggregateFunction,
        WindowFunction<V, R, K, W> windowFunction
    );
    
    /** Bootstraps window state for the given window function. */
    public <R> BootstrapTransformation<T> apply(
        WindowFunction<T, R, K, W> function
    );
    
    /** Bootstraps window state for the given process window function. */
    public <R> BootstrapTransformation<T> process(
        ProcessWindowFunction<T, R, K, W> function
    );
}

Usage Example:

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Create windowed bootstrap transformation
DataSet<SensorReading> sensorData = env.fromCollection(getSensorReadings());

BootstrapTransformation<SensorReading> windowTransformation = OperatorTransformation
    .bootstrapWith(sensorData)
    // Event-time windows place each element by its timestamp, so timestamps must be
    // assigned before keying. Assumes SensorReading exposes its event time in epoch
    // milliseconds via getTimestamp() — adapt to your type.
    .assignTimestamps(reading -> reading.getTimestamp())
    .keyBy(SensorReading::getSensorId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new AverageAggregateFunction());

// Register the transformation on the savepoint under the name "sensor-window-operator"
savepoint.withOperator("sensor-window-operator", windowTransformation);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-state-processor-api-2-12

docs

function-interfaces.md

index.md

savepoint-management.md

state-reading.md

state-writing.md

window-operations.md

tile.json