Apache Flink streaming examples demonstrating common stream-processing patterns and use cases.
—
Time-based stream joins: joining two data streams within event-time windows, with timestamps and watermarks assigned to both inputs.
WindowJoin: a window-based join of two data streams, in which records are matched when they share a join key and fall into the same event-time window.
/**
 * Example of a windowed stream join: two streams are joined within time windows using
 * event-time processing.
 *
 * <p>This listing shows only the entry point's signature (an API summary); the implementation
 * is not reproduced here.
 */
public class WindowJoin {
/**
 * Entry point of the example.
 *
 * @param args command-line arguments: {@code --input <path>} and {@code --output <path>}
 *     (per the usage notes that accompany this class)
 * @throws Exception declared by the signature; the concrete failure causes are not shown here
 */
public static void main(String[] args) throws Exception;
}Usage Example:
# Run with default sample data
# NOTE(review): the classpath lists only the examples jar; the Flink runtime classes presumably
# also have to be on it (or submit the job with Flink's `bin/flink run -c <class> <jar>`) — confirm.
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.join.WindowJoin

# Run with file input
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.join.WindowJoin \
  --input /path/to/input.txt --output /path/to/results.txt

Scala implementation of windowed stream joins using functional API and case classes.
/**
 * Scala version of the windowed stream join example.
 *
 * Only the entry point's signature is listed here; the implementation is not reproduced.
 */
object WindowJoin {
/**
 * Entry point of the example.
 *
 * @param args command-line arguments
 */
def main(args: Array[String]): Unit;
}// Enable event time processing
// Switch the job to event time: the windows below are evaluated on timestamps taken from the
// records themselves (field f2, via Tuple3TimestampExtractor), not on processing time.
// NOTE(review): this call was deprecated in later Flink releases (event time became the default,
// 1.12 I believe); it is the right call for the 1.3.x API this page targets.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Create two data streams with timestamps
// NOTE(review): as written this likely does not compile. ThrottledIterator (Flink's examples
// utility) implements java.util.Iterator, not SourceFunction, so env.addSource(...) should not
// accept it (Flink's own examples use env.fromCollection(iterator, typeInfo)). OrangeSourceData /
// GreenSourceData are not shown; if their *_DATA fields are String[] like
// WindowJoinSampleData.ORANGE_DATA, arrays have no iterator() and the elements would still need
// parsing into Tuple3. Confirm against the actual sources.
DataStream<Tuple3<String, String, Long>> orangeStream = env
.addSource(new ThrottledIterator<>(OrangeSourceData.ORANGE_DATA.iterator(), elementsPerSecond))
.assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());
DataStream<Tuple3<String, String, Long>> greenStream = env
.addSource(new ThrottledIterator<>(GreenSourceData.GREEN_DATA.iterator(), elementsPerSecond))
.assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());
// Perform windowed join
// Inner join: a pair is emitted only when an orange and a green record share the same key (f0)
// and fall into the same tumbling event-time window of windowSize milliseconds; records without
// a partner in that window produce no output.
DataStream<Tuple6<String, String, String, String, Long, Long>> joinedStream = orangeStream
.join(greenStream)
// Anonymous classes (not lambdas) keep their generic signatures visible to Flink's type extraction.
.where(new KeySelector<Tuple3<String, String, Long>, String>() {
@Override
public String getKey(Tuple3<String, String, Long> value) throws Exception {
return value.f0; // Join on first field
}
})
.equalTo(new KeySelector<Tuple3<String, String, Long>, String>() {
@Override
public String getKey(Tuple3<String, String, Long> value) throws Exception {
return value.f0; // Join on first field
}
})
.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
// Called once per matching pair; output is
// (key, orange value, green value, "JOINED", orange timestamp, green timestamp).
.apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>,
Tuple6<String, String, String, String, Long, Long>>() {
@Override
public Tuple6<String, String, String, String, Long, Long> join(
Tuple3<String, String, Long> orange,
Tuple3<String, String, Long> green) throws Exception {
return new Tuple6<>(orange.f0, orange.f1, green.f1,
"JOINED", orange.f2, green.f2);
}
});private static class Tuple3TimestampExtractor
extends AscendingTimestampExtractor<Tuple3<String, String, Long>> {
// Uses f2 as each record's event timestamp; watermarks are derived from it. As the base class
// name says, AscendingTimestampExtractor assumes timestamps never decrease (per parallel source
// instance). Out-of-order input violates that assumption; check how your Flink version handles
// such violations before relying on it.
@Override
public long extractAscendingTimestamp(Tuple3<String, String, Long> element) {
return element.f2; // Use third field as timestamp
}
}// Define join keys
// One selector serves both sides because both streams carry the same Tuple3 type. An anonymous
// class (rather than a lambda) keeps its generic signature visible to Flink's type extraction.
KeySelector<Tuple3<String, String, Long>, String> keySelector =
new KeySelector<Tuple3<String, String, Long>, String>() {
@Override
public String getKey(Tuple3<String, String, Long> value) throws Exception {
return value.f0; // Join on first field (ID)
}
};
// Apply join with key selectors
// The window length here is given in seconds (Time.seconds). joinFunction is not defined in this
// snippet; presumably a JoinFunction such as BasicJoinFunction — confirm. The joined stream
// returned by apply(...) is not captured here; a real job would keep it and attach a sink.
orangeStream.join(greenStream)
.where(keySelector)
.equalTo(keySelector)
.window(TumblingEventTimeWindows.of(Time.seconds(windowSizeSeconds)))
.apply(joinFunction);// Input tuples: (id, value, timestamp)
// Field layout of the input records (declaration shown for illustration only):
Tuple3<String, String, Long> inputElement;
// f0: Join key (ID)
// f1: Data value
// f2: Event timestamp (milliseconds)
// Join result: (id, leftValue, rightValue, joinType, leftTimestamp, rightTimestamp)
Tuple6<String, String, String, String, Long, Long> joinResult;
// f0: Join key
// f1: Left stream value
// f2: Right stream value
// f3: Join type indicator (e.g. "JOINED"; other join functions on this page emit "INNER_JOIN"
//     or "JOIN_<n>")
// f4: Left stream timestamp
// f5: Right stream timestamp

/**
 * Sample input records for the window-join examples.
 *
 * <p>Each entry is a CSV string shaped like {@code "id,value,timestamp"}, matching the input tuple
 * layout (join key, data value, event timestamp) used by these examples.
 *
 * <p>The array is {@code public static final}, but Java arrays are still mutable: treat its
 * contents as read-only.
 */
public class WindowJoinSampleData {
    /** Sample records named for the "orange" input stream. */
    public static final String[] ORANGE_DATA = {
        "orange-1,orange-data-1,1000",
        "orange-2,orange-data-2,2000",
        "orange-3,orange-data-3,3000",
        // More sample data...
    };
}
// Use ThrottledIterator for controlled data emission
// ThrottledIterator paces how fast the wrapped iterator hands out elements (elementsPerSecond).
// NOTE(review): ThrottledIterator (from Flink's examples utilities) implements java.util.Iterator,
// not SourceFunction, so env.addSource(...) should not accept it. Flink's own examples pass it to
// env.fromCollection(iterator, typeInfo) instead, and its constructor requires the wrapped
// iterator to be Serializable (I believe). OrangeSourceData is not shown here: if ORANGE_DATA is a
// String[] like WindowJoinSampleData.ORANGE_DATA, arrays have no iterator() method, and the
// elements would still need parsing into Tuple3. Confirm before copying this snippet.
DataStream<Tuple3<String, String, Long>> orangeStream = env
.addSource(new ThrottledIterator<>(
OrangeSourceData.ORANGE_DATA.iterator(),
elementsPerSecond // Control emission rate
))
.assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());// Fixed-size non-overlapping windows based on event time
// Tumbling: fixed-size, non-overlapping windows; each element belongs to exactly one window.
.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
// Overlapping windows with slide interval: a new window of length windowSize starts every
// slideInterval. When slideInterval < windowSize, an element falls into several windows, so in
// a join the same pair can be emitted once per window the two records share.
.window(SlidingEventTimeWindows.of(
Time.milliseconds(windowSize), // Window size
Time.milliseconds(slideInterval) // Slide interval
))
// Variable-size windows based on activity gaps
// Session windows: a key's window closes once no element for that key arrives within the gap.
.window(EventTimeSessionWindows.withGap(Time.minutes(sessionGapMinutes)))

/**
 * Inner-join function that flattens one matching (left, right) pair into a Tuple6.
 *
 * <p>Output layout: (key taken from {@code left.f0}, left value, right value, "INNER_JOIN",
 * left timestamp, right timestamp).
 */
private static class BasicJoinFunction
        implements JoinFunction<Tuple3<String, String, Long>,
                Tuple3<String, String, Long>,
                Tuple6<String, String, String, String, Long, Long>> {
    @Override
    public Tuple6<String, String, String, String, Long, Long> join(
            Tuple3<String, String, Long> left,
            Tuple3<String, String, Long> right) throws Exception {
        return new Tuple6<>(
                left.f0,       // Join key
                left.f1,       // Left value
                right.f1,      // Right value
                "INNER_JOIN",  // Join type
                left.f2,       // Left timestamp
                right.f2       // Right timestamp
        );
    }
}

/**
 * Join function that labels each joined pair with a running counter: "JOIN_0", "JOIN_1", ...
 *
 * <p>The counter is Flink keyed state obtained from the runtime context, so it is scoped to the
 * current join key. Presumably it keeps counting across windows rather than resetting per
 * window — confirm for your Flink version.
 */
private static class StatefulJoinFunction
        extends RichJoinFunction<Tuple3<String, String, Long>,
                Tuple3<String, String, Long>,
                Tuple6<String, String, String, String, Long, Long>> {

    /** Number of pairs joined so far for the current key; assigned in {@link #open}. */
    private transient ValueState<Long> joinCountState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // The (name, class, defaultValue) constructor is deprecated. Its Javadoc recommends
        // omitting the default and treating a null value() as "no state yet", which join() does.
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("joinCount", Long.class);
        joinCountState = getRuntimeContext().getState(descriptor);
    }

    /**
     * Emits the pair labelled with the number of pairs joined before it for this key.
     *
     * @param left record from the first (left) stream
     * @param right record from the second (right) stream
     * @return (left key, left value, right value, "JOIN_" + count, left timestamp, right timestamp)
     * @throws Exception if reading or updating the keyed state fails
     */
    @Override
    public Tuple6<String, String, String, String, Long, Long> join(
            Tuple3<String, String, Long> left,
            Tuple3<String, String, Long> right) throws Exception {
        Long stored = joinCountState.value();
        // null means no pair has been joined for this key yet; same start value as the old default.
        long currentCount = (stored == null) ? 0L : stored;
        joinCountState.update(currentCount + 1);
        return new Tuple6<>(left.f0, left.f1, right.f1,
                "JOIN_" + currentCount, left.f2, right.f2);
    }
}
// CoGroup allows custom join logic including outer joins
// coGroup (unlike join) hands the function all orange and all green elements that share a key
// and a window in one call, so groups without a partner on the other side can be detected.
orangeStream.coGroup(greenStream)
.where(keySelector)
.equalTo(keySelector)
.window(TumblingEventTimeWindows.of(Time.seconds(windowSizeSeconds)))
.apply(new CoGroupFunction<Tuple3<String, String, Long>,
Tuple3<String, String, Long>,
String>() {
@Override
public void coGroup(
Iterable<Tuple3<String, String, Long>> left,
Iterable<Tuple3<String, String, Long>> right,
Collector<String> out) throws Exception {
// Emits an "INNER" line for every (left, right) combination, plus a "LEFT_OUTER" line for each
// left element when right is empty: a left outer join. Keys present only on the right side
// (left empty) produce no output. A full outer join would also need to emit the right elements
// when left is empty.
for (Tuple3<String, String, Long> leftElement : left) {
// Ends up true exactly when `right` has at least one element.
boolean hasMatch = false;
// `right` is re-iterated for every left element. This relies on the Iterable supporting
// repeated iteration (presumably true for the window's buffered elements — confirm).
for (Tuple3<String, String, Long> rightElement : right) {
out.collect("INNER: " + leftElement + " + " + rightElement);
hasMatch = true;
}
if (!hasMatch) {
out.collect("LEFT_OUTER: " + leftElement + " + null");
}
}
}
});// Join elements within time intervals
// Interval join (no window assigner): both sides are keyed with the same selector. Each orange
// element is paired with green elements of the same key whose timestamp lies in
// [orange timestamp - 100 ms, orange timestamp + 100 ms] (bounds inclusive by default).
// Interval joins operate on event time only.
// NOTE(review): intervalJoin / ProcessJoinFunction were added after Flink 1.3 (in 1.6, I
// believe), so this snippet needs a newer dependency than the 1.3.3 artifact on this page.
orangeStream.keyBy(keySelector)
.intervalJoin(greenStream.keyBy(keySelector))
.between(Time.milliseconds(-100), Time.milliseconds(100)) // ±100ms window
.process(new ProcessJoinFunction<Tuple3<String, String, Long>,
Tuple3<String, String, Long>,
String>() {
@Override
public void processElement(
Tuple3<String, String, Long> left,
Tuple3<String, String, Long> right,
Context ctx,
Collector<String> out) throws Exception {
// Called once per matching (left, right) pair.
out.collect("INTERVAL_JOIN: " + left + " + " + right);
}
});// Assign watermarks with bounded out-of-orderness
orangeStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(
Duration.ofMillis(100)) // 100ms max out-of-order
.withTimestampAssigner((element, timestamp) -> element.f2)
);// Configure late data handling
// Windowed operator with late-data handling. Elements that arrive after the watermark has passed
// a window's end, but within the 2 s of allowed lateness, are still added to that window (which
// may then fire again). Anything later goes to the side output. lateDataTag is not defined in
// this snippet; presumably an OutputTag of the element type — confirm.
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(2)) // Allow 2 seconds late data
.sideOutputLateData(lateDataTag) // Collect late data in side output

<!-- Maven dependency for the streaming Java API (Scala 2.10 build, Flink 1.3.3). -->
<!-- NOTE(review): some snippets on this page (WatermarkStrategy, intervalJoin) need a newer
     Flink release than 1.3.3, and Scala 2.10 artifacts were, I believe, not published for those
     releases, so the artifact suffix would change too. Confirm the intended version. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.10</artifactId>
  <version>1.3.3</version>
</dependency>

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10