CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-state-processor-api-2-12

Apache Flink State Processor API for reading and writing savepoint state data offline

Pending
Overview
Eval results
Files

docs/window-operations.md

Window Operations

Window operations provide specialized functionality for reading and writing window state data. The State Processor API supports both regular windows and evicting windows, and window contents can be read back with reduce, aggregate, or process semantics.

Window Reader Overview

The WindowReader class provides entry points for reading window state from savepoints.

/**
 * Entry points for reading the state of window operators from a savepoint.
 *
 * @param <W> the window type (for example TimeWindow), fixed by the assigner or serializer
 *            the reader was created with
 */
public class WindowReader<W extends Window> {
    /** Returns a reader for window operators that were configured with an evictor. */
    public EvictingWindowReader<W> evictor();
    
    /**
     * Reads the reduced value of every window of the operator with the given {@code uid}.
     * {@code function} is the ReduceFunction the windows were reduced with; {@code reduceType}
     * is the type of the reduced value, which is also the element type of the result.
     */
    public <T, K> DataSource<T> reduce(
        String uid,
        ReduceFunction<T> function,
        TypeInformation<K> keyType,
        TypeInformation<T> reduceType
    ) throws IOException;
    
    /**
     * Like the reduce overload above, but hands each window's reduced value to
     * {@code readerFunction}, whose output (described by {@code outputType}) is emitted instead.
     */
    public <K, T, OUT> DataSource<OUT> reduce(
        String uid,
        ReduceFunction<T> function,
        WindowReaderFunction<T, OUT, K, W> readerFunction,
        TypeInformation<K> keyType,
        TypeInformation<T> reduceType,
        TypeInformation<OUT> outputType
    ) throws IOException;
    
    /**
     * Reads windows aggregated with {@code aggregateFunction} and emits the aggregate result R
     * of each window. {@code accType} describes the accumulator ACC.
     */
    public <K, T, ACC, R> DataSource<R> aggregate(
        String uid,
        AggregateFunction<T, ACC, R> aggregateFunction,
        TypeInformation<K> keyType,
        TypeInformation<ACC> accType,
        TypeInformation<R> outputType
    ) throws IOException;
    
    /**
     * Like the aggregate overload above, but hands each window's aggregate result R to
     * {@code readerFunction} and emits its output instead.
     */
    public <K, T, ACC, R, OUT> DataSource<OUT> aggregate(
        String uid,
        AggregateFunction<T, ACC, R> aggregateFunction,
        WindowReaderFunction<R, OUT, K, W> readerFunction,
        TypeInformation<K> keyType,
        TypeInformation<ACC> accType,
        TypeInformation<OUT> outputType
    ) throws IOException;
    
    /**
     * Reads the raw, un-aggregated contents of each window (elements described by
     * {@code stateType}) and passes them to {@code readerFunction}.
     */
    public <K, T, OUT> DataSource<OUT> process(
        String uid,
        WindowReaderFunction<T, OUT, K, W> readerFunction,
        TypeInformation<K> keyType,
        TypeInformation<T> stateType,
        TypeInformation<OUT> outputType
    ) throws IOException;
}

Creating Window Readers

Using Window Assigners

import java.time.Duration;

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Create window reader with tumbling windows (fixed 5-minute windows)
WindowReader<TimeWindow> tumblingReader = savepoint.window(
    TumblingEventTimeWindows.of(Duration.ofMinutes(5))
);

// Create window reader with sliding windows (10-minute windows starting every 2 minutes)
WindowReader<TimeWindow> slidingReader = savepoint.window(
    SlidingEventTimeWindows.of(Duration.ofMinutes(10), Duration.ofMinutes(2))
);

// Create window reader with session windows (30-minute inactivity gap).
// Flink's session assigners are EventTimeSessionWindows / ProcessingTimeSessionWindows;
// there is no plain `SessionWindows` assigner class.
WindowReader<TimeWindow> sessionReader = savepoint.window(
    EventTimeSessionWindows.withGap(Duration.ofMinutes(30))
);

Using Window Serializers

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Build the reader from the window type's own serializer instead of from a window assigner
WindowReader<TimeWindow> reader = savepoint.window(new TimeWindow.Serializer());

Reading Reduce Windows

Simple Reduce Reading

// Read the windows of an operator whose contents were folded by a ReduceFunction.
// Pass the same ReduceFunction the original job used; keys are Strings, reduced values Integers.
DataSource<Integer> windowSums =
    reader.reduce("sum-window-operator", new SumReduceFunction(), Types.STRING, Types.INT);

windowSums.print();

Reduce with Window Reader Function

/**
 * Turns the single reduced value of each window into a {@link WindowSummary} that also carries
 * the window's start and end.
 *
 * <p>WindowReaderFunction is an abstract class, so reader functions extend it; the Context
 * parameter is typed with the window type so {@code context.window()} returns a TimeWindow.
 */
public class WindowSummaryReader extends WindowReaderFunction<Integer, WindowSummary, String, TimeWindow> {

    @Override
    public void readWindow(
        String key,
        Context<TimeWindow> context,
        Iterable<Integer> elements,
        Collector<WindowSummary> out
    ) throws Exception {

        java.util.Iterator<Integer> reduced = elements.iterator();
        // A reduce window holds a single pre-aggregated value; skip an empty window instead of
        // failing with NoSuchElementException.
        if (!reduced.hasNext()) {
            return;
        }

        TimeWindow window = context.window();
        Integer sum = reduced.next();

        WindowSummary summary = new WindowSummary(
            key,
            window.getStart(),
            window.getEnd(),
            sum,
            1  // Reduce produces single value
        );

        out.collect(summary);
    }
}

// Pair the reduce reader with WindowSummaryReader so every result carries its window bounds
SumReduceFunction sumFunction = new SumReduceFunction();
WindowSummaryReader summaryReader = new WindowSummaryReader();

DataSource<WindowSummary> summaries = reader.reduce(
    "sum-window-operator",
    sumFunction,
    summaryReader,
    Types.STRING,                            // key type
    Types.INT,                               // type of the reduced value
    TypeInformation.of(WindowSummary.class)  // type emitted by WindowSummaryReader
);

Reading Aggregate Windows

Simple Aggregate Reading

/**
 * Averages Double inputs by keeping a running sum and element count in an AverageAccumulator.
 * An accumulator that has seen no values yields 0.0.
 */
public class AverageAggregateFunction implements AggregateFunction<Double, AverageAccumulator, Double> {

    /** Every window starts from a fresh accumulator. */
    @Override
    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    /** Folds {@code value} into {@code accumulator} (mutating it) and returns the same instance. */
    @Override
    public AverageAccumulator add(Double value, AverageAccumulator accumulator) {
        accumulator.sum += value;
        accumulator.count += 1;
        return accumulator;
    }

    /** Mean of everything added so far; 0.0 for an empty accumulator to avoid dividing by zero. */
    @Override
    public Double getResult(AverageAccumulator accumulator) {
        double average = accumulator.count == 0 ? 0.0 : accumulator.sum / accumulator.count;
        return average;
    }

    /** Adds {@code b}'s totals into {@code a} and returns {@code a}; {@code b} is left untouched. */
    @Override
    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }
}

// Read the per-window averages that AverageAggregateFunction produced in the original job
AverageAggregateFunction averageFunction = new AverageAggregateFunction();

DataSource<Double> windowAverages = reader.aggregate(
    "average-window-operator",
    averageFunction,
    Types.STRING,                                  // key type
    TypeInformation.of(AverageAccumulator.class),  // accumulator type
    Types.DOUBLE                                   // result type of getResult(...)
);

Aggregate with Window Reader Function

/**
 * Wraps the single aggregated average of each window in an {@link AggregateResult} tagged
 * "AVERAGE", together with the window's start and end.
 *
 * <p>WindowReaderFunction is an abstract class, so reader functions extend it; the Context
 * parameter is typed with the window type so {@code context.window()} returns a TimeWindow.
 */
public class AggregateWindowReader extends WindowReaderFunction<Double, AggregateResult, String, TimeWindow> {

    @Override
    public void readWindow(
        String key,
        Context<TimeWindow> context,
        Iterable<Double> elements,
        Collector<AggregateResult> out
    ) throws Exception {

        java.util.Iterator<Double> results = elements.iterator();
        // An aggregate window yields a single result; skip an empty window instead of failing
        // with NoSuchElementException.
        if (!results.hasNext()) {
            return;
        }

        TimeWindow window = context.window();
        Double average = results.next();

        AggregateResult result = new AggregateResult(
            key,
            window.getStart(),
            window.getEnd(),
            average,
            "AVERAGE"
        );

        out.collect(result);
    }
}

// Pair the aggregate reader with AggregateWindowReader to attach window bounds to each average
AverageAggregateFunction averageFunction = new AverageAggregateFunction();
AggregateWindowReader resultReader = new AggregateWindowReader();

DataSource<AggregateResult> results = reader.aggregate(
    "average-window-operator",
    averageFunction,
    resultReader,
    Types.STRING,                                  // key type
    TypeInformation.of(AverageAccumulator.class),  // accumulator type
    TypeInformation.of(AggregateResult.class)      // type emitted by AggregateWindowReader
);

Reading Process Windows

Process windows expose the raw window contents to the reader function, with no pre-aggregation applied.

/**
 * Summarizes the raw readings of each sensor window: count, min, max, average, and a coarse
 * trend comparing the first and last reading. Empty windows produce no output.
 *
 * <p>WindowReaderFunction is an abstract class, so reader functions extend it; the Context
 * parameter is typed with the window type so {@code context.window()} returns a TimeWindow.
 */
public class ProcessWindowReader extends WindowReaderFunction<SensorReading, WindowAnalysis, String, TimeWindow> {

    /** Relative change (10% of |first reading|) the last reading must exceed to count as a trend. */
    private static final double TREND_THRESHOLD = 0.1;

    @Override
    public void readWindow(
        String sensorId,
        Context<TimeWindow> context,
        Iterable<SensorReading> readings,
        Collector<WindowAnalysis> out
    ) throws Exception {

        List<SensorReading> readingList = new ArrayList<>();
        readings.forEach(readingList::add);

        if (readingList.isEmpty()) {
            return;
        }

        TimeWindow window = context.window();

        // One pass for min/max/average instead of three separate streams.
        java.util.DoubleSummaryStatistics stats = readingList.stream()
            .mapToDouble(SensorReading::getValue)
            .summaryStatistics();

        WindowAnalysis analysis = new WindowAnalysis(
            sensorId,
            window.getStart(),
            window.getEnd(),
            readingList.size(),
            stats.getMin(),
            stats.getMax(),
            stats.getAverage(),
            calculateTrend(readingList)
        );

        out.collect(analysis);
    }

    /**
     * Classifies the window as INCREASING, DECREASING or STABLE by comparing the last reading with
     * the first. Fewer than two readings is always STABLE.
     */
    private String calculateTrend(List<SensorReading> readings) {
        if (readings.size() < 2) {
            return "STABLE";
        }

        double first = readings.get(0).getValue();
        double last = readings.get(readings.size() - 1).getValue();

        // Tolerance is relative to |first| so the direction is correct for negative readings too;
        // the former `last > first * 1.1` reported a drop from -10 to -10.5 as INCREASING.
        double tolerance = Math.abs(first) * TREND_THRESHOLD;

        if (last - first > tolerance) {
            return "INCREASING";
        }
        if (first - last > tolerance) {
            return "DECREASING";
        }
        return "STABLE";
    }
}

// Read the raw (non-aggregated) window contents and analyze them per sensor
ProcessWindowReader analysisReader = new ProcessWindowReader();

DataSource<WindowAnalysis> analyses = reader.process(
    "sensor-window-operator",
    analysisReader,
    Types.STRING,                             // key type: the sensor ID
    TypeInformation.of(SensorReading.class),  // type of each buffered window element
    TypeInformation.of(WindowAnalysis.class)  // type emitted by ProcessWindowReader
);

Evicting Window Operations

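Evicting windows use an evictor to remove elements from a window before and/or after the window function runs. Their state is read through the `EvictingWindowReader` returned by `WindowReader.evictor()`.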

/**
 * Entry points for reading the state of window operators that were configured with an evictor.
 * Obtained through {@code WindowReader#evictor()}.
 *
 * <p>Reader functions receive the same element types as with WindowReader: the reduced value T,
 * the aggregate result R, or the raw window elements T. The elements are not wrapped in nested
 * Iterables.
 *
 * @param <W> the window type
 */
public class EvictingWindowReader<W extends Window> {
    /** Reads the reduced value of every window of the evicting operator with the given uid. */
    public <T, K> DataSource<T> reduce(
        String uid,
        ReduceFunction<T> function,
        TypeInformation<K> keyType,
        TypeInformation<T> reduceType
    ) throws IOException;

    /** Like the reduce overload above, but passes each reduced value to {@code readerFunction}. */
    public <K, T, OUT> DataSource<OUT> reduce(
        String uid,
        ReduceFunction<T> function,
        WindowReaderFunction<T, OUT, K, W> readerFunction,
        TypeInformation<K> keyType,
        TypeInformation<T> reduceType,
        TypeInformation<OUT> outputType
    ) throws IOException;

    /** Reads windows aggregated with {@code aggregateFunction} and emits each window's result R. */
    public <K, T, ACC, R> DataSource<R> aggregate(
        String uid,
        AggregateFunction<T, ACC, R> aggregateFunction,
        TypeInformation<K> keyType,
        TypeInformation<ACC> accType,
        TypeInformation<R> outputType
    ) throws IOException;

    /** Like the aggregate overload above, but passes each result R to {@code readerFunction}. */
    public <K, T, ACC, R, OUT> DataSource<OUT> aggregate(
        String uid,
        AggregateFunction<T, ACC, R> aggregateFunction,
        WindowReaderFunction<R, OUT, K, W> readerFunction,
        TypeInformation<K> keyType,
        TypeInformation<ACC> accType,
        TypeInformation<OUT> outputType
    ) throws IOException;

    /** Passes the raw buffered elements (described by {@code stateType}) to {@code readerFunction}. */
    public <K, T, OUT> DataSource<OUT> process(
        String uid,
        WindowReaderFunction<T, OUT, K, W> readerFunction,
        TypeInformation<K> keyType,
        TypeInformation<T> stateType,
        TypeInformation<OUT> outputType
    ) throws IOException;
}

Reading Evicting Windows

// Windows configured with an evictor are read through the evicting variant of the reader
EvictingWindowReader<TimeWindow> evictingReader =
    reader.evictor();

// Read evicting window with process function.
// The reader function receives the window's buffered elements directly as Iterable<Event>,
// exactly like a non-evicting process reader, so no flattening is needed.
public class EvictingProcessReader extends WindowReaderFunction<Event, EventSummary, String, TimeWindow> {

    @Override
    public void readWindow(
        String key,
        Context<TimeWindow> context,
        Iterable<Event> elements,
        Collector<EventSummary> out
    ) throws Exception {

        List<Event> allEvents = new ArrayList<>();
        elements.forEach(allEvents::add);

        // Empty windows produce no summary.
        if (allEvents.isEmpty()) {
            return;
        }

        TimeWindow window = context.window();

        EventSummary summary = new EventSummary(
            key,
            window.getStart(),
            window.getEnd(),
            allEvents.size(),
            allEvents
        );

        out.collect(summary);
    }
}

EvictingProcessReader summaryReader = new EvictingProcessReader();

// Process the raw, evictor-managed contents of each window into an EventSummary
DataSource<EventSummary> evictingSummaries = evictingReader.process(
    "evicting-window-operator",
    summaryReader,
    Types.STRING,                           // key type
    TypeInformation.of(Event.class),        // type of each buffered window element
    TypeInformation.of(EventSummary.class)  // type emitted by EvictingProcessReader
);

Window Bootstrap Operations

While the main focus is on reading windows, you can also bootstrap window state.

Creating Window Bootstrap Transformation

/**
 * Bootstraps event-time window contents by assigning each event to a 5-minute tumbling window
 * and registering an event-time timer at the window's end.
 *
 * <p>NOTE(review): WindowState, WindowStateDescriptor and RuntimeContext#getWindowState do not
 * appear anywhere else on this page. Confirm they exist in your Flink version. The State
 * Processor API may instead expect window state to be bootstrapped through a windowed operator
 * transformation (keyBy(...).window(assigner)...) — TODO verify against the Flink docs.
 */
public class WindowBootstrapFunction extends KeyedStateBootstrapFunction<String, TimestampedEvent> {

    /**
     * Tumbling window size. Presumably this must match the window assigner of the job that will
     * restore this state; confirm before changing it.
     */
    private static final long WINDOW_SIZE_MILLIS = Duration.ofMinutes(5).toMillis();

    private WindowState<TimeWindow> windowState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Register window state
        WindowStateDescriptor<TimeWindow> windowDesc = new WindowStateDescriptor<>(
            "window", new TimeWindow.Serializer()
        );
        windowState = getRuntimeContext().getWindowState(windowDesc);
    }

    @Override
    public void processElement(TimestampedEvent event, Context ctx) throws Exception {
        long timestamp = event.getTimestamp();

        // floorMod keeps negative timestamps in the window that actually contains them. With plain
        // `%`, a timestamp of -1 would land in [0, size) instead of [-size, 0).
        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MILLIS);
        long windowEnd = windowStart + WINDOW_SIZE_MILLIS;

        TimeWindow window = new TimeWindow(windowStart, windowEnd);

        // Add event to window
        windowState.add(window, event);

        // Set timer for window end
        ctx.timerService().registerEventTimeTimer(windowEnd);
    }
}

Complex Window Analysis

Multi-Window Analysis

/**
 * Summarizes each metric window with:
 * <ul>
 *   <li>count, min, max and mean,</li>
 *   <li>nearest-rank percentiles (p50, p95, p99),</li>
 *   <li>the events whose value lies more than two standard deviations from the mean.</li>
 * </ul>
 * Empty windows produce no output.
 *
 * <p>WindowReaderFunction is an abstract class, so reader functions extend it; the Context
 * parameter is typed with the window type so {@code context.window()} returns a TimeWindow.
 */
public class MultiWindowAnalyzer extends WindowReaderFunction<MetricEvent, MultiWindowResult, String, TimeWindow> {

    @Override
    public void readWindow(
        String metricName,
        Context<TimeWindow> context,
        Iterable<MetricEvent> events,
        Collector<MultiWindowResult> out
    ) throws Exception {

        List<MetricEvent> eventList = new ArrayList<>();
        events.forEach(eventList::add);

        if (eventList.isEmpty()) {
            return;
        }

        TimeWindow window = context.window();

        // Calculate multiple statistics
        DoubleSummaryStatistics stats = eventList.stream()
            .mapToDouble(MetricEvent::getValue)
            .summaryStatistics();

        // Calculate percentiles over the ascending values
        List<Double> sortedValues = eventList.stream()
            .mapToDouble(MetricEvent::getValue)
            .sorted()
            .boxed()
            .collect(Collectors.toList());

        double p50 = calculatePercentile(sortedValues, 0.5);
        double p95 = calculatePercentile(sortedValues, 0.95);
        double p99 = calculatePercentile(sortedValues, 0.99);

        // Detect anomalies. The threshold must come from the spread of the values; the previous
        // Math.sqrt(mean) was not a standard deviation and is NaN for negative means.
        double mean = stats.getAverage();
        double stdDev = standardDeviation(eventList, mean);
        List<MetricEvent> anomalies = detectAnomalies(eventList, mean, stdDev);

        MultiWindowResult result = new MultiWindowResult(
            metricName,
            window.getStart(),
            window.getEnd(),
            eventList.size(),
            stats.getMin(),
            stats.getMax(),
            mean,
            p50, p95, p99,
            anomalies
        );

        out.collect(result);
    }

    /** Nearest-rank percentile of an ascending, non-empty list; the index is clamped to the list. */
    private double calculatePercentile(List<Double> sortedValues, double percentile) {
        int index = (int) Math.ceil(percentile * sortedValues.size()) - 1;
        return sortedValues.get(Math.max(0, Math.min(index, sortedValues.size() - 1)));
    }

    /** Population standard deviation of the event values around {@code mean}; requires a non-empty list. */
    private double standardDeviation(List<MetricEvent> events, double mean) {
        double sumOfSquares = 0.0;
        for (MetricEvent event : events) {
            double diff = event.getValue() - mean;
            sumOfSquares += diff * diff;
        }
        return Math.sqrt(sumOfSquares / events.size());
    }

    /** Returns the events whose value is more than two standard deviations away from {@code mean}. */
    private List<MetricEvent> detectAnomalies(List<MetricEvent> events, double mean, double stdDev) {
        double threshold = 2.0 * stdDev;
        return events.stream()
            .filter(event -> Math.abs(event.getValue() - mean) > threshold)
            .collect(Collectors.toList());
    }
}

Error Handling in Window Operations

Robust Window Reading

/**
 * Averages the valid data points of each window. Null or invalid points are skipped and logged.
 * A point is valid when its value is non-negative and its timestamp is positive. Any exception is
 * logged with the key and rethrown.
 *
 * <p>WindowReaderFunction is an abstract class, so reader functions extend it; the Context
 * parameter is typed with the window type so {@code context.window()} returns a TimeWindow.
 */
public class RobustWindowReader extends WindowReaderFunction<DataPoint, WindowResult, String, TimeWindow> {
    private static final Logger LOG = LoggerFactory.getLogger(RobustWindowReader.class);

    @Override
    public void readWindow(
        String key,
        Context<TimeWindow> context,
        Iterable<DataPoint> elements,
        Collector<WindowResult> out
    ) throws Exception {

        try {
            TimeWindow window = context.window();
            List<DataPoint> points = new ArrayList<>();

            // Keep only non-null, valid points; log the rest
            for (DataPoint point : elements) {
                if (point != null && isValidDataPoint(point)) {
                    points.add(point);
                } else {
                    LOG.warn("Invalid data point in window for key: {}", key);
                }
            }

            if (!points.isEmpty()) {
                WindowResult result = processDataPoints(key, window, points);
                out.collect(result);
            } else {
                LOG.debug("No valid data points in window for key: {}", key);
            }

        } catch (Exception e) {
            LOG.error("Error processing window for key: {}", key, e);
            // Rethrow so the read fails loudly. To tolerate bad windows instead, emit an error
            // WindowResult for this key/window here and return without rethrowing.
            throw e;
        }
    }

    /** A point is usable when its value is non-negative and its timestamp is positive. */
    private boolean isValidDataPoint(DataPoint point) {
        return point.getValue() >= 0 && point.getTimestamp() > 0;
    }

    /** Builds a SUCCESS result holding the mean value of {@code points}. */
    private WindowResult processDataPoints(String key, TimeWindow window, List<DataPoint> points) {
        double average = points.stream().mapToDouble(DataPoint::getValue).average().orElse(0.0);
        return new WindowResult(key, window, "SUCCESS", average);
    }
}

Window Type Safety

// Ensure proper type handling for different window types.
// The Context is parameterized with W, so context.window() is already typed. The runtime
// isInstance check guards against state whose windows are not of the expected class.
public class TypeSafeWindowReader<W extends Window> extends WindowReaderFunction<TypedEvent, TypedResult, String, W> {
    private final Class<W> windowClass;

    /**
     * @param windowClass the concrete window class every window is expected to be; must not be null
     */
    public TypeSafeWindowReader(Class<W> windowClass) {
        this.windowClass = java.util.Objects.requireNonNull(windowClass, "windowClass");
    }

    @Override
    public void readWindow(
        String key,
        Context<W> context,
        Iterable<TypedEvent> elements,
        Collector<TypedResult> out
    ) throws Exception {

        W window = context.window();

        // Type-safe window handling
        if (windowClass.isInstance(window)) {
            TypedResult result = processTypedWindow(key, windowClass.cast(window), elements);
            out.collect(result);
        } else {
            // isInstance(null) is false, so guard the message against a null window
            throw new IllegalArgumentException(
                "Unexpected window type: " + (window == null ? "null" : window.getClass()));
        }
    }

    /** Builds a result from the key, the window's string form and the window's elements. */
    private TypedResult processTypedWindow(String key, W window, Iterable<TypedEvent> elements) {
        return new TypedResult(key, window.toString(), elements);
    }
}

Performance Optimization

Efficient Window Processing

/**
 * Computes count, mean, min and max of each window in a single pass over the elements, without
 * first copying them into a list. Empty windows produce no output.
 *
 * <p>WindowReaderFunction is an abstract class, so reader functions extend it; the Context
 * parameter is typed with the window type so {@code context.window()} returns a TimeWindow.
 */
public class OptimizedWindowReader extends WindowReaderFunction<LargeEvent, CompactResult, String, TimeWindow> {

    @Override
    public void readWindow(
        String key,
        Context<TimeWindow> context,
        Iterable<LargeEvent> elements,
        Collector<CompactResult> out
    ) throws Exception {

        TimeWindow window = context.window();

        double sum = 0.0;
        int count = 0;
        // Start from the infinities. Double.MIN_VALUE is the smallest *positive* double, so using
        // it as the initial max reported a wrong max for windows whose values are all negative.
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;

        for (LargeEvent event : elements) {
            double value = event.getValue();
            sum += value;
            count++;
            min = Math.min(min, value);
            max = Math.max(max, value);
        }

        if (count > 0) {
            CompactResult result = new CompactResult(
                key,
                window.getStart(),
                window.getEnd(),
                count,
                sum / count,
                min,
                max
            );

            out.collect(result);
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-state-processor-api-2-12

docs

function-interfaces.md

index.md

savepoint-management.md

state-reading.md

state-writing.md

window-operations.md

tile.json