# Side Output Examples

This document covers stream processing patterns that use side outputs to split a stream based on conditions. Side outputs let a single operator emit records to several output streams in addition to its main output, without complex branching logic.

## Capabilities

### SideOutputExample

Demonstrates splitting a stream using side outputs with conditional logic in a ProcessFunction.

```java { .api }
/**
 * Stream processing with side outputs for conditional data routing
 * Splits word count stream based on word length filtering
 * @param args Command line arguments (--input, --output, --rejected-words-output)
 */
public class SideOutputExample {
    static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Run with file input/output for both main and side streams
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.sideoutput.SideOutputExample \
  --input /path/to/input.txt \
  --output /path/to/main-output.txt \
  --rejected-words-output /path/to/rejected-words.txt

# Run with default data (prints both streams to stdout)
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.sideoutput.SideOutputExample
```

### OutputTag Definition

Type-safe output tag for routing data to side outputs.

```java { .api }
/**
 * Output tag for routing data to named side outputs
 * @param <T> Type of elements sent to this side output
 */
class OutputTag<T> {
    public OutputTag(String id);
}
```

**Usage Pattern:**

```java
// Define output tag for rejected words
static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};
```
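
The trailing `{}` in the declaration above creates an anonymous subclass of `OutputTag`, which is what allows the element type (`String`) to be recovered at runtime despite Java's type erasure. The following is a minimal plain-Java sketch of that idea, not Flink's actual implementation: the `Tag` class and its `elementType()` method are hypothetical stand-ins introduced only for illustration, and the code needs no Flink dependency.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeCaptureSketch {

    /** Hypothetical stand-in for a typed tag; this is NOT Flink's OutputTag. */
    public static class Tag<T> {
        private final String id;

        public Tag(String id) {
            this.id = id;
        }

        public String id() {
            return id;
        }

        /** Returns the type argument recorded in class metadata, or "unknown" if it was erased. */
        public String elementType() {
            Type superType = getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
                return arg.getTypeName();
            }
            return "unknown";
        }
    }

    /** Created without a subclass: the type argument is erased. */
    public static String plainTagType() {
        return new Tag<String>("rejected").elementType();
    }

    /** Created as an anonymous subclass: the type argument survives in the superclass signature. */
    public static String anonymousTagType() {
        return new Tag<String>("rejected") {}.elementType();
    }

    public static void main(String[] args) {
        System.out.println("plain:     " + plainTagType());     // plain:     unknown
        System.out.println("anonymous: " + anonymousTagType()); // anonymous: java.lang.String
    }
}
```

Omitting the `{}` therefore tends to surface as a type-extraction error when the job is built, rather than as a compile error.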

### Tokenizer ProcessFunction

ProcessFunction that splits text and routes long words to side output.

```java { .api }
/**
 * ProcessFunction for splitting text with conditional side output routing
 * Routes words longer than 5 characters to side output, others to main stream
 */
public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {
    /**
     * Process input text and conditionally route to main or side output
     * @param value Input text line
     * @param ctx ProcessFunction context for side output access
     * @param out Main output collector for word-count pairs
     */
    public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out)
            throws Exception;
}
```

The tokenizer implementation:

- Splits input text on runs of non-word characters (regex `\W+`, written as `"\\W+"` in Java source)
- Normalizes words to lowercase
- Words > 5 characters: sent to side output using `ctx.output(rejectedWordsTag, word)`
- Words ≤ 5 characters: sent to main stream as `Tuple2<String, Integer>(word, 1)`
- Filters out empty tokens
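
The routing rules listed above can be sketched in plain Java. This is a dependency-free stand-in rather than the Flink `Tokenizer` itself: the `TokenizerRoutingSketch` and `Routed` names are hypothetical, the two lists take the place of the main collector and the side output, and `Map.Entry` takes the place of `Tuple2`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TokenizerRoutingSketch {

    /** Holds the two outputs produced by a single pass over the input. */
    public static class Routed {
        public final List<Map.Entry<String, Integer>> main = new ArrayList<>();
        public final List<String> rejected = new ArrayList<>();
    }

    /** Lowercases the line, splits it on non-word characters, and routes each token by length. */
    public static Routed route(String line) {
        Routed result = new Routed();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 5) {
                // Stands in for ctx.output(rejectedWordsTag, token)
                result.rejected.add(token);
            } else if (!token.isEmpty()) {
                // Stands in for out.collect(new Tuple2<>(token, 1))
                result.main.add(new SimpleEntry<>(token, 1));
            }
            // Empty tokens (for example from leading punctuation) fall through and are dropped
        }
        return result;
    }

    public static void main(String[] args) {
        Routed r = route("To be, or not to be: that is the question");
        System.out.println("main:     " + r.main);
        System.out.println("rejected: " + r.rejected);
    }
}
```

Both outputs are filled in the same loop, which is the point of the pattern: the input is scanned once instead of being filtered separately for each destination.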

## Side Output Processing Patterns

### Stream Splitting and Processing

```java
// Main processing pipeline with side output extraction
SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = text
    .process(new Tokenizer());

// Extract side output stream for rejected words
DataStream<String> rejectedWords = tokenized
    .getSideOutput(rejectedWordsTag)
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            return "Rejected: " + value;
        }
    });

// Main stream continues with windowing and aggregation
DataStream<Tuple2<String, Integer>> counts = tokenized
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);
```

### ProcessFunction Context Usage

```java
public static class CustomProcessFunction extends ProcessFunction<String, String> {
    // Static and non-private so the job can pass the same tag to getSideOutput(...)
    static final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        if (someCondition(value)) {
            // Send to main output
            out.collect("Main: " + value);
        } else {
            // Send to side output
            ctx.output(sideOutputTag, "Side: " + value);
        }
    }

    // Placeholder predicate for illustration; replace with the actual routing rule
    private static boolean someCondition(String value) {
        return !value.isEmpty();
    }
}
```

### Multiple Side Outputs

```java
// Define multiple output tags
static final OutputTag<String> errorTag = new OutputTag<String>("errors") {};
static final OutputTag<String> warningTag = new OutputTag<String>("warnings") {};
static final OutputTag<String> infoTag = new OutputTag<String>("info") {};

// Route to different side outputs based on log level
public void processElement(LogEntry value, Context ctx, Collector<LogEntry> out) {
    switch (value.getLevel()) {
        case ERROR:
            ctx.output(errorTag, value.getMessage());
            break;
        case WARNING:
            ctx.output(warningTag, value.getMessage());
            break;
        case INFO:
            ctx.output(infoTag, value.getMessage());
            break;
        default:
            // Any other level stays on the main output
            out.collect(value);
    }
}

// Extract and process each side output separately
// (mainStream must be the SingleOutputStreamOperator returned by process(...))
DataStream<String> errors = mainStream.getSideOutput(errorTag);
DataStream<String> warnings = mainStream.getSideOutput(warningTag);
DataStream<String> info = mainStream.getSideOutput(infoTag);
```

## ProcessFunction API

### Core ProcessFunction

Base class for low-level stream processing with side output support.

```java { .api }
/**
 * Base class for user-defined functions that process elements and have access to context and side outputs
 * @param <I> Input element type
 * @param <O> Output element type
 */
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    /**
     * Process individual elements with access to context and collectors
     * @param value Input element
     * @param ctx Context providing access to side outputs, timers, and metadata
     * @param out Main output collector
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a timer fires - for timer-based processing
     * @param timestamp Timestamp when timer was set to fire
     * @param ctx Context providing access to side outputs and timers
     * @param out Main output collector
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
}
```

### ProcessFunction Context

Context object providing access to side outputs, timers, and element metadata. It is an abstract class nested in `ProcessFunction`; the current watermark is reached through `timerService()` rather than directly on the context.

```java { .api }
/**
 * Context available inside ProcessFunction, providing side output and timer access
 */
public abstract class Context {
    /**
     * Emit data to a side output identified by OutputTag
     * @param outputTag Output tag identifying the side output
     * @param value Value to emit to side output
     */
    public abstract <X> void output(OutputTag<X> outputTag, X value);

    /**
     * Get the timestamp of the element currently being processed
     * @return Element timestamp, or null if the element has none (for example under processing time)
     */
    public abstract Long timestamp();

    /**
     * Get the timer service for querying time and registering timers
     * @return TimerService exposing currentProcessingTime(), currentWatermark(), and timer registration
     */
    public abstract TimerService timerService();
}
```

## Event Time and Windowing with Side Outputs

### Time-based Side Output Routing

```java
// Configure event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Process with time-based side output routing.
// The result is kept as SingleOutputStreamOperator because getSideOutput(...)
// is not available on a plain DataStream.
// TimeBasedProcessor and lateEventsTag are user-defined and not shown here.
SingleOutputStreamOperator<TimestampedEvent> processedStream = inputStream
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<TimestampedEvent>() {
        @Override
        public long extractAscendingTimestamp(TimestampedEvent element) {
            return element.getTimestamp();
        }
    })
    .process(new TimeBasedProcessor());

// Main stream: process recent events
DataStream<TimestampedEvent> recentEvents = processedStream
    .keyBy(event -> event.getKey())
    .timeWindow(Time.minutes(5))
    .reduce(new EventReducer());

// Side output: handle late events
DataStream<TimestampedEvent> lateEvents = processedStream
    .getSideOutput(lateEventsTag)
    .map(event -> event.markAsLate());
```
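
The pipeline above references a `TimeBasedProcessor` without showing its routing rule. One plausible rule, sketched here in plain Java under stated assumptions, is to treat an event as late when its timestamp is behind a watermark that trails the highest timestamp seen so far. Everything in this sketch is hypothetical: the `Event` and `Routed` types, the `maxOutOfOrdernessMs` parameter, and the per-element watermark update, which simplifies how Flink actually generates watermarks. The `late` list plays the role of the side output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LateEventRoutingSketch {

    /** Hypothetical event: a key plus an event-time timestamp in milliseconds. */
    public static class Event {
        public final String key;
        public final long timestamp;

        public Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** On-time events stand in for the main output, late events for the side output. */
    public static class Routed {
        public final List<Event> onTime = new ArrayList<>();
        public final List<Event> late = new ArrayList<>();
    }

    /** Routes each event by comparing its timestamp with a simplified running watermark. */
    public static Routed route(List<Event> events, long maxOutOfOrdernessMs) {
        Routed result = new Routed();
        long watermark = Long.MIN_VALUE;
        for (Event e : events) {
            if (e.timestamp < watermark) {
                result.late.add(e);
            } else {
                result.onTime.add(e);
            }
            // Assumed rule: the watermark trails the highest timestamp seen by a fixed bound
            watermark = Math.max(watermark, e.timestamp - maxOutOfOrdernessMs);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("a", 1000), new Event("b", 2000),
            new Event("c", 1500), new Event("d", 500));
        Routed r = route(events, 600);
        System.out.println("on time: " + r.onTime.size() + ", late: " + r.late.size());
    }
}
```

In a real job the comparison would use the watermark reported by the context's timer service instead of a locally tracked value.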

### Window Processing with Side Outputs

```java
// Windowed aggregation on the main output
DataStream<Tuple2<String, Integer>> windowedCounts = tokenized
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);

// Side output: rejected words, each labeled with the wall-clock time at which it was mapped
// (System.currentTimeMillis() is processing time, not the element's event timestamp)
DataStream<String> rejectedWithTime = tokenized
    .getSideOutput(rejectedWordsTag)
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String word) throws Exception {
            return String.format("Rejected at %d: %s", System.currentTimeMillis(), word);
        }
    });
```

## Use Cases and Patterns

### Data Quality Filtering

- Route invalid records to side output for monitoring
- Process valid records in main stream
- Generate quality metrics from side output
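
The three points above can be sketched in plain Java without Flink. The names here (`DataQualitySketch`, `Result`, `validate`, `invalidRatio`) and the choice of numeric parsing as the validity check are assumptions made for illustration. The `valid` list stands in for the main stream and the `invalid` list for the side output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DataQualitySketch {

    /** Valid readings stand in for the main stream, invalid raw records for the side output. */
    public static class Result {
        public final List<Double> valid = new ArrayList<>();
        public final List<String> invalid = new ArrayList<>();

        /** Quality metric derived from the side output: share of records that failed validation. */
        public double invalidRatio() {
            int total = valid.size() + invalid.size();
            return total == 0 ? 0.0 : (double) invalid.size() / total;
        }
    }

    /** Parses each raw record as a number; records that fail to parse are routed aside unchanged. */
    public static Result validate(List<String> rawReadings) {
        Result result = new Result();
        for (String raw : rawReadings) {
            try {
                result.valid.add(Double.parseDouble(raw.trim()));
            } catch (NumberFormatException e) {
                result.invalid.add(raw);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = validate(Arrays.asList("12.5", "abc", " 7 ", ""));
        System.out.println("valid:   " + r.valid);
        System.out.println("invalid: " + r.invalid);
        System.out.println("invalid ratio: " + r.invalidRatio());
    }
}
```

Keeping the rejected records in their raw form, rather than dropping them, is what makes the monitoring and metrics steps possible downstream.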

### Alert Generation

- Main stream: normal processing
- Side output: critical events requiring immediate attention
- Separate processing pipelines for alerts vs normal data

### A/B Testing

- Route traffic to different processing paths
- Compare results from main stream vs side outputs
- Feature flag-based routing decisions

### Multi-tenant Processing

- Route data by tenant ID to different side outputs
- Tenant-specific processing and storage
- Resource isolation and monitoring per tenant

## Dependencies

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Required Imports

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
```