tessl/maven-org-apache-flink--flink-examples-streaming-2-10

Apache Flink streaming examples demonstrating various stream processing patterns and use cases


Side Output Examples

Stream processing patterns that use side outputs to split a stream by condition. A side output lets a single operator emit records to one or more additional streams alongside its main output. Each side output is identified by an OutputTag and may carry a different element type than the main stream.

Capabilities

SideOutputExample

Demonstrates splitting a stream using side outputs with conditional logic in a ProcessFunction.

/**
 * Stream processing with side outputs for conditional data routing
 * Splits the tokenized word stream by word length: long words go to a side output
 * @param args Command line arguments (--input, --output, --rejected-words-output)
 */
public class SideOutputExample {
    static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};
    public static void main(String[] args) throws Exception;
}

Usage Example:

# Run with file input/output for both main and side streams
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.sideoutput.SideOutputExample \
  --input /path/to/input.txt \
  --output /path/to/main-output.txt \
  --rejected-words-output /path/to/rejected-words.txt

# Run with default data (prints both streams to stdout)
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.sideoutput.SideOutputExample

OutputTag Definition

Type-safe output tag for routing data to side outputs.

/**
 * Output tag for routing data to named side outputs
 * @param <T> Type of elements sent to this side output
 */
class OutputTag<T> {
    public OutputTag(String id);
}

Usage Pattern:

// Define output tag for rejected words
static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};
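
The trailing `{}` in the usage pattern creates an anonymous subclass. Flink's documentation advises creating tags this way so that the element type can be analyzed despite Java's type erasure. The sketch below shows the general Java mechanism with a hypothetical `TypedTag` class; it is a stand-in written for illustration, not Flink's `OutputTag`, and the helper names are assumptions.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a tag class; NOT Flink's OutputTag.
class TypedTag<T> {
    final String id;

    TypedTag(String id) {
        this.id = id;
    }

    // Returns the element type if an anonymous subclass captured it, else null.
    Type capturedType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        return null; // plain instance: the type argument was erased
    }
}

final class TypedTagDemo {
    // Anonymous subclass: the generic superclass TypedTag<String> is recorded in the class file.
    static Type withBraces() {
        return new TypedTag<String>("rejected") {}.capturedType();
    }

    // Plain instance: nothing records that T was String.
    static Type withoutBraces() {
        return new TypedTag<String>("rejected").capturedType();
    }
}
```

Here `TypedTagDemo.withBraces()` yields `String.class`, while `TypedTagDemo.withoutBraces()` yields `null`.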

Tokenizer ProcessFunction

ProcessFunction that splits text and routes long words to side output.

/**
 * ProcessFunction for splitting text with conditional side output routing
 * Routes words longer than 5 characters to side output, others to main stream
 */
public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {
    /**
     * Process input text and conditionally route to main or side output
     * @param value Input text line
     * @param ctx ProcessFunction context for side output access
     * @param out Main output collector for word-count pairs
     */
    public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) 
        throws Exception;
}

The tokenizer implementation:

  • Splits input text on non-word characters using the regex \W+ (written "\\W+" as a Java string literal)
  • Normalizes words to lowercase
  • Words > 5 characters: sent to side output using ctx.output(rejectedWordsTag, word)
  • Words ≤ 5 characters: sent to main stream as Tuple2<String, Integer>(word, 1)
  • Filters out empty tokens
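
The routing rule listed above can be modeled without any Flink dependency. In this minimal sketch the class name `TokenizerSketch` and its two lists are assumptions made for illustration: `mainOutput` stands in for the main `Collector`, and `rejected` stands in for the side output reached through `ctx.output(rejectedWordsTag, word)`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the tokenizer's routing rule; not Flink API.
final class TokenizerSketch {
    final List<String> mainOutput = new ArrayList<>(); // words that would go to out.collect(...)
    final List<String> rejected = new ArrayList<>();   // words that would go to ctx.output(...)

    void processLine(String line) {
        // Lowercase, then split on runs of non-word characters.
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 5) {
                rejected.add(token);      // long word: side output
            } else if (token.length() > 0) {
                mainOutput.add(token);    // short word: main stream (paired with a count of 1 in Flink)
            }
            // Empty tokens (for example from leading punctuation) are dropped.
        }
    }
}
```

Feeding it the line `To be, or not to be: that is the QUESTION` puts `question` in `rejected` and the nine shorter words, lowercased, in `mainOutput`.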

Side Output Processing Patterns

Stream Splitting and Processing

// Main processing pipeline with side output extraction
SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = text
    .process(new Tokenizer());

// Extract side output stream for rejected words
DataStream<String> rejectedWords = tokenized
    .getSideOutput(rejectedWordsTag)
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            return "Rejected: " + value;
        }
    });

// Main stream continues with windowing and aggregation
DataStream<Tuple2<String, Integer>> counts = tokenized
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);
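
To make the effect of the keyed 5-second window concrete, here is a plain-Java model of tumbling-window counting. It is only a sketch: `TumblingCountSketch` is a hypothetical name, timestamps are assumed to be non-negative milliseconds, windows are assumed to be aligned to zero, and Flink's own window operator (triggers, state, watermarks) is not modeled.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of keyBy(word) + 5-second tumbling window + sum(count); not Flink API.
final class TumblingCountSketch {
    static final long WINDOW_MILLIS = 5_000L;

    // window start (ms) -> (word -> count within that window)
    final Map<Long, Map<String, Integer>> counts = new TreeMap<>();

    // Start of the tumbling window containing the timestamp (assumes timestamp >= 0).
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Adds one occurrence of the word to the window its timestamp falls into.
    void add(String word, long timestampMillis) {
        counts.computeIfAbsent(windowStart(timestampMillis), k -> new TreeMap<>())
              .merge(word, 1, Integer::sum);
    }
}
```

Two occurrences of the same word at 1000 ms and 4999 ms are summed in the window starting at 0, while an occurrence at 5000 ms starts a new count in the next window.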

ProcessFunction Context Usage

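public static class CustomProcessFunction extends ProcessFunction<String, String> {
    // Static and non-private so the calling code can pass the same tag to getSideOutput()
    static final OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (someCondition(value)) {
            // Send to main output
            out.collect("Main: " + value);
        } else {
            // Send to side output
            ctx.output(sideOutputTag, "Side: " + value);
        }
    }

    // Placeholder predicate (hypothetical): replace with the real routing rule
    private boolean someCondition(String value) {
        return !value.isEmpty();
    }
}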

Multiple Side Outputs

// Define multiple output tags
static final OutputTag<String> errorTag = new OutputTag<String>("errors") {};
static final OutputTag<String> warningTag = new OutputTag<String>("warnings") {};
static final OutputTag<String> infoTag = new OutputTag<String>("info") {};

// Route to different side outputs based on log level
public void processElement(LogEntry value, Context ctx, Collector<LogEntry> out) {
    switch (value.getLevel()) {
        case ERROR:
            ctx.output(errorTag, value.getMessage());
            break;
        case WARNING:
            ctx.output(warningTag, value.getMessage());
            break;
        case INFO:
            ctx.output(infoTag, value.getMessage());
            break;
        default:
            out.collect(value);
    }
}

// Extract and process each side output separately.
// mainStream must be the SingleOutputStreamOperator returned by process(); a plain DataStream has no getSideOutput()
DataStream<String> errors = mainStream.getSideOutput(errorTag);
DataStream<String> warnings = mainStream.getSideOutput(warningTag);
DataStream<String> info = mainStream.getSideOutput(infoTag);
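
The switch above sends each record to exactly one destination. A Flink-free model of that routing, useful for unit-testing the rule on its own, might look like the following sketch. `LogRoutingSketch`, its nested `Level` enum (including the extra `DEBUG` value), and the collections are assumptions for illustration: the per-level lists stand in for the three tagged side outputs and `mainOutput` for the main collector.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of routing by log level; not Flink API.
final class LogRoutingSketch {
    enum Level { ERROR, WARNING, INFO, DEBUG }

    // One list per side output, keyed by level (stands in for errorTag, warningTag, infoTag).
    final Map<Level, List<String>> sideOutputs = new EnumMap<>(Level.class);
    // Stands in for the main Collector.
    final List<String> mainOutput = new ArrayList<>();

    void process(Level level, String message) {
        switch (level) {
            case ERROR:
            case WARNING:
            case INFO:
                // Levels with a dedicated side output
                sideOutputs.computeIfAbsent(level, k -> new ArrayList<>()).add(message);
                break;
            default:
                // Any other level stays on the main stream
                mainOutput.add(message);
        }
    }
}
```

With this model, a `DEBUG` message ends up only in `mainOutput`, and an `ERROR` message only in the list stored under `Level.ERROR`.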

ProcessFunction API

Core ProcessFunction

Base class for low-level stream processing with side output support.

/**
 * Base class for user-defined functions that process elements and have access to context and side outputs
 * @param <I> Input element type
 * @param <O> Output element type
 */
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    /**
     * Process individual elements with access to context and collectors
     * @param value Input element
     * @param ctx Context providing access to side outputs, timers, and metadata
     * @param out Main output collector
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    
    /**
     * Called when a timer fires - for timer-based processing
     * @param timestamp Timestamp when timer was set to fire
     * @param ctx Context providing access to side outputs and timers  
     * @param out Main output collector
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
}

ProcessFunction Context

Context class (nested in ProcessFunction) providing access to side outputs, the element timestamp, and the timer service.

/**
 * Context for ProcessFunction providing side output and timer access
 */
public abstract class Context {
    /**
     * Emit data to a side output identified by OutputTag
     * @param outputTag Output tag identifying the side output
     * @param value Value to emit to side output
     */
    public abstract <X> void output(OutputTag<X> outputTag, X value);
    
    /**
     * Timestamp of the element being processed, or of the firing timer
     * @return Element timestamp; may be null, for example under processing time
     */
    public abstract Long timestamp();
    
    /**
     * Service for querying time and registering timers
     * @return TimerService; the current watermark is available via timerService().currentWatermark()
     */
    public abstract TimerService timerService();
}

Event Time and Windowing with Side Outputs

Time-based Side Output Routing

// Configure event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Tag for late events (illustrative id). TimestampedEvent, TimeBasedProcessor and
// EventReducer are user-defined; TimeBasedProcessor must emit to this same tag.
final OutputTag<TimestampedEvent> lateEventsTag = new OutputTag<TimestampedEvent>("late-events") {};

// Process with time-based side output routing.
// Keep the SingleOutputStreamOperator type: a plain DataStream has no getSideOutput().
SingleOutputStreamOperator<TimestampedEvent> processedStream = inputStream
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<TimestampedEvent>() {
        @Override
        public long extractAscendingTimestamp(TimestampedEvent element) {
            return element.getTimestamp();
        }
    })
    .process(new TimeBasedProcessor());

// Main stream: process recent events
DataStream<TimestampedEvent> recentEvents = processedStream
    .keyBy(event -> event.getKey())
    .timeWindow(Time.minutes(5))
    .reduce(new EventReducer());

// Side output: handle late events
DataStream<TimestampedEvent> lateEvents = processedStream
    .getSideOutput(lateEventsTag)
    .map(event -> event.markAsLate());
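
The pipeline above does not show how `TimeBasedProcessor` decides that an event is late. One plausible rule, sketched here as an assumption rather than as the example's actual logic, compares the element timestamp with the current watermark: a watermark of `W` signals that no further elements with timestamp `W` or earlier are expected. `LatenessSketch` and `Route` are hypothetical names, and the method is plain Java so the rule can be tested without a running job.

```java
// Plain-Java model of a lateness rule; an assumed decision rule, not Flink API.
final class LatenessSketch {
    enum Route { MAIN, LATE }

    // elementTimestamp models the element's event-time timestamp (may be null when
    // the element carries none); watermark models the operator's current watermark.
    static Route route(Long elementTimestamp, long watermark) {
        if (elementTimestamp == null) {
            return Route.MAIN; // no timestamp: lateness cannot be judged, keep on main stream
        }
        // At or behind the watermark counts as late in this sketch.
        return elementTimestamp <= watermark ? Route.LATE : Route.MAIN;
    }
}
```

Inside a ProcessFunction, a `LATE` result would translate to `ctx.output(lateEventsTag, value)` and a `MAIN` result to `out.collect(value)`.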

Window Processing with Side Outputs

// Windowed aggregation on the main stream
DataStream<Tuple2<String, Integer>> windowedCounts = tokenized
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);

// Side output: rejected words, stamped with the wall-clock time at which the map function runs (not the event time)
DataStream<String> rejectedWithTime = tokenized
    .getSideOutput(rejectedWordsTag)
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String word) throws Exception {
            return String.format("Rejected at %d: %s", System.currentTimeMillis(), word);
        }
    });

Use Cases and Patterns

Data Quality Filtering

  • Route invalid records to side output for monitoring
  • Process valid records in main stream
  • Generate quality metrics from side output
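
As a rough illustration of the data quality pattern, the sketch below separates valid from invalid records and derives a simple metric from the rejected ones. Everything in it is an assumption for illustration: the class name `QualityFilterSketch`, the `name,number` record format, and the validity rule. The `invalid` list plays the role of the side output and `valid` the role of the main stream.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of quality filtering with a rejection metric; not Flink API.
final class QualityFilterSketch {
    final List<String> valid = new ArrayList<>();   // main stream
    final List<String> invalid = new ArrayList<>(); // side output for monitoring

    // Hypothetical rule: exactly two comma-separated fields, a non-empty name and an integer value.
    static boolean isValid(String record) {
        String[] parts = record.split(",", -1);
        if (parts.length != 2 || parts[0].trim().isEmpty()) {
            return false;
        }
        try {
            Integer.parseInt(parts[1].trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    void process(String record) {
        (isValid(record) ? valid : invalid).add(record);
    }

    // Share of records seen so far that were rejected (0.0 when nothing was processed).
    double rejectionRate() {
        int total = valid.size() + invalid.size();
        return total == 0 ? 0.0 : (double) invalid.size() / total;
    }
}
```

Keeping the validity rule in a static method makes it easy to reuse the same predicate inside a ProcessFunction and in unit tests.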

Alert Generation

  • Main stream: normal processing
  • Side output: critical events requiring immediate attention
  • Separate processing pipelines for alerts vs normal data

A/B Testing

  • Route traffic to different processing paths
  • Compare results from main stream vs side outputs
  • Feature flag-based routing decisions

Multi-tenant Processing

  • Route data by tenant ID to different side outputs
  • Tenant-specific processing and storage
  • Resource isolation and monitoring per tenant

Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Required Imports

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10
