Apache Flink State Processor API for reading and writing savepoint state data offline
—
Function interfaces define the contracts for processing state data in the State Processor API. These interfaces allow you to implement custom logic for reading from and writing to Flink savepoints.
Base function for bootstrapping non-keyed operator state.
/**
 * Base class for functions that bootstrap non-keyed operator state.
 *
 * @param <IN> type of the input elements
 */
public abstract class StateBootstrapFunction<IN>
        extends AbstractRichFunction
        implements CheckpointedFunction {

    /**
     * Handles a single input element.
     *
     * @param value the element to process
     * @param ctx   context available while the element is processed
     * @throws Exception if processing fails
     */
    public abstract void processElement(IN value, Context ctx) throws Exception;

    /** Information exposed to {@link #processElement} for the current element. */
    public interface Context {

        /** Returns the current processing time. */
        long currentProcessingTime();
    }
}

Lifecycle:
- open(Configuration) - Initialize state descriptors and resources
- processElement(IN, Context) - Process each input element
- close() - Clean up resources

Usage Example:
/**
 * Example bootstrap function that tracks how many metric events were seen and the
 * arithmetic mean of their values.
 *
 * <p>NOTE(review): the base class is described as bootstrapping non-keyed operator state and
 * implements {@code CheckpointedFunction}, yet this example obtains {@code ValueState} from the
 * runtime context and does not override {@code initializeState}/{@code snapshotState}.
 * Confirm against the Flink documentation before using it as a template.
 */
public class MetricsBootstrapFunction extends StateBootstrapFunction<MetricEvent> {

    private ValueState<Long> totalCountState;
    private ValueState<Double> averageState;

    /**
     * Registers the state descriptors used by this function.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the superclass or state registration fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Register state descriptors
        totalCountState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("totalCount", Long.class)
        );
        averageState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("average", Double.class)
        );
    }

    /**
     * Increments the stored count and folds the event's value into the stored mean.
     *
     * @param event the metric event to account for
     * @param ctx   context of the current element
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(MetricEvent event, Context ctx) throws Exception {
        // Update total count
        Long previousCount = totalCountState.value();
        long updatedCount = (previousCount != null ? previousCount : 0L) + 1;
        totalCountState.update(updatedCount);

        // Update running average, weighting the new value by the number of samples seen
        Double previousAvg = averageState.value();
        double updatedAvg = calculateRunningAverage(previousAvg, event.getValue(), updatedCount);
        averageState.update(updatedAvg);

        // Access context information. StateBootstrapFunction.Context only declares
        // currentProcessingTime(); it has no watermark accessor.
        System.out.println("Processing time: " + ctx.currentProcessingTime());
    }

    /**
     * Computes the incremental arithmetic mean.
     *
     * @param currentAvg mean of the previous samples, or {@code null} if there were none
     * @param newValue   the sample being added
     * @param count      number of samples including {@code newValue}; expected to be at least 1
     * @return the mean over all {@code count} samples
     */
    private double calculateRunningAverage(Double currentAvg, double newValue, long count) {
        if (currentAvg == null) {
            return newValue;
        }
        return currentAvg + (newValue - currentAvg) / count;
    }
}

Function for bootstrapping keyed state with access to key context and timers.
/**
 * Base class for functions that bootstrap keyed state.
 *
 * @param <K>  type of the key
 * @param <IN> type of the input elements
 */
public abstract class KeyedStateBootstrapFunction<K, IN> extends AbstractRichFunction {

    /**
     * Handles a single input element.
     *
     * @param value the element to process
     * @param ctx   context available while the element is processed
     * @throws Exception if processing fails
     */
    public abstract void processElement(IN value, Context ctx) throws Exception;

    /** Information exposed to {@link #processElement} for the current element. */
    public abstract class Context {

        /** Returns the service used to work with timers. */
        public abstract TimerService timerService();

        /** Returns the key of the element currently being processed. */
        public abstract K getCurrentKey();
    }
}

Key Features:
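- The key of the element being processed is available through ctx.getCurrentKey()
- Timers can be registered through ctx.timerService()

Usage Example: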
/**
 * Example keyed bootstrap function that maintains one session object and an activity log
 * per user, and registers an event-time timer 30 minutes after each activity.
 */
public class UserSessionBootstrapFunction extends KeyedStateBootstrapFunction<String, UserActivity> {

    /** Offset added to an activity's timestamp when registering the session timer. */
    private static final long SESSION_TIMEOUT_MILLIS = Duration.ofMinutes(30).toMillis();

    private ValueState<UserSession> sessionState;
    private ListState<String> activityLogState;

    /**
     * Registers the session and activity-log state.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the superclass or state registration fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        ValueStateDescriptor<UserSession> sessionDescriptor =
            new ValueStateDescriptor<>("session", UserSession.class);
        sessionState = getRuntimeContext().getState(sessionDescriptor);

        ListStateDescriptor<String> logDescriptor =
            new ListStateDescriptor<>("activityLog", String.class);
        activityLogState = getRuntimeContext().getListState(logDescriptor);
    }

    /**
     * Records the activity in the user's session and log, then registers the session timer.
     *
     * @param activity the activity to record
     * @param ctx      context providing the current key and the timer service
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(UserActivity activity, Context ctx) throws Exception {
        final String userId = ctx.getCurrentKey();

        // Reuse the stored session, or start one if this is the user's first activity.
        UserSession stored = sessionState.value();
        UserSession session = stored != null
            ? stored
            : new UserSession(userId, activity.getTimestamp());
        session.addActivity(activity);
        sessionState.update(session);

        // Append an "action:timestamp" entry to the activity log.
        activityLogState.add(activity.getAction() + ":" + activity.getTimestamp());

        // Register the session timeout timer relative to this activity.
        ctx.timerService().registerEventTimeTimer(
            activity.getTimestamp() + SESSION_TIMEOUT_MILLIS
        );

        System.out.println("Updated session for user: " + userId);
    }
}

Function for bootstrapping broadcast state that is replicated across all operator instances.
/**
 * Base class for functions that bootstrap broadcast state.
 *
 * @param <IN> type of the input elements
 */
public abstract class BroadcastStateBootstrapFunction<IN> extends AbstractRichFunction {

    /**
     * Handles a single input element.
     *
     * @param value the element to process
     * @param ctx   context available while the element is processed
     * @throws Exception if processing fails
     */
    public abstract void processElement(IN value, Context ctx) throws Exception;

    /** Information exposed to {@link #processElement} for the current element. */
    public interface Context {

        /** Returns the current processing time. */
        long currentProcessingTime();

        /**
         * Returns the broadcast state described by the given descriptor.
         *
         * @param descriptor descriptor identifying the broadcast state
         * @param <K>        key type of the broadcast state
         * @param <V>        value type of the broadcast state
         */
        <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> descriptor);
    }
}

Usage Example:
/**
 * Example broadcast bootstrap function that stores each business rule in broadcast state,
 * keyed by the rule's id.
 */
public class RulesBootstrapFunction extends BroadcastStateBootstrapFunction<BusinessRule> {

    private MapStateDescriptor<String, BusinessRule> rulesDescriptor;

    /**
     * Creates the descriptor for the rules broadcast state.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the superclass fails to open
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        rulesDescriptor = new MapStateDescriptor<>(
            "business-rules", String.class, BusinessRule.class
        );
    }

    /**
     * Writes the rule into broadcast state under its id.
     *
     * @param rule the rule to store
     * @param ctx  context giving access to broadcast state
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(BusinessRule rule, Context ctx) throws Exception {
        // Broadcast state is obtained from the per-element Context, which declares
        // getBroadcastState(MapStateDescriptor); it is not looked up on the runtime context.
        BroadcastState<String, BusinessRule> broadcastState =
            ctx.getBroadcastState(rulesDescriptor);
        broadcastState.put(rule.getId(), rule);
        System.out.println("Broadcasted rule: " + rule.getId());
    }
}

Function for reading keyed state from existing savepoints.
/**
 * Base class for functions that read keyed state from an existing savepoint.
 *
 * @param <K>   type of the key
 * @param <OUT> type of the emitted records
 */
public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {

    /**
     * Initialization hook; abstract, so every subclass must provide it.
     *
     * @param parameters configuration for the function
     * @throws Exception if initialization fails
     */
    public abstract void open(Configuration parameters) throws Exception;

    /**
     * Reads the state belonging to one key.
     *
     * @param key the key whose state is being read
     * @param ctx context giving access to the key's registered timers
     * @param out collector for emitted records
     * @throws Exception if reading fails
     */
    public abstract void readKey(K key, Context ctx, Collector<OUT> out) throws Exception;

    /** Information exposed to {@link #readKey} for the current key. */
    public interface Context {

        /** Returns the registered event-time timers. */
        Set<Long> registeredEventTimeTimers() throws Exception;

        /** Returns the registered processing-time timers. */
        Set<Long> registeredProcessingTimeTimers() throws Exception;
    }
}

Key Features:
- All state descriptors must be registered in the open() method

Usage Example:
/**
 * Example reader function that combines a user's profile, purchase history, per-category
 * spending and registered timers into a single {@code UserInsight} record.
 */
public class UserAnalyticsReaderFunction extends KeyedStateReaderFunction<String, UserInsight> {

    private ValueState<UserProfile> profileState;
    private ListState<PurchaseEvent> purchaseHistoryState;
    private MapState<String, Double> categorySpendingState;

    /**
     * Registers every state descriptor this reader uses.
     *
     * @param parameters configuration for the function (unused here)
     * @throws Exception if state registration fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // No super.open(...) call: open() is declared abstract in KeyedStateReaderFunction,
        // so there is no inherited implementation that could be invoked.

        // Must register ALL state descriptors here
        profileState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("profile", UserProfile.class)
        );
        purchaseHistoryState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("purchases", PurchaseEvent.class)
        );
        categorySpendingState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("categorySpending", String.class, Double.class)
        );
    }

    /**
     * Emits one insight for the user, or nothing when no profile is stored.
     *
     * @param userId the key being read
     * @param ctx    context giving access to the key's registered timers
     * @param out    collector receiving the insight
     * @throws Exception if state access fails
     */
    @Override
    public void readKey(String userId, Context ctx, Collector<UserInsight> out) throws Exception {
        UserProfile profile = profileState.value();
        if (profile == null) {
            return; // No data for this user
        }

        // Collect purchase history
        List<PurchaseEvent> purchases = new ArrayList<>();
        purchaseHistoryState.get().forEach(purchases::add);

        // Collect category spending
        Map<String, Double> categorySpending = new HashMap<>();
        for (Map.Entry<String, Double> entry : categorySpendingState.entries()) {
            categorySpending.put(entry.getKey(), entry.getValue());
        }

        // Access timer information
        Set<Long> eventTimers = ctx.registeredEventTimeTimers();
        Set<Long> processingTimers = ctx.registeredProcessingTimeTimers();

        // Create and emit insight
        UserInsight insight = new UserInsight(
            userId,
            profile,
            purchases,
            categorySpending,
            eventTimers,
            processingTimers
        );
        out.collect(insight);
    }
}

Function for reading window state with access to window metadata.
/**
 * Base class for functions that read window state.
 *
 * @param <IN>  type of the elements stored in the window
 * @param <OUT> type of the emitted records
 * @param <KEY> type of the key
 * @param <W>   type of the window
 */
public abstract class WindowReaderFunction<IN, OUT, KEY, W extends Window>
        extends AbstractRichFunction {

    /**
     * Reads the contents of one window for one key.
     *
     * @param key      the key the window belongs to
     * @param context  context giving access to window metadata, state and timers
     * @param elements the elements of the window
     * @param out      collector for emitted records
     * @throws Exception if reading fails
     */
    public abstract void readWindow(
        KEY key,
        Context<W> context,
        Iterable<IN> elements,
        Collector<OUT> out
    ) throws Exception;

    /**
     * Information exposed to {@link #readWindow} for the current window.
     *
     * @param <W> type of the window
     */
    public interface Context<W extends Window> extends java.io.Serializable {

        /** Returns the window being read. */
        W window();

        /**
         * Returns the trigger state described by the given descriptor.
         *
         * @param descriptor descriptor identifying the state
         * @param <S>        type of the state
         */
        <S extends State> S triggerState(StateDescriptor<S, ?> descriptor);

        /** Returns the store for window state. */
        KeyedStateStore windowState();

        /** Returns the store for global state. */
        KeyedStateStore globalState();

        /** Returns the registered event-time timers. */
        Set<Long> registeredEventTimeTimers() throws Exception;

        /** Returns the registered processing-time timers. */
        Set<Long> registeredProcessingTimeTimers() throws Exception;
    }
}

Usage Example:
/**
 * Example window reader that emits one {@code SessionSummary} per non-empty window,
 * carrying the window bounds and the events it contains.
 *
 * <p>{@code WindowReaderFunction} is an abstract class, so it is extended rather than
 * implemented.
 */
public class SessionWindowReaderFunction
        extends WindowReaderFunction<UserEvent, SessionSummary, String, TimeWindow> {

    /**
     * Summarizes the events of one window for one user.
     *
     * @param userId  the key the window belongs to
     * @param context context of the window; parameterized so {@code window()} yields a
     *                {@code TimeWindow}
     * @param events  the events stored in the window
     * @param out     collector receiving the summary
     * @throws Exception if reading fails
     */
    @Override
    public void readWindow(
        String userId,
        Context<TimeWindow> context,
        Iterable<UserEvent> events,
        Collector<SessionSummary> out
    ) throws Exception {
        TimeWindow window = context.window();

        List<UserEvent> eventList = new ArrayList<>();
        events.forEach(eventList::add);

        // Windows without events produce no output.
        if (!eventList.isEmpty()) {
            SessionSummary summary = new SessionSummary(
                userId,
                window.getStart(),
                window.getEnd(),
                eventList.size(),
                eventList
            );
            out.collect(summary);
        }
    }
}

Interface for assigning timestamps to elements during bootstrap.
/**
 * Assigns a timestamp to an element.
 *
 * @param <T> type of the elements
 */
@FunctionalInterface
public interface Timestamper<T> extends Function {

    /**
     * Returns the timestamp for the given element.
     *
     * @param element the element to timestamp
     * @return the element's timestamp
     */
    long timestamp(T element);
}

Usage Example:
/** Timestamper that reports each event's own event time. */
public class EventTimestamper implements Timestamper<Event> {

    /**
     * Returns the event time carried by the event.
     *
     * @param event the event to timestamp
     * @return the value of {@code event.getEventTime()}
     */
    @Override
    public long timestamp(Event event) {
        final long eventTime = event.getEventTime();
        return eventTime;
    }
}
// Use with bootstrap transformation.
// Timestamps are assigned on the un-keyed transformation, i.e. before keyBy(...).
BootstrapTransformation<Event> transformation = OperatorTransformation
    .bootstrapWith(events)
    .assignTimestamps(new EventTimestamper()) // Custom timestamper
    .keyBy(Event::getUserId)
    .transform(new EventBootstrapFunction());

/**
 * Example keyed bootstrap function that writes to value, list, map, reducing and
 * aggregating state for every element.
 */
public class ComplexStateBootstrapFunction extends KeyedStateBootstrapFunction<String, ComplexEvent> {

    // Multiple state types
    private ValueState<String> statusState;
    private ListState<String> historyState;
    private MapState<String, Integer> countersState;
    private ReducingState<Double> sumState;
    // AggregatingState takes two type arguments (input and output); the accumulator type
    // belongs to the descriptor only.
    private AggregatingState<Double, Double> avgState;

    /**
     * Registers one state of each kind.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the superclass or state registration fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Value state
        statusState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("status", String.class)
        );
        // List state
        historyState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("history", String.class)
        );
        // Map state
        countersState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("counters", String.class, Integer.class)
        );
        // Reducing state
        sumState = getRuntimeContext().getReducingState(
            new ReducingStateDescriptor<>("sum", Double::sum, Double.class)
        );
        // Aggregating state
        avgState = getRuntimeContext().getAggregatingState(
            new AggregatingStateDescriptor<>("avg", new AverageAggregator(), Double.class)
        );
    }

    /**
     * Applies the event to every registered state and optionally registers a cleanup timer.
     *
     * @param event the event to apply
     * @param ctx   context providing the timer service
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(ComplexEvent event, Context ctx) throws Exception {
        // Update all state types
        statusState.update(event.getStatus());
        historyState.add(event.getAction());

        // Per-category counter, starting at 0 for categories not seen before.
        String category = event.getCategory();
        Integer count = countersState.get(category);
        countersState.put(category, (count != null ? count : 0) + 1);

        sumState.add(event.getValue());
        avgState.add(event.getValue());

        // Set timer if needed
        if (event.needsCleanup()) {
            ctx.timerService().registerEventTimeTimer(
                event.getTimestamp() + Duration.ofHours(1).toMillis()
            );
        }
    }
}

/**
 * Example reader function that emits a result only for users accepted by a user filter,
 * and only when at least one of their transactions passes a transaction filter.
 */
public class ConditionalReaderFunction extends KeyedStateReaderFunction<String, FilteredResult> {

    private ValueState<UserData> userDataState;
    private ListState<Transaction> transactionState;
    private final Predicate<UserData> userFilter;
    private final Predicate<Transaction> transactionFilter;

    /**
     * @param userFilter        decides whether a user is considered at all
     * @param transactionFilter decides which of the user's transactions are kept
     */
    public ConditionalReaderFunction(
        Predicate<UserData> userFilter,
        Predicate<Transaction> transactionFilter
    ) {
        this.userFilter = userFilter;
        this.transactionFilter = transactionFilter;
    }

    /**
     * Registers the user-data and transaction state.
     *
     * @param parameters configuration for the function (unused here)
     * @throws Exception if state registration fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // No super.open(...) call: open() is abstract in KeyedStateReaderFunction.
        userDataState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("userData", UserData.class)
        );
        transactionState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("transactions", Transaction.class)
        );
    }

    /**
     * Emits a filtered result for the key, or nothing if either filter rejects everything.
     *
     * @param key the key being read
     * @param ctx context of the current key (unused here)
     * @param out collector receiving the result
     * @throws Exception if state access fails
     */
    @Override
    public void readKey(String key, Context ctx, Collector<FilteredResult> out) throws Exception {
        UserData userData = userDataState.value();
        // Apply user filter
        if (userData == null || !userFilter.test(userData)) {
            return;
        }

        // Filter transactions
        List<Transaction> filteredTransactions = new ArrayList<>();
        for (Transaction transaction : transactionState.get()) {
            if (transactionFilter.test(transaction)) {
                filteredTransactions.add(transaction);
            }
        }

        if (!filteredTransactions.isEmpty()) {
            FilteredResult result = new FilteredResult(key, userData, filteredTransactions);
            out.collect(result);
        }
    }
}

/**
 * Example keyed bootstrap function demonstrating input validation and logging around
 * state initialization and element processing.
 */
public class RobustBootstrapFunction extends KeyedStateBootstrapFunction<String, DataEvent> {

    private static final Logger LOG = LoggerFactory.getLogger(RobustBootstrapFunction.class);

    private ValueState<String> dataState;

    /**
     * Registers the data state; a registration failure is logged and rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the superclass fails to open
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        try {
            dataState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("data", String.class)
            );
        } catch (Exception e) {
            LOG.error("Failed to initialize state", e);
            throw new RuntimeException("State initialization failed", e);
        }
    }

    /**
     * Appends the event's data to the stored value. Null events or events without data are
     * logged and skipped; processing failures are logged and, as written, not rethrown.
     *
     * @param event the event to apply
     * @param ctx   context providing the current key
     * @throws Exception declared by the base class
     */
    @Override
    public void processElement(DataEvent event, Context ctx) throws Exception {
        try {
            // Validate input
            if (event == null || event.getData() == null) {
                LOG.warn("Received null event or data for key: {}", ctx.getCurrentKey());
                return;
            }

            // Process event
            String currentData = dataState.value();
            String newData = combineData(currentData, event.getData());
            dataState.update(newData);
        } catch (Exception e) {
            LOG.error("Failed to process event for key: {}", ctx.getCurrentKey(), e);
            // Decide whether to re-throw or continue
            // throw new RuntimeException("Processing failed", e);
        }
    }

    /**
     * Joins the stored value and the new data with a comma.
     *
     * @param current the stored value, or {@code null} if nothing is stored yet
     * @param newData the data to append
     * @return {@code newData} alone when nothing is stored, otherwise {@code current,newData}
     */
    private String combineData(String current, String newData) {
        return current != null ? current + "," + newData : newData;
    }
}

/**
 * Example reader function that always emits a {@code SafeResult} per key: the stored data,
 * a "missing" marker, or an error marker when reading fails.
 */
public class SafeReaderFunction extends KeyedStateReaderFunction<String, SafeResult> {

    private static final Logger LOG = LoggerFactory.getLogger(SafeReaderFunction.class);

    private ValueState<String> dataState;

    /**
     * Registers the data state.
     *
     * @param parameters configuration for the function (unused here)
     * @throws Exception if state registration fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // No super.open(...) call: open() is abstract in KeyedStateReaderFunction.
        dataState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("data", String.class)
        );
    }

    /**
     * Emits the stored data for the key, or a result flagged as unsuccessful when the data is
     * missing or cannot be read.
     *
     * @param key the key being read
     * @param ctx context of the current key (unused here)
     * @param out collector receiving the result
     * @throws Exception declared by the base class
     */
    @Override
    public void readKey(String key, Context ctx, Collector<SafeResult> out) throws Exception {
        try {
            String data = dataState.value();
            if (data != null) {
                SafeResult result = new SafeResult(key, data, true);
                out.collect(result);
            } else {
                LOG.debug("No data found for key: {}", key);
                // Optionally emit a result indicating missing data
                SafeResult result = new SafeResult(key, null, false);
                out.collect(result);
            }
        } catch (Exception e) {
            LOG.error("Failed to read state for key: {}", key, e);
            // Emit error result instead of failing
            SafeResult errorResult = new SafeResult(key, "ERROR: " + e.getMessage(), false);
            out.collect(errorResult);
        }
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-state-processor-api-2-12