Apache Flink State Processor API for reading and writing savepoint state data offline
—
Window operations provide specialized functionality for reading and writing window state data. The State Processor API supports both regular windows and evicting windows with different aggregation strategies.
The WindowReader class provides entry points for reading window state from savepoints.
/**
 * Entry points for reading window state from a savepoint.
 *
 * <p>This is a signature listing only; no method bodies are shown in this reference.
 *
 * @param <W> window type of the operator state being read
 */
public class WindowReader<W extends Window> {
/** Returns the {@link EvictingWindowReader} variant of this reader, used for evicting windows. */
public EvictingWindowReader<W> evictor();
/**
 * Reads window state that was created with a {@link ReduceFunction}.
 *
 * @param uid identifier of the window operator to read (the usage examples pass values such as
 *     "sum-window-operator")
 * @param function the original reduce function of the window operator
 * @param keyType type information of the key
 * @param reduceType type information of the reduced value
 * @return a {@code DataSource} of the reduced values
 * @throws IOException declared by the signature; the failure conditions are not visible here
 */
public <T, K> DataSource<T> reduce(
String uid,
ReduceFunction<T> function,
TypeInformation<K> keyType,
TypeInformation<T> reduceType
) throws IOException;
/**
 * Reads reduce window state and passes each window to {@code readerFunction}, which turns the
 * reduced value of type {@code T} into records of type {@code OUT}.
 *
 * @param uid identifier of the window operator to read
 * @param function the original reduce function of the window operator
 * @param readerFunction callback invoked with key, context and window contents
 * @param keyType type information of the key
 * @param reduceType type information of the reduced value
 * @param outputType type information of the records emitted by {@code readerFunction}
 * @throws IOException declared by the signature; the failure conditions are not visible here
 */
public <K, T, OUT> DataSource<OUT> reduce(
String uid,
ReduceFunction<T> function,
WindowReaderFunction<T, OUT, K, W> readerFunction,
TypeInformation<K> keyType,
TypeInformation<T> reduceType,
TypeInformation<OUT> outputType
) throws IOException;
/**
 * Reads window state that was created with an {@link AggregateFunction} and returns the
 * aggregate results of type {@code R}.
 *
 * @param uid identifier of the window operator to read
 * @param aggregateFunction the aggregate function of the window operator
 * @param keyType type information of the key
 * @param accType type information of the accumulator
 * @param outputType type information of the aggregate result
 * @throws IOException declared by the signature; the failure conditions are not visible here
 */
public <K, T, ACC, R> DataSource<R> aggregate(
String uid,
AggregateFunction<T, ACC, R> aggregateFunction,
TypeInformation<K> keyType,
TypeInformation<ACC> accType,
TypeInformation<R> outputType
) throws IOException;
/**
 * Reads aggregate window state and passes each window to {@code readerFunction}. Per the
 * signature the reader function consumes the aggregate result type {@code R}, not the input
 * type {@code T}.
 *
 * @param uid identifier of the window operator to read
 * @param aggregateFunction the aggregate function of the window operator
 * @param readerFunction callback invoked with key, context and window contents
 * @param keyType type information of the key
 * @param accType type information of the accumulator
 * @param outputType type information of the records emitted by {@code readerFunction}
 * @throws IOException declared by the signature; the failure conditions are not visible here
 */
public <K, T, ACC, R, OUT> DataSource<OUT> aggregate(
String uid,
AggregateFunction<T, ACC, R> aggregateFunction,
WindowReaderFunction<R, OUT, K, W> readerFunction,
TypeInformation<K> keyType,
TypeInformation<ACC> accType,
TypeInformation<OUT> outputType
) throws IOException;
/**
 * Reads the raw contents of a window (no pre-aggregation) and passes them to
 * {@code readerFunction}.
 *
 * @param uid identifier of the window operator to read
 * @param readerFunction callback invoked with key, context and window contents
 * @param keyType type information of the key
 * @param stateType type information of the elements stored in the window
 * @param outputType type information of the records emitted by {@code readerFunction}
 * @throws IOException declared by the signature; the failure conditions are not visible here
 */
public <K, T, OUT> DataSource<OUT> process(
String uid,
WindowReaderFunction<T, OUT, K, W> readerFunction,
TypeInformation<K> keyType,
TypeInformation<T> stateType,
TypeInformation<OUT> outputType
) throws IOException;
}import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
// Each reader is created from the window assigner of the operator whose state is read.

// Create window reader with tumbling windows
WindowReader<TimeWindow> tumblingReader = savepoint.window(
    TumblingEventTimeWindows.of(Duration.ofMinutes(5))
);
// Create window reader with sliding windows
WindowReader<TimeWindow> slidingReader = savepoint.window(
    SlidingEventTimeWindows.of(Duration.ofMinutes(10), Duration.ofMinutes(2))
);
// Create window reader with session windows.
// Flink has no assigner called SessionWindows; the event-time session assigner that matches
// the two event-time assigners above is EventTimeSessionWindows.
WindowReader<TimeWindow> sessionReader = savepoint.window(
    EventTimeSessionWindows.withGap(Duration.ofMinutes(30))
);
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
// Create window reader with explicit serializer: the window type is described by its
// TypeSerializer instead of by a window assigner
TypeSerializer<TimeWindow> windowSerializer = new TimeWindow.Serializer();
WindowReader<TimeWindow> reader = savepoint.window(windowSerializer);// Read window state created with ReduceFunction
DataSource<Integer> windowSums = reader.reduce(
"sum-window-operator",
new SumReduceFunction(), // The original reduce function
Types.STRING, // Key type
Types.INT // Value type
);
windowSums.print();public class WindowSummaryReader implements WindowReaderFunction<Integer, WindowSummary, String, TimeWindow> {
/**
 * Converts the reduced value stored for one key and window into a {@link WindowSummary}.
 *
 * @param key key the window state belongs to
 * @param context typed context, so that {@code window()} returns a {@link TimeWindow}; with a
 *     raw {@code Context} the call returns the erased window type and the assignment below
 *     does not compile
 * @param elements window contents; for a reduce window this is expected to hold exactly one
 *     pre-reduced value
 * @param out receives one summary for the window
 * @throws Exception declared by the overridden method; an empty {@code elements} surfaces as a
 *     {@link java.util.NoSuchElementException}
 */
@Override
public void readWindow(
        String key,
        Context<TimeWindow> context,
        Iterable<Integer> elements,
        Collector<WindowSummary> out
) throws Exception {
    TimeWindow window = context.window();
    // Process reduced result (should be single element for reduce)
    Integer sum = elements.iterator().next();
    WindowSummary summary = new WindowSummary(
            key,
            window.getStart(),
            window.getEnd(),
            sum,
            1 // Reduce produces single value
    );
    out.collect(summary);
}
}
// Use with reduce reader: passing a WindowReaderFunction (WindowSummaryReader) as the third
// argument makes the read produce WindowSummary records instead of the reduced Integer values
DataSource<WindowSummary> summaries = reader.reduce(
"sum-window-operator",
new SumReduceFunction(),
new WindowSummaryReader(),
Types.STRING, // Key type
Types.INT, // Reduce type
TypeInformation.of(WindowSummary.class) // Output type
);public class AverageAggregateFunction implements AggregateFunction<Double, AverageAccumulator, Double> {
/** Creates a new {@code AverageAccumulator} through its no-argument constructor. */
@Override
public AverageAccumulator createAccumulator() {
return new AverageAccumulator();
}
/**
 * Folds one input value into the accumulator.
 *
 * @param value value added to {@code accumulator.sum}; it is unboxed, so a {@code null} value
 *     fails with a {@link NullPointerException}
 * @param accumulator accumulator that is mutated in place
 * @return the same {@code accumulator} instance that was passed in
 */
@Override
public AverageAccumulator add(Double value, AverageAccumulator accumulator) {
accumulator.sum += value;
accumulator.count++;
return accumulator;
}
/**
 * Returns {@code sum / count} of the accumulator.
 *
 * @param accumulator accumulator to read
 * @return the quotient, or {@code 0.0} when {@code count} is zero (guards the division)
 */
@Override
public Double getResult(AverageAccumulator accumulator) {
return accumulator.count == 0 ? 0.0 : accumulator.sum / accumulator.count;
}
/**
 * Merges two accumulators by adding {@code b}'s sum and count onto {@code a}.
 *
 * @param a accumulator that is mutated in place and returned
 * @param b accumulator that is only read
 * @return {@code a}, now holding the combined sum and count
 */
@Override
public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
a.sum += b.sum;
a.count += b.count;
return a;
}
}
// Read aggregated window results: without a reader function the read yields the aggregate
// function's result type (Double) directly
DataSource<Double> windowAverages = reader.aggregate(
"average-window-operator",
new AverageAggregateFunction(),
Types.STRING, // Key type
TypeInformation.of(AverageAccumulator.class), // Accumulator type
Types.DOUBLE // Result type
);public class AggregateWindowReader implements WindowReaderFunction<Double, AggregateResult, String, TimeWindow> {
/**
 * Wraps the aggregated average of one key and window into an {@link AggregateResult} labelled
 * {@code "AVERAGE"}.
 *
 * @param key key the window state belongs to
 * @param context typed context, so that {@code window()} returns a {@link TimeWindow}; with a
 *     raw {@code Context} the call returns the erased window type and the assignment below
 *     does not compile
 * @param elements window contents; expected to hold the single aggregated result
 * @param out receives one result for the window
 * @throws Exception declared by the overridden method; an empty {@code elements} surfaces as a
 *     {@link java.util.NoSuchElementException}
 */
@Override
public void readWindow(
        String key,
        Context<TimeWindow> context,
        Iterable<Double> elements,
        Collector<AggregateResult> out
) throws Exception {
    TimeWindow window = context.window();
    Double average = elements.iterator().next(); // Single aggregated result
    AggregateResult result = new AggregateResult(
            key,
            window.getStart(),
            window.getEnd(),
            average,
            "AVERAGE"
    );
    out.collect(result);
}
}
// Use with aggregate reader: AggregateWindowReader receives the aggregated Double of each
// window and emits AggregateResult records, so the output type is AggregateResult
DataSource<AggregateResult> results = reader.aggregate(
"average-window-operator",
new AverageAggregateFunction(),
new AggregateWindowReader(),
Types.STRING, // Key type
TypeInformation.of(AverageAccumulator.class), // Accumulator type
TypeInformation.of(AggregateResult.class) // Output type
);Process windows handle raw window contents without pre-aggregation.
/**
 * Reads the raw (not pre-aggregated) contents of a sensor window and emits one
 * {@link WindowAnalysis} per non-empty window.
 */
public class ProcessWindowReader implements WindowReaderFunction<SensorReading, WindowAnalysis, String, TimeWindow> {

    /**
     * Computes count, minimum, maximum, average and trend of the readings in one window.
     *
     * @param sensorId key of the window (the sensor ID)
     * @param context typed context, so that {@code window()} returns a {@link TimeWindow}; with
     *     a raw {@code Context} the call returns the erased window type and the assignment
     *     below does not compile
     * @param readings raw window contents
     * @param out receives one analysis per non-empty window; empty windows emit nothing
     * @throws Exception declared by the overridden method
     */
    @Override
    public void readWindow(
            String sensorId,
            Context<TimeWindow> context,
            Iterable<SensorReading> readings,
            Collector<WindowAnalysis> out
    ) throws Exception {
        TimeWindow window = context.window();
        List<SensorReading> readingList = new ArrayList<>();
        readings.forEach(readingList::add);
        if (readingList.isEmpty()) {
            return;
        }
        // Analyze raw window contents
        double min = readingList.stream().mapToDouble(SensorReading::getValue).min().orElse(0.0);
        double max = readingList.stream().mapToDouble(SensorReading::getValue).max().orElse(0.0);
        double avg = readingList.stream().mapToDouble(SensorReading::getValue).average().orElse(0.0);
        WindowAnalysis analysis = new WindowAnalysis(
                sensorId,
                window.getStart(),
                window.getEnd(),
                readingList.size(),
                min,
                max,
                avg,
                calculateTrend(readingList)
        );
        out.collect(analysis);
    }

    /**
     * Classifies the window by comparing its last reading with its first one.
     *
     * @param readings readings in iteration order
     * @return {@code "INCREASING"} or {@code "DECREASING"} when the last value leaves the band
     *     of 10% around the first value, otherwise {@code "STABLE"} (also for fewer than two
     *     readings)
     */
    private String calculateTrend(List<SensorReading> readings) {
        if (readings.size() < 2) {
            return "STABLE";
        }
        double first = readings.get(0).getValue();
        double last = readings.get(readings.size() - 1).getValue();
        // Scaling a negative baseline by 1.1 moves it further down, not up. Taking max/min of
        // the two scaled values keeps the band edges in the right order for negative baselines
        // and is identical to the plain 1.1 / 0.9 comparison for non-negative ones.
        double scaledUp = first * 1.1;
        double scaledDown = first * 0.9;
        double upperBound = Math.max(scaledUp, scaledDown);
        double lowerBound = Math.min(scaledUp, scaledDown);
        if (last > upperBound) {
            return "INCREASING";
        }
        if (last < lowerBound) {
            return "DECREASING";
        }
        return "STABLE";
    }
}
// Read process window state: the reader function receives the raw SensorReading elements of
// each window, so the element type (not an aggregate type) is passed as state type
DataSource<WindowAnalysis> analyses = reader.process(
"sensor-window-operator",
new ProcessWindowReader(),
Types.STRING, // Key type (sensor ID)
TypeInformation.of(SensorReading.class), // Element type
TypeInformation.of(WindowAnalysis.class) // Output type
);Evicting windows use evictors to remove elements from windows before or after window processing.
/**
 * Entry points for reading the state of evicting windows.
 *
 * <p>This is a signature listing only; no method bodies are shown in this reference.
 *
 * <p>NOTE(review): the overloads taking a reader function declare its input type as
 * {@code Iterable<T>} / {@code Iterable<R>}, unlike the corresponding {@code WindowReader}
 * overloads, which use {@code T} / {@code R} — confirm against the Flink Javadoc.
 *
 * @param <W> window type of the operator state being read
 */
public class EvictingWindowReader<W extends Window> {
/**
 * Reads evicting window state that was created with a {@link ReduceFunction}.
 *
 * @param uid identifier of the window operator to read
 * @param function the reduce function of the window operator
 * @param keyType type information of the key
 * @param reduceType type information of the reduced value
 * @throws IOException declared by the signature; the failure conditions are not visible here
 */
public <T, K> DataSource<T> reduce(
String uid,
ReduceFunction<T> function,
TypeInformation<K> keyType,
TypeInformation<T> reduceType
) throws IOException;
/**
 * Reads reduce state of an evicting window and passes each window to {@code readerFunction}.
 *
 * @param uid identifier of the window operator to read
 * @param function the reduce function of the window operator
 * @param readerFunction callback invoked with key, context and window contents
 * @param keyType type information of the key
 * @param reduceType type information of the reduced value
 * @param outputType type information of the records emitted by {@code readerFunction}
 * @throws IOException declared by the signature; the failure conditions are not visible here
 */
public <K, T, OUT> DataSource<OUT> reduce(
String uid,
ReduceFunction<T> function,
WindowReaderFunction<Iterable<T>, OUT, K, W> readerFunction,
TypeInformation<K> keyType,
TypeInformation<T> reduceType,
TypeInformation<OUT> outputType
) throws IOException;
/**
 * Reads evicting window state that was created with an {@link AggregateFunction}.
 *
 * @param uid identifier of the window operator to read
 * @param aggregateFunction the aggregate function of the window operator
 * @param keyType type information of the key
 * @param accType type information of the accumulator
 * @param outputType type information of the aggregate result
 * @throws IOException declared by the signature; the failure conditions are not visible here
 */
public <K, T, ACC, R> DataSource<R> aggregate(
String uid,
AggregateFunction<T, ACC, R> aggregateFunction,
TypeInformation<K> keyType,
TypeInformation<ACC> accType,
TypeInformation<R> outputType
) throws IOException;
/**
 * Reads aggregate state of an evicting window and passes each window to
 * {@code readerFunction}.
 *
 * @param uid identifier of the window operator to read
 * @param aggregateFunction the aggregate function of the window operator
 * @param readerFunction callback invoked with key, context and window contents
 * @param keyType type information of the key
 * @param accType type information of the accumulator
 * @param outputType type information of the records emitted by {@code readerFunction}
 * @throws IOException declared by the signature; the failure conditions are not visible here
 */
public <K, T, ACC, R, OUT> DataSource<OUT> aggregate(
String uid,
AggregateFunction<T, ACC, R> aggregateFunction,
WindowReaderFunction<Iterable<R>, OUT, K, W> readerFunction,
TypeInformation<K> keyType,
TypeInformation<ACC> accType,
TypeInformation<OUT> outputType
) throws IOException;
/**
 * Reads the raw contents of an evicting window and passes them to {@code readerFunction}.
 *
 * @param uid identifier of the window operator to read
 * @param readerFunction callback invoked with key, context and window contents
 * @param keyType type information of the key
 * @param stateType type information of the elements stored in the window
 * @param outputType type information of the records emitted by {@code readerFunction}
 * @throws IOException declared by the signature; the failure conditions are not visible here
 */
public <K, T, OUT> DataSource<OUT> process(
String uid,
WindowReaderFunction<Iterable<T>, OUT, K, W> readerFunction,
TypeInformation<K> keyType,
TypeInformation<T> stateType,
TypeInformation<OUT> outputType
) throws IOException;
}// Get evicting window reader
// evictor() returns the reader variant whose reduce/aggregate/process methods target
// windows that use an evictor
EvictingWindowReader<TimeWindow> evictingReader = reader.evictor();
// Read evicting window with process function
/**
 * Reads the contents of an evicting window and emits one {@link EventSummary} per non-empty
 * window.
 *
 * <p>NOTE(review): the nested {@code Iterable<Iterable<Event>>} input follows the
 * {@code EvictingWindowReader} signatures given in this document — confirm against the Flink
 * Javadoc.
 */
public class EvictingProcessReader implements WindowReaderFunction<Iterable<Event>, EventSummary, String, TimeWindow> {

    /**
     * Flattens the nested window contents and summarises them.
     *
     * @param key key the window state belongs to
     * @param context typed context, so that {@code window()} returns a {@link TimeWindow}; with
     *     a raw {@code Context} the call returns the erased window type and the assignment
     *     below does not compile
     * @param elements window contents as groups of events
     * @param out receives one summary per non-empty window; empty windows emit nothing
     * @throws Exception declared by the overridden method
     */
    @Override
    public void readWindow(
            String key,
            Context<TimeWindow> context,
            Iterable<Iterable<Event>> elements, // Note: Iterable of Iterable for evicting windows
            Collector<EventSummary> out
    ) throws Exception {
        TimeWindow window = context.window();
        List<Event> allEvents = new ArrayList<>();
        // Flatten the nested iterables
        for (Iterable<Event> eventGroup : elements) {
            eventGroup.forEach(allEvents::add);
        }
        if (allEvents.isEmpty()) {
            return;
        }
        EventSummary summary = new EventSummary(
                key,
                window.getStart(),
                window.getEnd(),
                allEvents.size(),
                allEvents
        );
        out.collect(summary);
    }
}
// Read the evicting window state through EvictingProcessReader; the state type is the
// element type (Event), the output type is what the reader function emits
DataSource<EventSummary> evictingSummaries = evictingReader.process(
"evicting-window-operator",
new EvictingProcessReader(),
Types.STRING, // Key type
TypeInformation.of(Event.class), // Element type
TypeInformation.of(EventSummary.class) // Output type
);While the main focus is on reading windows, you can also bootstrap window state.
public class WindowBootstrapFunction extends KeyedStateBootstrapFunction<String, TimestampedEvent> {
private WindowState<TimeWindow> windowState;
/**
 * Obtains the window state handle that {@code processElement} writes to.
 *
 * <p>NOTE(review): {@code WindowState}, {@code WindowStateDescriptor} and
 * {@code getRuntimeContext().getWindowState(...)} are not declared anywhere in this document —
 * confirm they exist in the targeted Flink version before relying on this example.
 *
 * @param parameters passed through unchanged to {@code super.open}
 * @throws Exception declared by the overridden method
 */
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Register window state
WindowStateDescriptor<TimeWindow> windowDesc = new WindowStateDescriptor<>(
"window", new TimeWindow.Serializer()
);
windowState = getRuntimeContext().getWindowState(windowDesc);
}
/**
 * Assigns the event to the five-minute window that contains its timestamp, stores it in the
 * window state and registers an event-time timer at the window end.
 *
 * @param event event to bootstrap; its timestamp selects the window
 * @param ctx context used to reach the timer service
 * @throws Exception declared by the overridden method
 */
@Override
public void processElement(TimestampedEvent event, Context ctx) throws Exception {
    long windowSizeMillis = Duration.ofMinutes(5).toMillis();
    // Assign to window based on timestamp.
    // floorMod keeps the remainder non-negative, so timestamps before the epoch are aligned
    // down to their window start too. With '%' the remainder is negative there and the computed
    // window would start after the event.
    long timestamp = event.getTimestamp();
    long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMillis);
    long windowEnd = windowStart + windowSizeMillis;
    TimeWindow window = new TimeWindow(windowStart, windowEnd);
    // Add event to window
    windowState.add(window, event);
    // Set timer for window end
    ctx.timerService().registerEventTimeTimer(windowEnd);
}
}public class MultiWindowAnalyzer implements WindowReaderFunction<MetricEvent, MultiWindowResult, String, TimeWindow> {
/**
 * Computes summary statistics, percentiles and anomalies for the events of one window and
 * emits them as a single {@link MultiWindowResult}.
 *
 * @param metricName key of the window (the metric name)
 * @param context typed context, so that {@code window()} returns a {@link TimeWindow}; with a
 *     raw {@code Context} the call returns the erased window type and the assignment below
 *     does not compile
 * @param events raw window contents
 * @param out receives one result per non-empty window; empty windows emit nothing
 * @throws Exception declared by the overridden method
 */
@Override
public void readWindow(
        String metricName,
        Context<TimeWindow> context,
        Iterable<MetricEvent> events,
        Collector<MultiWindowResult> out
) throws Exception {
    TimeWindow window = context.window();
    List<MetricEvent> eventList = new ArrayList<>();
    events.forEach(eventList::add);
    if (eventList.isEmpty()) {
        return;
    }
    // Calculate multiple statistics
    DoubleSummaryStatistics stats = eventList.stream()
            .mapToDouble(MetricEvent::getValue)
            .summaryStatistics();
    // Calculate percentiles
    List<Double> sortedValues = eventList.stream()
            .mapToDouble(MetricEvent::getValue)
            .sorted()
            .boxed()
            .collect(Collectors.toList());
    double p50 = calculatePercentile(sortedValues, 0.5);
    double p95 = calculatePercentile(sortedValues, 0.95);
    double p99 = calculatePercentile(sortedValues, 0.99);
    // Detect anomalies against the real (population) standard deviation. The square root of
    // the mean is not a measure of spread and is NaN for a negative mean, which would silently
    // disable anomaly detection.
    double mean = stats.getAverage();
    double squaredDeviationSum = 0.0;
    for (MetricEvent event : eventList) {
        double deviation = event.getValue() - mean;
        squaredDeviationSum += deviation * deviation;
    }
    double stdDev = Math.sqrt(squaredDeviationSum / eventList.size());
    List<MetricEvent> anomalies = detectAnomalies(eventList, mean, stdDev);
    MultiWindowResult result = new MultiWindowResult(
            metricName,
            window.getStart(),
            window.getEnd(),
            eventList.size(),
            stats.getMin(),
            stats.getMax(),
            stats.getAverage(),
            p50, p95, p99,
            anomalies
    );
    out.collect(result);
}
/**
 * Picks the nearest-rank percentile from an ascending list of values.
 *
 * @param sortedValues values in ascending order; must not be empty
 * @param percentile fraction to look up, e.g. {@code 0.95}
 * @return the value at rank {@code ceil(percentile * size) - 1}, with the rank clamped to the
 *     valid index range
 */
private double calculatePercentile(List<Double> sortedValues, double percentile) {
    final int lastIndex = sortedValues.size() - 1;
    int rank = (int) Math.ceil(percentile * sortedValues.size()) - 1;
    // Clamp from above first, then from below, so an empty list still resolves to index 0
    if (rank > lastIndex) {
        rank = lastIndex;
    }
    if (rank < 0) {
        rank = 0;
    }
    return sortedValues.get(rank);
}
/**
 * Collects the events whose value lies more than two standard deviations away from the mean.
 *
 * @param events events to inspect
 * @param mean reference value the distance is measured from
 * @param stdDev spread used to derive the threshold ({@code 2.0 * stdDev})
 * @return the matching events in their original order; empty when none match
 */
private List<MetricEvent> detectAnomalies(List<MetricEvent> events, double mean, double stdDev) {
    final double threshold = 2.0 * stdDev;
    List<MetricEvent> outliers = new ArrayList<>();
    for (MetricEvent candidate : events) {
        double distanceFromMean = Math.abs(candidate.getValue() - mean);
        if (distanceFromMean > threshold) {
            outliers.add(candidate);
        }
    }
    return outliers;
}
}public class RobustWindowReader implements WindowReaderFunction<DataPoint, WindowResult, String, TimeWindow> {
private static final Logger LOG = LoggerFactory.getLogger(RobustWindowReader.class);
/**
 * Filters out null or invalid data points, processes the remaining ones and emits a single
 * {@link WindowResult}; windows without any valid point emit nothing.
 *
 * @param key key the window state belongs to
 * @param context typed context, so that {@code window()} returns a {@link TimeWindow}; with a
 *     raw {@code Context} the call returns the erased window type and the assignment below
 *     does not compile
 * @param elements raw window contents, possibly containing null or invalid points
 * @param out receives at most one result per window
 * @throws Exception any failure is logged with the key and rethrown, failing the job
 */
@Override
public void readWindow(
        String key,
        Context<TimeWindow> context,
        Iterable<DataPoint> elements,
        Collector<WindowResult> out
) throws Exception {
    try {
        TimeWindow window = context.window();
        List<DataPoint> points = new ArrayList<>();
        // Safely iterate over elements
        for (DataPoint point : elements) {
            if (point != null && isValidDataPoint(point)) {
                points.add(point);
            } else {
                LOG.warn("Invalid data point in window for key: {}", key);
            }
        }
        if (!points.isEmpty()) {
            WindowResult result = processDataPoints(key, window, points);
            out.collect(result);
        } else {
            LOG.debug("No valid data points in window for key: {}", key);
        }
    } catch (Exception e) {
        LOG.error("Error processing window for key: {}", key, e);
        // Could emit error result instead of failing
        // out.collect(new WindowResult(key, context.window(), "ERROR", e.getMessage()));
        throw e; // Re-throw to fail the job
    }
}
/**
 * Checks that a data point has a non-negative value and a positive timestamp.
 *
 * @param point data point to check; must not be null
 * @return {@code true} only when both conditions hold
 */
private boolean isValidDataPoint(DataPoint point) {
    // Written as a negated ">=" so that a NaN value is rejected exactly like a negative one
    if (!(point.getValue() >= 0)) {
        return false;
    }
    return point.getTimestamp() > 0;
}
/**
 * Builds the {@code "SUCCESS"} result for a window from the average value of its points.
 *
 * @param key key the window state belongs to
 * @param window window the points were read from
 * @param points validated data points; an empty list yields an average of {@code 0.0}
 * @return the result carrying key, window, status and average
 */
private WindowResult processDataPoints(String key, TimeWindow window, List<DataPoint> points) {
    // Safe processing logic
    return new WindowResult(
            key,
            window,
            "SUCCESS",
            points.stream().mapToDouble(point -> point.getValue()).average().orElse(0.0));
}
}// Ensure proper type handling for different window types
public class TypeSafeWindowReader<W extends Window> implements WindowReaderFunction<TypedEvent, TypedResult, String, W> {
private final Class<W> windowClass;
/**
 * Creates a reader bound to one concrete window class.
 *
 * @param windowClass class object stored for the runtime type check in {@code readWindow};
 *     it is stored as given, without a null check
 */
public TypeSafeWindowReader(Class<W> windowClass) {
this.windowClass = windowClass;
}
/**
 * Verifies at runtime that the window is an instance of the configured window class and emits
 * one {@link TypedResult} for it.
 *
 * @param key key the window state belongs to
 * @param context typed context, so that {@code window()} returns {@code W}; with a raw
 *     {@code Context} the call returns the erased window type and the assignment below does
 *     not compile
 * @param elements raw window contents
 * @param out receives one result per window
 * @throws IllegalArgumentException if the window is null or not an instance of the configured
 *     window class
 * @throws Exception declared by the overridden method
 */
@Override
public void readWindow(
        String key,
        Context<W> context,
        Iterable<TypedEvent> elements,
        Collector<TypedResult> out
) throws Exception {
    W window = context.window();
    // Type-safe window handling
    if (!windowClass.isInstance(window)) {
        // isInstance is false for null as well; avoid a NullPointerException while building
        // the message so the caller still gets the intended IllegalArgumentException
        throw new IllegalArgumentException(
                "Unexpected window type: " + (window == null ? null : window.getClass()));
    }
    TypedResult result = processTypedWindow(key, windowClass.cast(window), elements);
    out.collect(result);
}
/**
 * Builds the result for a window that already passed the type check.
 *
 * @param key key the window state belongs to
 * @param window the window; its {@code toString()} form is stored in the result
 * @param elements window contents, handed to the result unchanged
 * @return the result for this key and window
 */
private TypedResult processTypedWindow(String key, W window, Iterable<TypedEvent> elements) {
    // Type-specific processing
    final String windowDescription = window.toString();
    return new TypedResult(key, windowDescription, elements);
}
}public class OptimizedWindowReader implements WindowReaderFunction<LargeEvent, CompactResult, String, TimeWindow> {
/**
 * Streams over the window contents once and emits count, average, minimum and maximum as a
 * single {@link CompactResult}, without buffering the events.
 *
 * @param key key the window state belongs to
 * @param context typed context, so that {@code window()} returns a {@link TimeWindow}; with a
 *     raw {@code Context} the call returns the erased window type and the assignment below
 *     does not compile
 * @param elements raw window contents
 * @param out receives one result per non-empty window; empty windows emit nothing
 * @throws Exception declared by the overridden method
 */
@Override
public void readWindow(
        String key,
        Context<TimeWindow> context,
        Iterable<LargeEvent> elements,
        Collector<CompactResult> out
) throws Exception {
    TimeWindow window = context.window();
    // Process in streaming fashion to avoid loading all into memory
    double sum = 0.0;
    int count = 0;
    // Start from the infinities so the first value always replaces them. Double.MIN_VALUE is
    // the smallest positive double, not the most negative one, so using it as the initial
    // maximum reports a wrong maximum for windows that hold only negative values.
    double min = Double.POSITIVE_INFINITY;
    double max = Double.NEGATIVE_INFINITY;
    for (LargeEvent event : elements) {
        double value = event.getValue();
        sum += value;
        count++;
        min = Math.min(min, value);
        max = Math.max(max, value);
    }
    if (count > 0) {
        CompactResult result = new CompactResult(
                key,
                window.getStart(),
                window.getEnd(),
                count,
                sum / count,
                min,
                max
        );
        out.collect(result);
    }
}
}

Install with Tessl CLI:

npx tessl i tessl/maven-org-apache-flink--flink-state-processor-api-2-12