# Side Output Examples

Advanced stream processing patterns using side outputs to split streams based on conditions. Side outputs allow a single operator to emit data to multiple output streams without complex branching logic.

## Capabilities

### SideOutputExample

Demonstrates splitting a stream using side outputs with conditional logic in a ProcessFunction.

```java { .api }
/**
 * Stream processing with side outputs for conditional data routing
 * Splits word count stream based on word length filtering
 * @param args Command line arguments (--input, --output, --rejected-words-output)
 */
public class SideOutputExample {
    static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Run with file input/output for both main and side streams
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.sideoutput.SideOutputExample \
  --input /path/to/input.txt \
  --output /path/to/main-output.txt \
  --rejected-words-output /path/to/rejected-words.txt

# Run with default data (prints both streams to stdout)
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.sideoutput.SideOutputExample
```

### OutputTag Definition

Type-safe output tag for routing data to side outputs.

```java { .api }
/**
 * Output tag for routing data to named side outputs
 * @param <T> Type of elements sent to this side output
 */
class OutputTag<T> {
    public OutputTag(String id);
}
```

**Usage Pattern:**
```java
// Define output tag for rejected words.
// The trailing {} creates an anonymous subclass so that Flink can capture the element type.
static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};
```

### Tokenizer ProcessFunction

ProcessFunction that splits text and routes long words to side output.

```java { .api }
/**
 * ProcessFunction for splitting text with conditional side output routing
 * Routes words longer than 5 characters to side output, others to main stream
 */
public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {
    /**
     * Process input text and conditionally route to main or side output
     * @param value Input text line
     * @param ctx ProcessFunction context for side output access
     * @param out Main output collector for word-count pairs
     */
    public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out)
        throws Exception;
}
```

The tokenizer implementation:
- Normalizes the input line to lowercase
- Splits the line on non-word characters using the regex `\W+` (written `"\\W+"` as a Java string)
- Words > 5 characters: sent to side output using `ctx.output(rejectedWordsTag, word)`
- Non-empty words ≤ 5 characters: sent to main stream as `Tuple2<String, Integer>(word, 1)`
- Empty tokens are dropped and reach neither output

## Side Output Processing Patterns

### Stream Splitting and Processing

```java
// Main processing pipeline with side output extraction
SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = text
    .process(new Tokenizer());

// Extract side output stream for rejected words
DataStream<String> rejectedWords = tokenized
    .getSideOutput(rejectedWordsTag)
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            return "Rejected: " + value;
        }
    });

// Main stream continues with windowing and aggregation
DataStream<Tuple2<String, Integer>> counts = tokenized
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);
```

### ProcessFunction Context Usage

```java
public static class CustomProcessFunction extends ProcessFunction<String, String> {
    // Static so that callers can pass the same tag to getSideOutput(...)
    static final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // someCondition(...) is a placeholder for an application-specific predicate
        if (someCondition(value)) {
            // Send to main output
            out.collect("Main: " + value);
        } else {
            // Send to side output
            ctx.output(sideOutputTag, "Side: " + value);
        }
    }
}
```

### Multiple Side Outputs

```java
// Define multiple output tags
static final OutputTag<String> errorTag = new OutputTag<String>("errors") {};
static final OutputTag<String> warningTag = new OutputTag<String>("warnings") {};
static final OutputTag<String> infoTag = new OutputTag<String>("info") {};

// Route to different side outputs based on log level
public void processElement(LogEntry value, Context ctx, Collector<LogEntry> out) {
    switch (value.getLevel()) {
        case ERROR:
            ctx.output(errorTag, value.getMessage());
            break;
        case WARNING:
            ctx.output(warningTag, value.getMessage());
            break;
        case INFO:
            ctx.output(infoTag, value.getMessage());
            break;
        default:
            // Any other level stays on the main stream
            out.collect(value);
    }
}

// Extract and process each side output separately.
// mainStream must be the SingleOutputStreamOperator returned by process(...).
DataStream<String> errors = mainStream.getSideOutput(errorTag);
DataStream<String> warnings = mainStream.getSideOutput(warningTag);
DataStream<String> info = mainStream.getSideOutput(infoTag);
```

## ProcessFunction API

### Core ProcessFunction

Base class for low-level stream processing with side output support.

```java { .api }
/**
 * Base class for user-defined functions that process elements and have access to context and side outputs
 * @param <I> Input element type
 * @param <O> Output element type
 */
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    /**
     * Process individual elements with access to context and collectors
     * @param value Input element
     * @param ctx Context providing access to side outputs, timers, and metadata
     * @param out Main output collector
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a timer fires - for timer-based processing
     * @param timestamp Timestamp when timer was set to fire
     * @param ctx Context providing access to side outputs and timers
     * @param out Main output collector
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
}
```

### ProcessFunction Context

Context class (nested in ProcessFunction) providing access to side outputs, element timestamps, and timers.

```java { .api }
/**
 * Context available inside ProcessFunction, providing side output and timer access
 */
public abstract class Context {
    /**
     * Emit data to a side output identified by OutputTag
     * @param outputTag Output tag identifying the side output
     * @param value Value to emit to side output
     */
    public abstract <X> void output(OutputTag<X> outputTag, X value);

    /**
     * Timestamp of the element currently being processed, or of the firing timer
     * @return Element timestamp; may be null, for example under processing time
     */
    public abstract Long timestamp();

    /**
     * Access to the TimerService for querying time and registering timers
     * @return TimerService; the current watermark is available via timerService().currentWatermark()
     */
    public abstract TimerService timerService();
}
```

## Event Time and Windowing with Side Outputs

### Time-based Side Output Routing

```java
// Configure event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Process with time-based side output routing.
// The result is typed as SingleOutputStreamOperator so that getSideOutput(...) is available below.
// TimestampedEvent, TimeBasedProcessor, EventReducer and lateEventsTag are application-defined.
SingleOutputStreamOperator<TimestampedEvent> processedStream = inputStream
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<TimestampedEvent>() {
        @Override
        public long extractAscendingTimestamp(TimestampedEvent element) {
            return element.getTimestamp();
        }
    })
    .process(new TimeBasedProcessor());

// Main stream: process recent events
DataStream<TimestampedEvent> recentEvents = processedStream
    .keyBy(event -> event.getKey())
    .timeWindow(Time.minutes(5))
    .reduce(new EventReducer());

// Side output: handle late events
DataStream<TimestampedEvent> lateEvents = processedStream
    .getSideOutput(lateEventsTag)
    .map(event -> event.markAsLate());
```

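The pipeline above leaves `TimeBasedProcessor` undefined. One plausible rule, stated here purely as an assumption, is to treat an event as late once the watermark has reached or passed its timestamp. The plain-Java sketch below models only that decision. `LateRoutingSketch` and its `Event` type are hypothetical; in a Flink job the timestamp would come from `ctx.timestamp()`, the watermark from `ctx.timerService().currentWatermark()`, and the two lists would be `out.collect(...)` and `ctx.output(lateEventsTag, ...)`.

```java
import java.util.ArrayList;
import java.util.List;

final class LateRoutingSketch {
    /** Hypothetical event type: a key plus an event-time timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    final List<Event> onTime = new ArrayList<>(); // stands in for out.collect(...)
    final List<Event> late = new ArrayList<>();   // stands in for ctx.output(lateEventsTag, ...)

    /** Assumed rule: an event is late if its timestamp is at or behind the watermark. */
    void process(Event event, long currentWatermark) {
        if (event.timestamp <= currentWatermark) {
            late.add(event);
        } else {
            onTime.add(event);
        }
    }

    public static void main(String[] args) {
        LateRoutingSketch sketch = new LateRoutingSketch();
        sketch.process(new Event("a", 1_000L), 500L); // ahead of the watermark
        sketch.process(new Event("b", 400L), 500L);   // behind the watermark
        System.out.println("on time: " + sketch.onTime.size() + ", late: " + sketch.late.size());
    }
}
```

The sample run prints `on time: 1, late: 1`. Whether "at the watermark" should count as late is a design choice for the application; the comparison is the only line that would need to change.
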
### Window Processing with Side Outputs

```java
// Windowed aggregation on the main stream
DataStream<Tuple2<String, Integer>> windowedCounts = tokenized
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);

// Side output: rejected words tagged with the wall-clock time at which the map runs
// (System.currentTimeMillis() is not the event timestamp)
DataStream<String> rejectedWithTime = tokenized
    .getSideOutput(rejectedWordsTag)
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String word) throws Exception {
            return String.format("Rejected at %d: %s", System.currentTimeMillis(), word);
        }
    });
```

## Use Cases and Patterns

### Data Quality Filtering
- Route invalid records to side output for monitoring
- Process valid records in main stream
- Generate quality metrics from side output

### Alert Generation
- Main stream: normal processing
- Side output: critical events requiring immediate attention
- Separate processing pipelines for alerts vs normal data

### A/B Testing
- Route traffic to different processing paths
- Compare results from main stream vs side outputs
- Feature flag-based routing decisions

### Multi-tenant Processing
- Route data by tenant ID to different side outputs
- Tenant-specific processing and storage
- Resource isolation and monitoring per tenant

## Dependencies

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Required Imports

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
```