Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities
—
Windowing in Apache Flink groups elements of unbounded streams into finite sets (windows) for batch-like processing. Windows can be based on time (event time or processing time) or element count, and can be tumbling (non-overlapping), sliding (overlapping), or session-based.
Create different types of windows based on time or count criteria.
// Time-based tumbling windows (non-overlapping).
// Shortcut for window(TumblingEventTimeWindows.of(size)) or window(TumblingProcessingTimeWindows.of(size)),
// selected by the environment's stream time characteristic.
WindowedStream<T, K, TimeWindow> timeWindow(Time size);
// Time-based sliding windows (overlapping): windows of length `size`, a new one starting every `slide`.
// Event time vs. processing time is selected the same way as for timeWindow(size).
WindowedStream<T, K, TimeWindow> timeWindow(Time size, Time slide);
// Count-based tumbling windows: per key, fires and purges every `size` elements.
// Equivalent to window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size))).
WindowedStream<T, K, GlobalWindow> countWindow(long size);
// Count-based sliding windows: per key, every `slide` elements evaluates the last `size` elements.
// Equivalent to window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide)).
WindowedStream<T, K, GlobalWindow> countWindow(long size, long slide);
// Custom window assigner
<W extends Window> WindowedStream<T, K, W> window(WindowAssigner<? super T, W> assigner);
Usage Examples:
DataStream<Tuple2<String, Integer>> input = env.fromElements(
    Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3)
);
// Key by the String field; all windows below are evaluated per key.
KeyedStream<Tuple2<String, Integer>, String> keyed = input.keyBy(t -> t.f0);
// Tumbling time window - 5 minute non-overlapping windows
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> tumblingTime =
    keyed.timeWindow(Time.minutes(5));
// Sliding time window - 10 minute windows, advancing every 2 minutes
// (each element belongs to 10 / 2 = 5 overlapping windows)
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> slidingTime =
    keyed.timeWindow(Time.minutes(10), Time.minutes(2));
// Tumbling count window - every 100 elements
WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> tumblingCount =
    keyed.countWindow(100);
// Sliding count window - 1000 elements, sliding by 100
WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> slidingCount =
    keyed.countWindow(1000, 100);
// Session window - group by activity sessions with a 30 minute inactivity gap (event time)
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> sessionWindow =
    keyed.window(EventTimeSessionWindows.withGap(Time.minutes(30)));
// Global window - a single window per key holding all of that key's elements.
// Its default trigger never fires, so add a custom .trigger(...) to get any output.
WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> globalWindow =
    keyed.window(GlobalWindows.create());
Built-in window assigners for common windowing patterns.
// Tumbling event time windows; `offset` shifts window starts (e.g. hourly windows starting at :15)
class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    static TumblingEventTimeWindows of(Time size);
    static TumblingEventTimeWindows of(Time size, Time offset);
}
// Sliding event time windows: windows of length `size`, a new one starting every `slide`
class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    static SlidingEventTimeWindows of(Time size, Time slide);
    static SlidingEventTimeWindows of(Time size, Time slide, Time offset);
}
// Tumbling processing time windows
class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    static TumblingProcessingTimeWindows of(Time size);
    static TumblingProcessingTimeWindows of(Time size, Time offset);
}
// Sliding processing time windows
class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    static SlidingProcessingTimeWindows of(Time size, Time slide);
    static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset);
}
// Session windows based on event time (merging assigner)
class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    // Fixed inactivity gap for every session
    static EventTimeSessionWindows withGap(Time sessionTimeout);
    // Per-element gap. Generic in the element type T, so an extractor for the stream's own
    // element type (e.g. SessionWindowTimeGapExtractor<Event>) can be passed; returns a
    // DynamicEventTimeSessionWindows<T> assigner rather than an EventTimeSessionWindows.
    static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor);
}
// Session windows based on processing time (merging assigner)
class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    // Fixed inactivity gap for every session
    static ProcessingTimeSessionWindows withGap(Time sessionTimeout);
    // Per-element gap; returns a DynamicProcessingTimeSessionWindows<T> assigner
    static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor);
}
// Global window assigner: puts all elements of a key into one GlobalWindow
class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
    static GlobalWindows create();
}
Usage Examples:
KeyedStream<Event, String> keyedEvents = events.keyBy(Event::getUserId);
// Tumbling event time windows with offset (e.g., hourly windows starting at :15)
WindowedStream<Event, String, TimeWindow> hourlyWindows = keyedEvents.window(
    TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))
);
// Sliding processing time windows: 10 minutes long, a new one every 2 minutes
WindowedStream<Event, String, TimeWindow> slidingProcessingTime = keyedEvents.window(
    SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(2))
);
// Session windows with dynamic gap: each element supplies its own inactivity gap
WindowedStream<Event, String, TimeWindow> sessionWindows = keyedEvents.window(
    EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Event>() {
        @Override
        public long extract(Event element) {
            // The extractor returns the gap in milliseconds;
            // getSessionTimeout() is assumed to already be in ms.
            return element.getSessionTimeout(); // Custom session timeout per element
        }
    })
);
Apply different types of operations to windowed streams.
/**
 * Reduce window contents using a ReduceFunction.
 * @param function - reduce function that combines two elements into one
 * @return reduced stream (a SingleOutputStreamOperator, which extends DataStream and
 *         also exposes getSideOutput)
 */
SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function);
/**
 * Aggregate window contents using an AggregateFunction.
 * @param aggFunction - aggregate function for incremental aggregation (T -> ACC -> R)
 * @return aggregated stream
 */
<ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> aggFunction);
/**
 * Apply a WindowFunction to window contents.
 * @param function - window function that receives the key, the window and its elements
 * @return transformed stream
 */
<R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function);
/**
 * Apply a ProcessWindowFunction to window contents.
 * @param function - process window function with access to window metadata, state and side outputs
 * @return processed stream
 */
<R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function);
/**
 * Combine reduce with window function for efficient processing: elements are reduced
 * incrementally, and the window function receives the reduced result together with the
 * key and window.
 * @param reduceFunction - reduce function for pre-aggregation
 * @param windowFunction - window function for final processing
 * @return processed stream
 */
<R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> windowFunction);
/**
 * Combine aggregate with process window function: the AggregateFunction pre-aggregates
 * each window into a single V, and the ProcessWindowFunction turns that V (plus key and
 * window metadata) into output of type R. V and R may differ.
 * @param aggFunction - aggregate function for pre-aggregation (T -> ACC -> V)
 * @param windowFunction - process window function for final processing (V -> R)
 * @return processed stream
 */
<ACC, V, R> SingleOutputStreamOperator<R> aggregate(
    AggregateFunction<T, ACC, V> aggFunction,
    ProcessWindowFunction<V, R, K, W> windowFunction
);
Usage Examples:
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream =
    keyed.timeWindow(Time.minutes(5));
// Reduce - sum all values in window; the ReduceFunction combines two elements of the same key at a time
DataStream<Tuple2<String, Integer>> reduced = windowedStream.reduce(
    (value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1)
);
// Aggregate - calculate average.
// Accumulator layout: f0 = key, f1 = running sum of values, f2 = number of elements added.
DataStream<Tuple2<String, Double>> avgResult = windowedStream.aggregate(
    new AggregateFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, Tuple2<String, Double>>() {
        @Override
        public Tuple3<String, Integer, Integer> createAccumulator() {
            return Tuple3.of("", 0, 0);
        }
        @Override
        public Tuple3<String, Integer, Integer> add(
            Tuple2<String, Integer> value,
            Tuple3<String, Integer, Integer> accumulator
        ) {
            return Tuple3.of(value.f0, accumulator.f1 + value.f1, accumulator.f2 + 1);
        }
        @Override
        public Tuple2<String, Double> getResult(Tuple3<String, Integer, Integer> accumulator) {
            // Cast before dividing to avoid integer division.
            return Tuple2.of(accumulator.f0, (double) accumulator.f1 / accumulator.f2);
        }
        @Override
        public Tuple3<String, Integer, Integer> merge(
            Tuple3<String, Integer, Integer> a,
            Tuple3<String, Integer, Integer> b
        ) {
            return Tuple3.of(a.f0, a.f1 + b.f1, a.f2 + b.f2);
        }
    }
);
// Apply window function with access to window metadata.
// Unlike reduce/aggregate, apply/process receive all of the window's elements as an Iterable.
DataStream<String> windowInfo = windowedStream.apply(
    new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
        @Override
        public void apply(
            String key,
            TimeWindow window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<String> out
        ) {
            int count = 0;
            int sum = 0;
            for (Tuple2<String, Integer> value : values) {
                count++;
                sum += value.f1;
            }
            out.collect(String.format("Key: %s, Window: [%d-%d], Count: %d, Sum: %d",
                key, window.getStart(), window.getEnd(), count, sum));
        }
    }
);
// Process window function with rich context (window, current watermark, state, side outputs)
DataStream<WindowResult> processResult = windowedStream.process(
    new ProcessWindowFunction<Tuple2<String, Integer>, WindowResult, String, TimeWindow>() {
        @Override
        public void process(
            String key,
            Context context,
            Iterable<Tuple2<String, Integer>> elements,
            Collector<WindowResult> out
        ) {
            int count = 0;
            int sum = 0;
            for (Tuple2<String, Integer> element : elements) {
                count++;
                sum += element.f1;
            }
            WindowResult result = new WindowResult(
                key,
                context.window().getStart(),
                context.window().getEnd(),
                count,
                sum,
                context.currentWatermark()
            );
            out.collect(result);
        }
    }
);
Configure window behavior with triggers, evictors, and late data handling.
/**
 * Set a custom trigger for the window.
 * The trigger replaces the window assigner's default trigger; it does not add a firing condition.
 * Merging assigners (session windows) require a trigger whose canMerge() returns true.
 * @param trigger - trigger to determine when window should fire
 * @return configured WindowedStream
 */
WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger);
/**
 * Set a custom evictor for the window.
 * Depending on the evictor, elements are removed before or after the window function runs
 * (Evictor#evictBefore / Evictor#evictAfter).
 * @param evictor - evictor to remove elements from window
 * @return configured WindowedStream
 */
WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor);
/**
 * Set allowed lateness for late arriving elements (default: 0).
 * Elements that arrive after the watermark has passed the end of their window, but before it
 * passes window end + lateness, are still added to the window and may cause it to fire again.
 * @param lateness - maximum allowed lateness
 * @return configured WindowedStream
 */
WindowedStream<T, K, W> allowedLateness(Time lateness);
/**
 * Configure side output for late data.
 * Elements arriving later than window end + allowed lateness are emitted to this side output
 * instead of being dropped. Read them with getSideOutput(outputTag) on the
 * SingleOutputStreamOperator returned by reduce/aggregate/apply/process.
 * @param outputTag - output tag for late elements
 * @return configured WindowedStream
 */
WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag);
Usage Examples:
// The trailing {} creates an anonymous subclass so Flink can capture the tag's generic element type.
OutputTag<Tuple2<String, Integer>> lateDataTag =
    new OutputTag<Tuple2<String, Integer>>("late-data"){};
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> configuredWindow =
    keyed.timeWindow(Time.minutes(5))
        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30))) // Fire every 30 seconds of event time
        .allowedLateness(Time.minutes(2))                        // Allow 2 minutes of lateness
        .sideOutputLateData(lateDataTag);                        // Send late data to side output
// getSideOutput is defined on SingleOutputStreamOperator, not on DataStream, so keep the
// concrete type returned by reduce(...) instead of widening it to DataStream.
SingleOutputStreamOperator<Tuple2<String, Integer>> result = configuredWindow.reduce(
    (v1, v2) -> Tuple2.of(v1.f0, v1.f1 + v2.f1)
);
// Get late data from side output
DataStream<Tuple2<String, Integer>> lateData = result.getSideOutput(lateDataTag);
Pre-defined triggers for common window firing patterns.
// Built-in triggers. Trigger is an abstract class, so each of these extends it.
// Event time trigger - fires when watermark passes window end
class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    static EventTimeTrigger create();
}
// Processing time trigger - fires when processing time passes window end
class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    static ProcessingTimeTrigger create();
}
// Count trigger - fires when element count reaches threshold
class CountTrigger<W extends Window> extends Trigger<Object, W> {
    static <W extends Window> CountTrigger<W> of(long maxCount);
}
// Continuous event time trigger - fires repeatedly based on an event-time interval.
// Generic in the window type W.
class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {
    static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval);
}
// Continuous processing time trigger - fires repeatedly based on a processing-time interval.
// Generic in the window type W.
class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval);
}
// Delta trigger - fires when the delta between the last fired element and the current one exceeds threshold
class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
    static <T, W extends Window> DeltaTrigger<T, W> of(
        double threshold,
        DeltaFunction<T> deltaFunction,
        TypeSerializer<T> stateSerializer
    );
}
// Purging trigger - wraps another trigger and purges window contents after firing
class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
    static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger);
}
Usage Examples:
// Count-based trigger - fire every 100 elements.
// NOTE: a custom trigger replaces the assigner's default event-time trigger, so this hourly
// window fires only on element counts, not when the hour ends.
WindowedStream<Event, String, TimeWindow> countTriggered = keyedEvents
    .timeWindow(Time.hours(1))
    .trigger(CountTrigger.of(100));
// Continuous firing every 30 seconds (of event time) during the window
WindowedStream<Event, String, TimeWindow> continuousTriggered = keyedEvents
    .timeWindow(Time.minutes(10))
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)));
// Delta trigger - fire when a sensor value moves more than 10.0 away from the value that last fired
WindowedStream<Sensor, String, TimeWindow> deltaTriggered = sensorStream
    .keyBy(Sensor::getSensorId)
    .timeWindow(Time.minutes(5))
    .trigger(DeltaTrigger.of(
        10.0, // threshold
        // DeltaFunction: absolute difference between the two readings
        (oldReading, newReading) -> Math.abs(oldReading.getValue() - newReading.getValue()),
        // Serializer for the stored reference element
        TypeInformation.of(Sensor.class).createSerializer(env.getConfig())
    ));
// Purging trigger - purge after each firing (every 50 elements, then start empty)
WindowedStream<Event, String, TimeWindow> purgingTriggered = keyedEvents
    .timeWindow(Time.minutes(5))
    .trigger(PurgingTrigger.of(CountTrigger.of(50)));
Pre-defined evictors for removing elements from windows.
// Built-in evictors. The optional doEvictAfter flag selects whether eviction runs after the
// window function (Evictor#evictAfter) instead of before it (Evictor#evictBefore).
// Count evictor - keep only the last N elements
class CountEvictor<W extends Window> implements Evictor<Object, W> {
    static <W extends Window> CountEvictor<W> of(long maxCount);
    static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter);
}
// Delta evictor - remove elements that differ too much from the last element
class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
    static <T, W extends Window> DeltaEvictor<T, W> of(
        double threshold,
        DeltaFunction<T> deltaFunction
    );
    static <T, W extends Window> DeltaEvictor<T, W> of(
        double threshold,
        DeltaFunction<T> deltaFunction,
        boolean doEvictAfter
    );
}
// Time evictor - remove elements older than specified time
class TimeEvictor<W extends Window> implements Evictor<Object, W> {
    static <W extends Window> TimeEvictor<W> of(Time windowSize);
    static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter);
}
Usage Examples:
// Keep only the last 1000 elements in each window
// (CountEvictor.of(maxCount, doEvictAfter) chooses whether eviction runs before or after the window function)
WindowedStream<Event, String, TimeWindow> countEvicted = keyedEvents
.timeWindow(Time.hours(1))
.evictor(CountEvictor.of(1000));
// Remove elements older than 30 minutes from each 2-hour window
WindowedStream<Event, String, TimeWindow> timeEvicted = keyedEvents
.timeWindow(Time.hours(2))
.evictor(TimeEvictor.of(Time.minutes(30)));
// Remove elements whose delta from the most recent element is too large (threshold 5.0)
WindowedStream<Sensor, String, TimeWindow> deltaEvicted = sensorStream
.keyBy(Sensor::getSensorId)
.timeWindow(Time.minutes(10))
.evictor(DeltaEvictor.of(
5.0, // threshold
(baseline, element) -> Math.abs(baseline.getValue() - element.getValue()) // DeltaFunction: absolute difference of sensor values
));// Base window class
// Abstract base of all window types (TimeWindow, GlobalWindow).
abstract class Window {
// Largest timestamp that still belongs to this window.
abstract long maxTimestamp();
}
// Time-based window with explicit start/end timestamps; produced by the time-based and session assigners.
class TimeWindow extends Window {
long getStart();
long getEnd();
long maxTimestamp();
// Overlap test and smallest window spanning both.
// NOTE(review): presumably used when merging session windows — confirm.
boolean intersects(TimeWindow other);
TimeWindow cover(TimeWindow other);
}
// Global window: the window type used by GlobalWindows and by countWindow(...).
// It is not itself count-based; counting comes from the trigger/evictor.
class GlobalWindow extends Window {
// Returns the GlobalWindow instance.
static GlobalWindow get();
}// Basic window function
// Receives the key, the window and the window's elements, and emits results through `out`.
interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function {
void apply(KEY key, W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
}
// Rich window function with context: like WindowFunction, but the Context exposes the window,
// current processing time / watermark, keyed state stores, and side outputs.
abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
// Cleanup hook for a window. NOTE(review): intended for releasing windowState() contents — confirm.
void clear(Context context) throws Exception;
abstract class Context implements Serializable {
abstract W window();
abstract long currentProcessingTime();
abstract long currentWatermark();
// Keyed state scoped to the current window.
abstract KeyedStateStore windowState();
// Keyed state shared across all windows of the current key.
abstract KeyedStateStore globalState();
// Emit a record to the side output identified by outputTag.
abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
// All window function (for non-keyed streams): same as WindowFunction but without a key
interface AllWindowFunction<IN, OUT, W extends Window> extends Function {
void apply(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
}
// Process all window function: non-keyed counterpart of ProcessWindowFunction
// NOTE(review): verify that currentProcessingTime()/currentWatermark() exist on this Context in the targeted Flink version.
abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction {
abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
void clear(Context context) throws Exception;
abstract class Context implements Serializable {
abstract W window();
abstract long currentProcessingTime();
abstract long currentWatermark();
abstract KeyedStateStore windowState();
abstract KeyedStateStore globalState();
abstract <X> void output(OutputTag<X> outputTag, X value);
}
}// Trigger base class (an abstract class, not an interface) for determining when windows fire
abstract class Trigger<T, W extends Window> implements Serializable {
abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
abstract void clear(W window, TriggerContext ctx) throws Exception;
// Trigger context
interface TriggerContext {
long getCurrentProcessingTime();
long getCurrentWatermark();
void registerProcessingTimeTimer(long time);
void registerEventTimeTimer(long time);
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
}
// Trigger result
enum TriggerResult {
CONTINUE, // Continue without firing
FIRE_AND_PURGE, // Fire and purge window contents
FIRE, // Fire but keep window contents
PURGE // Purge window contents without firing
}// Evictor interface for removing elements from windows
interface Evictor<T, W extends Window> extends Serializable {
    // Called before the window function is applied; `size` is the number of elements in the window.
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    // Called after the window function is applied (the built-in evictors use this when doEvictAfter is true).
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    // Evictor context: current processing time and watermark at eviction time
    interface EvictorContext {
        long getCurrentProcessingTime();
        long getCurrentWatermark();
    }
}
// Timestamped value for evictors: a window element together with its (optional) timestamp
class TimestampedValue<T> {
    T getValue();
    long getTimestamp();
    // Whether a timestamp is attached to this value
    boolean hasTimestamp();
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11