CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-java-2-11

Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities

Pending
Overview
Eval results
Files

docs/windowing.md

Windowing Operations

Windowing in Apache Flink groups elements of unbounded streams into finite sets (windows) for batch-like processing. Windows can be based on time (event time or processing time) or element count, and can be tumbling (non-overlapping), sliding (overlapping), or session-based.

Capabilities

Window Types

Create different types of windows based on time or count criteria.

// KeyedStream<T, K> methods that group a keyed stream into windows and return a
// WindowedStream<T, K, W> (W = TimeWindow, GlobalWindow, or the assigner's window type).
// NOTE(review): in this Flink version timeWindow(...) uses event time or processing time
// depending on the environment's configured time characteristic — confirm.

// Time-based tumbling windows (non-overlapping)
WindowedStream<T, K, TimeWindow> timeWindow(Time size);

// Time-based sliding windows (overlapping): windows of length `size`, a new one every `slide`
WindowedStream<T, K, TimeWindow> timeWindow(Time size, Time slide);

// Count-based tumbling windows: one window per `size` elements of a key
WindowedStream<T, K, GlobalWindow> countWindow(long size);

// Count-based sliding windows: evaluated every `slide` elements over the last `size` elements
WindowedStream<T, K, GlobalWindow> countWindow(long size, long slide);

// Custom window assigner (e.g. session or global windows, see "Window Assigners")
<W extends Window> WindowedStream<T, K, W> window(WindowAssigner<? super T, W> assigner);

Usage Examples:

// Sample (key, value) input; `env` is assumed to be a StreamExecutionEnvironment.
DataStream<Tuple2<String, Integer>> input = env.fromElements(
    Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3)
);

// Key by the String in field f0; every window below is evaluated per key.
KeyedStream<Tuple2<String, Integer>, String> keyed = input.keyBy(t -> t.f0);

// Tumbling time window - 5 minute non-overlapping windows
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> tumblingTime = 
    keyed.timeWindow(Time.minutes(5));

// Sliding time window - 10 minute windows, advancing every 2 minutes
// (each element therefore lands in 10 / 2 = 5 overlapping windows)
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> slidingTime = 
    keyed.timeWindow(Time.minutes(10), Time.minutes(2));

// Tumbling count window - every 100 elements
WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> tumblingCount = 
    keyed.countWindow(100);

// Sliding count window - 1000 elements, sliding by 100
// (evaluated every 100 elements over the most recent 1000 elements of the key)
WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> slidingCount = 
    keyed.countWindow(1000, 100);

// Session window - group by activity sessions with 30 minute timeout
// (a key's session ends after 30 minutes of event time without new elements)
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> sessionWindow = 
    keyed.window(EventTimeSessionWindows.withGap(Time.minutes(30)));

// Global window - single window for all elements (of each key)
// NOTE: the default trigger of GlobalWindows never fires; set a custom trigger
// with .trigger(...) or this window never emits results.
WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> globalWindow = 
    keyed.window(GlobalWindows.create());

Window Assigners

Built-in window assigners for common windowing patterns.

// Tumbling event time windows
// Fixed-size, non-overlapping windows. `offset` shifts the window alignment, e.g.
// of(Time.hours(1), Time.minutes(15)) yields hourly windows starting at :15.
class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    static TumblingEventTimeWindows of(Time size);
    static TumblingEventTimeWindows of(Time size, Time offset);
}

// Sliding event time windows
// Windows of length `size`, a new one starting every `slide` (optionally shifted by `offset`).
class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    static SlidingEventTimeWindows of(Time size, Time slide);
    static SlidingEventTimeWindows of(Time size, Time slide, Time offset);
}

// Tumbling processing time windows
class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    static TumblingProcessingTimeWindows of(Time size);
    static TumblingProcessingTimeWindows of(Time size, Time offset);
}

// Sliding processing time windows
class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    static SlidingProcessingTimeWindows of(Time size, Time slide);
    static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset);
}

// Session windows based on event time
// withGap: fixed inactivity gap for every element.
// withDynamicGap: per-element gap. It is generic in the element type T and returns a
// DynamicEventTimeSessionWindows<T>, so the extractor can be typed to the stream's
// elements (e.g. SessionWindowTimeGapExtractor<Event>).
class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    static EventTimeSessionWindows withGap(Time sessionTimeout);
    static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionTimeGapExtractor);
}

// Session windows based on processing time
// Same factory pair as EventTimeSessionWindows, driven by processing time.
class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    static ProcessingTimeSessionWindows withGap(Time sessionTimeout);
    static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionTimeGapExtractor);
}

// Session window assigners with a per-element gap (returned by withDynamicGap)
class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
}

class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
}

// Extracts the session gap, in milliseconds, for a single element
interface SessionWindowTimeGapExtractor<T> extends Serializable {
    long extract(T element);
}

// Global window assigner
// Puts all elements of a key into a single GlobalWindow. Its default trigger never
// fires, so it must be combined with a custom trigger.
class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
    static GlobalWindows create();
}

Usage Examples:

// Key events by user id (`events` is assumed to be a DataStream<Event>).
KeyedStream<Event, String> keyedEvents = events.keyBy(Event::getUserId);

// Tumbling event time windows with offset (e.g., hourly windows starting at :15)
WindowedStream<Event, String, TimeWindow> hourlyWindows = keyedEvents.window(
    TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))
);

// Sliding processing time windows
// (10 minute windows, a new one every 2 minutes, driven by processing time)
WindowedStream<Event, String, TimeWindow> slidingProcessingTime = keyedEvents.window(
    SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(2))
);

// Session windows with dynamic gap
// The extractor is called for each element and returns that element's session gap.
// NOTE(review): the returned long is presumably milliseconds — confirm the unit of getSessionTimeout().
WindowedStream<Event, String, TimeWindow> sessionWindows = keyedEvents.window(
    EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Event>() {
        @Override
        public long extract(Event element) {
            return element.getSessionTimeout(); // Custom session timeout per element
        }
    })
);

Window Operations

Apply different types of operations to windowed streams.

/**
 * Reduce window contents using a ReduceFunction.
 * Elements are combined incrementally as they arrive, so only one value is kept per window.
 * @param function - reduce function to combine elements
 * @return reduced stream (a SingleOutputStreamOperator, i.e. a DataStream that also
 *         exposes getSideOutput for late data)
 */
SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function);

/**
 * Aggregate window contents using an AggregateFunction
 * @param aggFunction - aggregate function for incremental aggregation
 * @return aggregated stream
 */
<ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> aggFunction);

/**
 * Apply a WindowFunction to window contents.
 * The function receives all buffered elements of the window at once.
 * @param function - window function to apply
 * @return transformed stream
 */
<R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function);

/**
 * Apply a ProcessWindowFunction to window contents
 * @param function - process window function with access to window metadata
 * @return processed stream
 */
<R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function);

/**
 * Combine reduce with window function for efficient processing.
 * The ReduceFunction pre-aggregates incrementally; the WindowFunction then receives
 * the single reduced value together with the key and window metadata.
 * @param reduceFunction - reduce function for pre-aggregation
 * @param windowFunction - window function for final processing
 * @return processed stream
 */
<R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> windowFunction);

/**
 * Combine aggregate with process window function.
 * The AggregateFunction's result type V is the ProcessWindowFunction's input type,
 * so the final output type R may differ from the aggregate result.
 * @param <ACC> - accumulator type of the AggregateFunction
 * @param <V> - result type of the AggregateFunction / input type of the ProcessWindowFunction
 * @param <R> - output element type
 * @param aggFunction - aggregate function for pre-aggregation
 * @param windowFunction - process window function for final processing
 * @return processed stream
 */
<ACC, V, R> SingleOutputStreamOperator<R> aggregate(
    AggregateFunction<T, ACC, V> aggFunction,
    ProcessWindowFunction<V, R, K, W> windowFunction
);

Usage Examples:

// Per-key, 5 minute tumbling windows shared by all examples below
// (`keyed` is the KeyedStream from the "Window Types" examples).
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = 
    keyed.timeWindow(Time.minutes(5));

// Reduce - sum all values in window (keeps the key from the first element)
DataStream<Tuple2<String, Integer>> reduced = windowedStream.reduce(
    (value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1)
);

// Aggregate - calculate average
// Accumulator layout: (key, running sum, element count).
DataStream<Tuple2<String, Double>> avgResult = windowedStream.aggregate(
    new AggregateFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, Tuple2<String, Double>>() {
        @Override
        public Tuple3<String, Integer, Integer> createAccumulator() {
            // Empty accumulator; the key is filled in by the first add().
            return Tuple3.of("", 0, 0);
        }

        @Override
        public Tuple3<String, Integer, Integer> add(
            Tuple2<String, Integer> value,
            Tuple3<String, Integer, Integer> accumulator
        ) {
            // Record the key, add the value to the sum, and bump the count.
            return Tuple3.of(value.f0, accumulator.f1 + value.f1, accumulator.f2 + 1);
        }

        @Override
        public Tuple2<String, Double> getResult(Tuple3<String, Integer, Integer> accumulator) {
            // Cast before dividing to get a floating-point average.
            // NOTE(review): yields NaN if no element was added (count 0).
            return Tuple2.of(accumulator.f0, (double) accumulator.f1 / accumulator.f2);
        }

        @Override
        public Tuple3<String, Integer, Integer> merge(
            Tuple3<String, Integer, Integer> a,
            Tuple3<String, Integer, Integer> b
        ) {
            // Combine two partial accumulators (needed when windows merge, e.g. sessions).
            return Tuple3.of(a.f0, a.f1 + b.f1, a.f2 + b.f2);
        }
    }
);

// Apply window function with access to window metadata
// Emits one summary line per key and window: element count, sum, and window bounds.
DataStream<String> windowInfo = windowedStream.apply(
    new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
        @Override
        public void apply(
            String key,
            TimeWindow window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<String> out
        ) {
            int count = 0;
            int sum = 0;
            for (Tuple2<String, Integer> value : values) {
                count++;
                sum += value.f1;
            }
            out.collect(String.format("Key: %s, Window: [%d-%d], Count: %d, Sum: %d",
                key, window.getStart(), window.getEnd(), count, sum));
        }
    }
);

// Process window function with rich context
// Like apply(), but the Context also exposes the current watermark (and state/side outputs).
// `WindowResult` is assumed to be a user-defined result class.
DataStream<WindowResult> processResult = windowedStream.process(
    new ProcessWindowFunction<Tuple2<String, Integer>, WindowResult, String, TimeWindow>() {
        @Override
        public void process(
            String key,
            Context context,
            Iterable<Tuple2<String, Integer>> elements,
            Collector<WindowResult> out
        ) {
            int count = 0;
            int sum = 0;
            for (Tuple2<String, Integer> element : elements) {
                count++;
                sum += element.f1;
            }
            
            WindowResult result = new WindowResult(
                key,
                context.window().getStart(),
                context.window().getEnd(),
                count,
                sum,
                context.currentWatermark()
            );
            out.collect(result);
        }
    }
);

Window Configuration

Configure window behavior with triggers, evictors, and late data handling.

/**
 * Set a custom trigger for the window.
 * Replaces the window assigner's default trigger rather than adding to it.
 * @param trigger - trigger to determine when window should fire
 * @return configured WindowedStream
 */
WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger);

/**
 * Set a custom evictor for the window.
 * The evictor may remove elements before and/or after the window function runs.
 * @param evictor - evictor to remove elements from window
 * @return configured WindowedStream
 */
WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor);

/**
 * Set allowed lateness for late arriving elements.
 * Elements arriving after the watermark has passed the window end, but within this
 * lateness, are still added to the window (default lateness is zero).
 * @param lateness - maximum allowed lateness
 * @return configured WindowedStream
 */
WindowedStream<T, K, W> allowedLateness(Time lateness);

/**
 * Configure side output for late data.
 * Elements arriving later than window end + allowed lateness go to this side output
 * instead of being dropped; read them with getSideOutput(outputTag) on the stream
 * returned by the window operation.
 * @param outputTag - output tag for late elements
 * @return configured WindowedStream
 */
WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag);

Usage Examples:

// Tag identifying the late-data side output. The anonymous subclass ({}) lets
// Flink capture the generic element type.
OutputTag<Tuple2<String, Integer>> lateDataTag = 
    new OutputTag<Tuple2<String, Integer>>("late-data"){};

// Note: .trigger(...) replaces the window's default event-time trigger.
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> configuredWindow = 
    keyed.timeWindow(Time.minutes(5))
        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30))) // Fire every 30 seconds of event time
        .allowedLateness(Time.minutes(2)) // Allow 2 minutes of lateness
        .sideOutputLateData(lateDataTag); // Send data later than that to the side output

// Keep the SingleOutputStreamOperator type: getSideOutput() is defined there,
// not on DataStream, so declaring `result` as a DataStream would not compile below.
SingleOutputStreamOperator<Tuple2<String, Integer>> result = configuredWindow.reduce(
    (v1, v2) -> Tuple2.of(v1.f0, v1.f1 + v2.f1)
);

// Get late data (elements arriving after window end + allowed lateness) from side output
DataStream<Tuple2<String, Integer>> lateData = result.getSideOutput(lateDataTag);

Built-in Triggers

Pre-defined triggers for common window firing patterns.

// All built-in triggers subclass the abstract Trigger<T, W> base class,
// so they use `extends`, not `implements`.

// Event time trigger - fires when the watermark passes the window end
class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    static EventTimeTrigger create();
}

// Processing time trigger - fires when processing time passes the window end
class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    static ProcessingTimeTrigger create();
}

// Count trigger - fires each time the element count in the window reaches maxCount
class CountTrigger<W extends Window> extends Trigger<Object, W> {
    static <W extends Window> CountTrigger<W> of(long maxCount);
}

// Continuous event time trigger - fires repeatedly, every `interval` of event time
// (generic in the window type, so it is not limited to TimeWindow)
class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {
    static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval);
}

// Continuous processing time trigger - fires repeatedly, every `interval` of processing time
class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval);
}

// Delta trigger - fires when the delta between the last fired element and the
// current element exceeds threshold; stateSerializer stores that last element in state
class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
    static <T, W extends Window> DeltaTrigger<T, W> of(
        double threshold,
        DeltaFunction<T> deltaFunction,
        TypeSerializer<T> stateSerializer
    );
}

// Purging trigger - wraps another trigger and purges window contents after each firing
class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
    static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger);
}

Usage Examples:

// NOTE: .trigger(...) replaces the window assigner's default trigger, so in these
// examples the windows fire according to the custom trigger's logic only.

// Count-based trigger - fire every 100 elements
// (the 1 hour window no longer fires just because the hour ended)
WindowedStream<Event, String, TimeWindow> countTriggered = keyedEvents
    .timeWindow(Time.hours(1))
    .trigger(CountTrigger.of(100));

// Continuous firing every 30 seconds during window
WindowedStream<Event, String, TimeWindow> continuousTriggered = keyedEvents
    .timeWindow(Time.minutes(10))
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)));

// Delta trigger - fire when value changes by more than threshold
// (`sensorStream` is assumed to be a DataStream<Sensor>; the serializer lets the
// trigger keep the last fired Sensor in state)
WindowedStream<Sensor, String, TimeWindow> deltaTriggered = sensorStream
    .keyBy(Sensor::getSensorId)
    .timeWindow(Time.minutes(5))
    .trigger(DeltaTrigger.of(
        10.0, // threshold
        (oldReading, newReading) -> Math.abs(oldReading.getValue() - newReading.getValue()),
        TypeInformation.of(Sensor.class).createSerializer(env.getConfig())
    ));

// Purging trigger - purge after each firing
// (each firing of the wrapped CountTrigger only sees the 50 elements since the last one)
WindowedStream<Event, String, TimeWindow> purgingTriggered = keyedEvents
    .timeWindow(Time.minutes(5))
    .trigger(PurgingTrigger.of(CountTrigger.of(50)));

Built-in Evictors

Pre-defined evictors for removing elements from windows.

// For every evictor, the overloads without doEvictAfter evict before the window
// function runs; doEvictAfter = true evicts after it instead.

// Count evictor - keep only the last N elements
class CountEvictor<W extends Window> implements Evictor<Object, W> {
    static <W extends Window> CountEvictor<W> of(long maxCount);
    static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter);
}

// Delta evictor - remove elements that differ too much from the last element
// (elements whose delta to the last element in the window is >= threshold are evicted)
class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
    static <T, W extends Window> DeltaEvictor<T, W> of(
        double threshold,
        DeltaFunction<T> deltaFunction
    );
    static <T, W extends Window> DeltaEvictor<T, W> of(
        double threshold,
        DeltaFunction<T> deltaFunction,
        boolean doEvictAfter
    );
}

// Time evictor - remove elements older than specified time
// Age is measured against the newest element timestamp in the window (not the wall
// clock): elements at least windowSize older than it are evicted. Elements without
// timestamps are left alone.
class TimeEvictor<W extends Window> implements Evictor<Object, W> {
    static <W extends Window> TimeEvictor<W> of(Time windowSize);
    static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter);
}

Usage Examples:

// NOTE: with an evictor, Flink must buffer every element of the window, so
// incremental pre-aggregation (reduce/aggregate) cannot be applied.

// Keep only the last 1000 elements in each window
WindowedStream<Event, String, TimeWindow> countEvicted = keyedEvents
    .timeWindow(Time.hours(1))
    .evictor(CountEvictor.of(1000));

// Remove elements older than 30 minutes
// (relative to the newest element timestamp in each 2 hour window)
WindowedStream<Event, String, TimeWindow> timeEvicted = keyedEvents
    .timeWindow(Time.hours(2))
    .evictor(TimeEvictor.of(Time.minutes(30)));

// Remove elements with large delta from most recent
// (`sensorStream` is assumed to be a DataStream<Sensor>)
WindowedStream<Sensor, String, TimeWindow> deltaEvicted = sensorStream
    .keyBy(Sensor::getSensorId)
    .timeWindow(Time.minutes(10))
    .evictor(DeltaEvictor.of(
        5.0, // threshold
        (baseline, element) -> Math.abs(baseline.getValue() - element.getValue())
    ));

Types

Window Types

// Base window type (an abstract class)
// maxTimestamp() is the largest timestamp that still belongs to the window.
abstract class Window {
    abstract long maxTimestamp();
}

// Time-based window covering [getStart(), getEnd()): start inclusive, end exclusive
class TimeWindow extends Window {
    long getStart();
    long getEnd();
    long maxTimestamp();                   // getEnd() - 1
    boolean intersects(TimeWindow other);  // true if the two intervals overlap or touch
    TimeWindow cover(TimeWindow other);    // smallest window spanning both
}

// Global window (used by GlobalWindows and count windows)
// get() returns the shared singleton instance.
class GlobalWindow extends Window {
    static GlobalWindow get();
}

Window Function Interfaces

// Basic window function
// Called once per key and window with all elements of that window.
interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function {
    void apply(KEY key, W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
}

// Rich window function with context
abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
    abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    
    // Called when the window is cleaned up; use it to clear anything kept in windowState().
    void clear(Context context) throws Exception;
    
    abstract class Context implements Serializable {
        abstract W window();                  // the window being evaluated
        abstract long currentProcessingTime();
        abstract long currentWatermark();
        abstract KeyedStateStore windowState();   // state scoped to this key and window
        abstract KeyedStateStore globalState();   // state scoped to this key, shared across windows
        abstract <X> void output(OutputTag<X> outputTag, X value);  // emit to a side output
    }
}

// All window function (for non-keyed streams)
interface AllWindowFunction<IN, OUT, W extends Window> extends Function {
    void apply(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
}

// Process all window function (non-keyed counterpart of ProcessWindowFunction)
abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction {
    abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    
    void clear(Context context) throws Exception;
    
    abstract class Context implements Serializable {
        abstract W window();
        abstract long currentProcessingTime();
        abstract long currentWatermark();
        abstract KeyedStateStore windowState();
        abstract KeyedStateStore globalState();
        abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}

Trigger Interface

// Trigger base class (an abstract class, not an interface) that decides when a window fires.
// Each callback returns a TriggerResult telling the window operator what to do.
abstract class Trigger<T, W extends Window> implements Serializable {
    abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
    abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
    abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
    abstract void clear(W window, TriggerContext ctx) throws Exception;
    
    // Trigger context: time queries, timer registration, and per-key/per-window trigger state
    interface TriggerContext {
        long getCurrentProcessingTime();
        long getCurrentWatermark();
        void registerProcessingTimeTimer(long time);
        void registerEventTimeTimer(long time);
        void deleteProcessingTimeTimer(long time);
        void deleteEventTimeTimer(long time);
        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
    }
}

// Trigger result
enum TriggerResult {
    CONTINUE,    // Continue without firing
    FIRE_AND_PURGE,  // Fire and purge window contents
    FIRE,        // Fire but keep window contents
    PURGE        // Purge window contents without firing
}

Evictor Interface

// Evictor interface for removing elements from windows
// evictBefore runs before the window function is applied, evictAfter after it;
// implementations remove elements from the given iterable.
interface Evictor<T, W extends Window> extends Serializable {
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    
    // Evictor context
    interface EvictorContext {
        long getCurrentProcessingTime();
        long getCurrentWatermark();
    }
}

// Timestamped value for evictors
// hasTimestamp() reports whether getTimestamp() carries a meaningful value.
class TimestampedValue<T> {
    T getValue();
    long getTimestamp();
    boolean hasTimestamp();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11

docs

async-io.md

checkpointing.md

datastream-transformations.md

execution-environment.md

index.md

keyed-streams-state.md

process-functions.md

sources-sinks.md

time-watermarks.md

windowing.md

tile.json