Apache Flink streaming examples demonstrating various stream processing patterns and use cases
—
Advanced stream processing patterns using side outputs to split streams based on conditions. Side outputs allow a single operator to emit data to multiple output streams without complex branching logic.
Demonstrates splitting a stream using side outputs with conditional logic in a ProcessFunction.
/**
* Stream processing with side outputs for conditional data routing
* Splits word count stream based on word length filtering
* @param args Command line arguments (--input, --output, --rejected-words-output)
*/
public class SideOutputExample {
static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};
public static void main(String[] args) throws Exception;
}

Usage Example:
# Run with file input/output for both main and side streams
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.sideoutput.SideOutputExample \
--input /path/to/input.txt \
--output /path/to/main-output.txt \
--rejected-words-output /path/to/rejected-words.txt
# Run with default data (prints both streams to stdout)
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.sideoutput.SideOutputExample

Type-safe output tag for routing data to side outputs.
/**
* Output tag for routing data to named side outputs
* @param <T> Type of elements sent to this side output
*/
class OutputTag<T> {
public OutputTag(String id);
}

Usage Pattern:
// Define output tag for rejected words
static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};

ProcessFunction that splits text and routes long words to side output.
/**
* ProcessFunction for splitting text with conditional side output routing
* Routes words longer than 5 characters to side output, others to main stream
*/
public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {
/**
* Process input text and conditionally route to main or side output
* @param value Input text line
* @param ctx ProcessFunction context for side output access
* @param out Main output collector for word-count pairs
*/
public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out)
throws Exception;
}

The tokenizer implementation splits each input line on the regex \W+, routes words longer than 5 characters to the side output via ctx.output(rejectedWordsTag, word), and emits new Tuple2<String, Integer>(word, 1) on the main output for all other words.

// Main processing pipeline with side output extraction
// Tokenize the input. The result must be typed as SingleOutputStreamOperator
// (not plain DataStream) because getSideOutput(...) is only available there.
SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = text
.process(new Tokenizer());
// Extract side output stream for rejected words and prefix each for display
DataStream<String> rejectedWords = tokenized
.getSideOutput(rejectedWordsTag)
.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return "Rejected: " + value;
}
});
// Main stream continues with windowing and aggregation
DataStream<Tuple2<String, Integer>> counts = tokenized
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);

public static class CustomProcessFunction extends ProcessFunction<String, String> {
private final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};
@Override
public void processElement(String value, Context ctx, Collector<String> out) {
    // Guard clause: records failing the predicate are diverted to the side output.
    if (!someCondition(value)) {
        ctx.output(sideOutputTag, "Side: " + value);
        return;
    }
    // Records passing the predicate continue on the main stream.
    out.collect("Main: " + value);
}
}

// Define multiple output tags
static final OutputTag<String> errorTag = new OutputTag<String>("errors") {};
static final OutputTag<String> warningTag = new OutputTag<String>("warnings") {};
static final OutputTag<String> infoTag = new OutputTag<String>("info") {};
// Route to different side outputs based on log level
/**
 * Routes each log entry's message to the side output matching its level;
 * entries with any other level fall through to the main output unchanged.
 * @param value Incoming log entry to classify by level
 * @param ctx Context used to emit messages to the level-specific side outputs
 * @param out Main output collector for entries with no dedicated side output
 */
public void processElement(LogEntry value, Context ctx, Collector<LogEntry> out) {
switch (value.getLevel()) {
case ERROR:
ctx.output(errorTag, value.getMessage());
break;
case WARNING:
ctx.output(warningTag, value.getMessage());
break;
case INFO:
ctx.output(infoTag, value.getMessage());
break;
default:
// Unrecognized levels stay on the main stream as full LogEntry objects
out.collect(value);
}
}
// Extract and process each side output separately
DataStream<String> errors = mainStream.getSideOutput(errorTag);
DataStream<String> warnings = mainStream.getSideOutput(warningTag);
DataStream<String> info = mainStream.getSideOutput(infoTag);

Base class for low-level stream processing with side output support.
/**
* Base class for user-defined functions that process elements and have access to context and side outputs
* @param <I> Input element type
* @param <O> Output element type
*/
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
/**
* Process individual elements with access to context and collectors
* @param value Input element
* @param ctx Context providing access to side outputs, timers, and metadata
* @param out Main output collector
*/
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
* Called when a timer fires - for timer-based processing
* @param timestamp Timestamp when timer was set to fire
* @param ctx Context providing access to side outputs and timers
* @param out Main output collector
*/
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
}

Context interface providing access to side outputs and processing metadata.
/**
* Context interface for ProcessFunction providing side output and timer access
*/
public abstract class Context {
/**
* Emit data to a side output identified by OutputTag
* @param outputTag Output tag identifying the side output
* @param value Value to emit to side output
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
/**
* Get current processing timestamp
* @return Processing time timestamp
*/
public abstract long timestamp();
/**
* Get current watermark
* @return Current watermark timestamp
*/
public abstract long currentWatermark();
}

// Configure event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Process with time-based side output routing
DataStream<TimestampedEvent> processedStream = inputStream
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<TimestampedEvent>() {
@Override
public long extractAscendingTimestamp(TimestampedEvent element) {
return element.getTimestamp();
}
})
.process(new TimeBasedProcessor());
// Main stream: process recent events
DataStream<TimestampedEvent> recentEvents = processedStream
.keyBy(event -> event.getKey())
.timeWindow(Time.minutes(5))
.reduce(new EventReducer());
// Side output: handle late events
DataStream<TimestampedEvent> lateEvents = processedStream
.getSideOutput(lateEventsTag)
.map(event -> event.markAsLate());

// Windowed aggregation with side output for processing metadata
DataStream<Tuple2<String, Integer>> windowedCounts = tokenized
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
// Side output contains rejected words with timestamps
DataStream<String> rejectedWithTime = tokenized
.getSideOutput(rejectedWordsTag)
.map(new MapFunction<String, String>() {
@Override
public String map(String word) throws Exception {
return String.format("Rejected at %d: %s", System.currentTimeMillis(), word);
}
});

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.3.3</version>
</dependency>

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.OutputTag;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10