Apache Flink streaming examples demonstrating various stream processing patterns and use cases
—
Advanced windowing patterns including time windows, session windows, and custom triggers with event-time processing. Demonstrates various window types, eviction policies, and complex triggering mechanisms.
Car speed monitoring with custom triggers and evictors, demonstrating global windows with delta triggers.
/**
* Grouped stream windowing with custom eviction and trigger policies
* Monitors car speeds and triggers top speed calculation every x meters
* @param args Command line arguments (--input path, --output path)
*/
public class TopSpeedWindowing {
public static void main(String[] args) throws Exception;
}

Usage Example:
# Run with sample car data generator
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
# Run with file input
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.windowing.TopSpeedWindowing \
--input /path/to/car-data.txt --output /path/to/results.txt

Session-based windowing for user activity analysis with configurable session gaps.
/**
* Session windowing example for analyzing user activity sessions
* Groups events into sessions based on activity gaps
* @param args Command line arguments (--input path, --output path)
*/
public class SessionWindowing {
public static void main(String[] args) throws Exception;
}

Basic windowed word count with tumbling time windows.
/**
* Word count with tumbling time windows
* Demonstrates basic time-based windowing concepts
* @param args Command line arguments (--input path, --output path)
*/
public class WindowWordCount {
public static void main(String[] args) throws Exception;
}

High-throughput processing time windows with sliding window patterns and parallel data generation.
/**
* Processing time windows with grouped keys and sliding windows
* Performance benchmark with 20M elements across 10K keys
* @param args Command line arguments
*/
public class GroupedProcessingTimeWindowExample {
public static void main(String[] args) throws Exception;
}

Usage Example:
# Run high-throughput windowing benchmark
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.windowing.GroupedProcessingTimeWindowExample

Custom parallel source function for performance testing.
/**
* High-throughput parallel source generating tuple data
* Generates 20,000,000 elements across multiple parallel instances
*/
public class RichParallelSourceFunction<Tuple2<Long, Long>> {
private volatile boolean running = true;
/**
* Main data generation loop
* @param ctx Source context for element emission
*/
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
/**
* Cancel source execution
*/
public void cancel();
}

Extracts keys from tuple types for grouping operations.
/**
* Generic key extractor for tuple types
* @param <Type> Tuple type extending Tuple
* @param <Key> Key type for grouping
*/
public static class FirstFieldKeyExtractor<Type extends Tuple, Key>
implements KeySelector<Type, Key> {
/**
* Extract key from tuple first field
* @param value Input tuple
* @return Key for grouping (first field of tuple)
*/
public Key getKey(Type value);
}

Pre-aggregating reduce function for efficient windowing.
/**
* Efficient reduce function for summing tuple values
* Pre-aggregates values within window before output
*/
public static class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> {
/**
* Combine two tuples by summing their second field
* @param value1 First tuple
* @param value2 Second tuple
* @return Combined tuple with summed second field
*/
public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2);
}

Non-pre-aggregating window function for custom aggregation logic.
/**
* Window function that processes all elements at window trigger time
* Demonstrates non-pre-aggregating pattern vs reduce function
*/
public static class SummingWindowFunction
implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
/**
* Process all window elements when window triggers
* @param key Window key
* @param window Window metadata
* @param values All elements in window
* @param out Output collector
*/
public void apply(Long key, Window window,
Iterable<Tuple2<Long, Long>> values,
Collector<Tuple2<Long, Long>> out);
}

Scala implementation of car speed windowing using functional API and case classes.
/**
* Scala version of car top speed windowing
* @param args Command line arguments
*/
object TopSpeedWindowing {
def main(args: Array[String]): Unit;
}

// Global windows with time evictor and delta trigger
DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
.assignTimestampsAndWatermarks(new CarTimestamp())
.keyBy(0) // Group by car ID
.window(GlobalWindows.create())
.evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
.trigger(DeltaTrigger.of(triggerMeters,
new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
@Override
public double getDelta(
Tuple4<Integer, Integer, Double, Long> oldDataPoint,
Tuple4<Integer, Integer, Double, Long> newDataPoint) {
return newDataPoint.f2 - oldDataPoint.f2; // Distance delta
}
}, carData.getType().createSerializer(env.getConfig())))
.maxBy(1); // Get max speed

// Session windows with configurable gap
dataStream
.keyBy(keySelector)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.apply(sessionWindowFunction);

// Sliding windows with size and slide interval (processing time)
dataStream
.keyBy(keySelector)
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS)) // 2.5s window, 0.5s slide
.reduce(new SummingReducer());
// Alternative with apply function (non-pre-aggregating)
dataStream
.keyBy(keySelector)
.timeWindow(Time.milliseconds(2500), Time.milliseconds(500))
.apply(new SummingWindowFunction());

// Fixed-size tumbling windows
dataStream
.keyBy(keySelector)
.timeWindow(Time.seconds(30))
.reduce(aggregationFunction);

// Enable event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Assign timestamps and watermarks
dataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<T>() {
@Override
public long extractAscendingTimestamp(T element) {
return element.getTimestamp();
}
});

// Car telemetry tuple: (carId, speed, distance, timestamp)
Tuple4<Integer, Integer, Double, Long> carData;
// f0: Car ID
// f1: Current speed (km/h)
// f2: Total distance traveled (meters)
// f3: Timestamp (milliseconds)

private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
@Override
public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
while (isRunning) {
Thread.sleep(100); // 100ms intervals
for (int carId = 0; carId < speeds.length; carId++) {
// Randomly adjust speed
if (rand.nextBoolean()) {
speeds[carId] = Math.min(100, speeds[carId] + 5);
} else {
speeds[carId] = Math.max(0, speeds[carId] - 5);
}
// Update distance traveled
distances[carId] += speeds[carId] / 3.6d; // Convert km/h to m/s
Tuple4<Integer, Integer, Double, Long> record = new Tuple4<>(
carId, speeds[carId], distances[carId], System.currentTimeMillis());
ctx.collect(record);
}
}
}
}

// Configure session gap based on user activity
ProcessingTimeSessionWindows.withGap(Time.minutes(30)) // 30-minute inactivity gap
EventTimeSessionWindows.withGap(Time.seconds(60)) // 1-minute gap for event time

// Variable session gaps based on data
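// NOTE(review): withDynamicGap / SessionWindowTimeGapExtractor appear to have been introduced in Flink 1.5;
// confirm availability before using this with the 1.3.3 artifact documented here.
EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<T>() {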
@Override
public long extract(T element) {
// Return gap in milliseconds based on element properties
return element.getUserType().equals("PREMIUM") ? 600000L : 300000L;
}
});

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.3.3</version>
</dependency>

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10