
tessl/maven-org-apache-flink--flink-examples-streaming-2-10

Apache Flink streaming examples demonstrating various stream processing patterns and use cases


Streaming Iterations

Streaming iterations with feedback loops and threshold-based termination for iterative algorithms. Demonstrates iterative streams, output selectors, and split streams, combined into a loop that feeds records back until they meet an exit condition.

Capabilities

IterateExample

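Computes Fibonacci-style sequences with a streaming iteration. Each input pair (a, b) travels around a feedback loop, one addition per pass, until a value reaches the threshold (100); the number of passes is then reported for that pair.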

/**
 * Example illustrating iterations in Flink streaming
 * Sums random numbers and counts additions to reach threshold iteratively
 * @param args Command line arguments (--input path, --output path)
 */
public class IterateExample {
    public static void main(String[] args) throws Exception;
}

Usage Example:

# Run with the built-in random source (results are printed to stdout);
# assumes the Flink runtime jars are also on the classpath
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.iteration.IterateExample

# Read pairs from a file (one "(a,b)" per line) and write results to a file
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.iteration.IterateExample \
  --input /path/to/input.txt --output /path/to/results.txt
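The `--input` file is read line by line by `FibonacciInputMap`, which expects each line to look like `(a,b)` with no spaces. A minimal sketch for generating such a file follows; the class name `WriteIterateInput`, the seed, and the default path `input.txt` are hypothetical choices, and the 1 to 49 value range is borrowed from `RandomFibonacciSource`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical helper: writes "(a,b)" lines suitable for --input. */
public final class WriteIterateInput {

    /** Builds `count` lines of the form "(a,b)" with a and b in [1, 49]. */
    static List<String> inputLines(int count, long seed) {
        Random rnd = new Random(seed);
        List<String> lines = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            int a = rnd.nextInt(49) + 1;   // same range as RandomFibonacciSource: 1..49
            int b = rnd.nextInt(49) + 1;
            lines.add("(" + a + "," + b + ")");   // no spaces: FibonacciInputMap does not trim
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        Path out = Paths.get(args.length > 0 ? args[0] : "input.txt");
        Files.write(out, inputLines(100, 42L), StandardCharsets.UTF_8);
        System.out.println("Wrote " + out.toAbsolutePath());
    }
}
```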

Core Components

InputMap

Expands each input pair into the five-field iteration state: the original pair is kept for the final result, and a copy of it seeds the working values, with the counter starting at 0.

/**
 * Maps input pairs to iteration tuples with counter initialization
 * Transforms Tuple2<Integer, Integer> to Tuple5 for iteration state
 */
public static class InputMap 
    implements MapFunction<Tuple2<Integer, Integer>, 
                          Tuple5<Integer, Integer, Integer, Integer, Integer>> {
    
    /**
     * Maps input tuple to iteration state tuple
     * @param value Input pair (a, b)
     * @return Tuple5(a, b, a, b, 0) - original values + working values + counter
     */
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
        Tuple2<Integer, Integer> value) throws Exception;
}

Step

Iteration step function that computes the next Fibonacci number and increments the counter.

/**
 * Iteration step function calculating next Fibonacci number
 * Updates working values and increments iteration counter
 */
public static class Step 
    implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, 
                          Tuple5<Integer, Integer, Integer, Integer, Integer>> {
    
    /**
     * Calculates next iteration step
     * @param value Current state: (origA, origB, prevFib, currFib, counter)
     * @return Next state: (origA, origB, currFib, nextFib, counter+1)
     */
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
        Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception;
}

MySelector

Output selector determining whether to continue iteration or produce final output.

/**
 * OutputSelector determining iteration continuation or termination
 * Routes tuples to 'iterate' or 'output' channels based on the BOUND threshold
 */
public static class MySelector 
    implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
    
    /**
     * Selects output channel based on Fibonacci values and threshold
     * @param value Current iteration state
     * @return Iterable with exactly one channel name, "iterate" or "output"
     */
    public Iterable<String> select(
        Tuple5<Integer, Integer, Integer, Integer, Integer> value);
}
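The contract between `Step` and `MySelector` can be checked without a cluster. This Flink-free sketch runs one pass of the loop body, modelling the `Tuple5` as an `int[5]` indexed like `f0`..`f4`; `LoopBodySketch`, `step`, and `select` are illustrative names, not part of the example.

```java
import java.util.Arrays;

/** Flink-free model of one loop pass: Step followed by MySelector. */
public final class LoopBodySketch {
    static final int BOUND = 100;

    /** Step: (a, b, prev, curr, n) -> (a, b, curr, prev + curr, n + 1). */
    static int[] step(int[] s) {
        return new int[] {s[0], s[1], s[3], s[2] + s[3], s[4] + 1};
    }

    /** MySelector: keep looping while both working values are below BOUND. */
    static String select(int[] s) {
        return (s[2] < BOUND && s[3] < BOUND) ? "iterate" : "output";
    }

    public static void main(String[] args) {
        int[] next = step(new int[] {3, 5, 3, 5, 0});
        System.out.println(Arrays.toString(next) + " -> " + select(next));
        // prints [3, 5, 5, 8, 1] -> iterate
    }
}
```

The call in `main` shows a tuple that stays in the loop; a state such as `(1,1,55,89,9)` instead steps to `(1,1,89,144,10)` and is routed to "output".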

OutputMap

Converts a tuple leaving the loop into the final result: the original input pair plus the number of iteration steps it took.

/**
 * Maps iteration results to final output format
 * Extracts original input pair and final iteration counter
 */
public static class OutputMap 
    implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, 
                          Tuple2<Tuple2<Integer, Integer>, Integer>> {
    
    /**
     * Extracts final results from iteration state
     * @param value Final iteration state
     * @return Tuple2 containing original input pair and iteration count
     */
    public Tuple2<Tuple2<Integer, Integer>, Integer> map(
        Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception;
}

Iteration Pipeline

Complete Iteration Setup

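// Assumes env (see Buffer Timeout Configuration) and
// params = ParameterTool.fromArgs(args) are already defined

// Create input stream of integer pairs
DataStream<Tuple2<Integer, Integer>> inputStream;
if (params.has("input")) {
    inputStream = env.readTextFile(params.get("input")).map(new FibonacciInputMap());
} else {
    inputStream = env.addSource(new RandomFibonacciSource());
}

// Create the iterative stream; the iteration head shuts down after
// 5000 ms without receiving feedback records
IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it =
    inputStream.map(new InputMap()).iterate(5000);

// Apply the step function and split the output with the selector
SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step =
    it.map(new Step()).split(new MySelector());

// Close the loop: tuples routed to "iterate" are fed back to the head
it.closeWith(step.select("iterate"));

// Tuples routed to "output" leave the loop and are mapped to results
DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers =
    step.select("output").map(new OutputMap());

// Emit results and run the job
if (params.has("output")) {
    numbers.writeAsText(params.get("output"));
} else {
    numbers.print();
}
env.execute("Streaming Iteration Example");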
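To see how tuples circulate through the feedback edge, here is a single-threaded, Flink-free model of the pipeline above. It assumes a plain FIFO queue in place of the iteration head and feedback edge, so it ignores parallelism, network buffering, and the timeout; the class name `FeedbackLoopSketch` is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Single-threaded model: InputMap -> [Step -> MySelector], "iterate" records fed back. */
public final class FeedbackLoopSketch {
    static final int BOUND = 100;

    /** Returns one "((a,b),n)" string per input pair, in the order pairs leave the loop. */
    static List<String> run(int[][] pairs) {
        Deque<int[]> loop = new ArrayDeque<>();   // stands in for the iteration head + feedback edge
        for (int[] p : pairs) {
            loop.addLast(new int[] {p[0], p[1], p[0], p[1], 0});   // InputMap
        }
        List<String> results = new ArrayList<>();
        while (!loop.isEmpty()) {
            int[] s = loop.pollFirst();
            int[] n = {s[0], s[1], s[3], s[2] + s[3], s[4] + 1};    // Step
            if (n[2] < BOUND && n[3] < BOUND) {
                loop.addLast(n);                                    // "iterate": back to the head
            } else {
                results.add("((" + n[0] + "," + n[1] + ")," + n[4] + ")");  // "output" -> OutputMap
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(new int[][] {{1, 1}, {49, 49}, {2, 1}}));
    }
}
```

Running it prints `[((49,49),2), ((2,1),9), ((1,1),10)]`: pairs leave the loop as soon as they cross the threshold, not in input order. In a real job the output order also depends on timing and parallelism.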

Buffer Timeout Configuration

// Set buffer timeout for low latency
StreamExecutionEnvironment env = StreamExecutionEnvironment
    .getExecutionEnvironment()
    .setBufferTimeout(1); // 1ms buffer timeout for continuous flushing

Data Flow Pattern

Input Data Structure

// Input pairs for Fibonacci calculation
Tuple2<Integer, Integer> input = new Tuple2<>(first, second);

Iteration State Structure

// Iteration state: (originalA, originalB, prevValue, currentValue, counter)
Tuple5<Integer, Integer, Integer, Integer, Integer> state;
// f0, f1: Original input values (preserved)
// f2: Previous Fibonacci value  
// f3: Current Fibonacci value
// f4: Iteration counter

Output Structure

// Final output: ((originalA, originalB), iterationCount)
Tuple2<Tuple2<Integer, Integer>, Integer> result;
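Following one pair through these three structures makes the field layout concrete. The hypothetical `StateTrace` below reimplements the logic in plain Java (it does not call Flink) and records every intermediate state in a parenthesized, comma-separated notation similar to how Flink tuples print, ending with the `OutputMap` result.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical tracer: lists every state one input pair passes through. */
public final class StateTrace {
    static final int BOUND = 100;

    static List<String> trace(int a, int b) {
        List<String> lines = new ArrayList<>();
        int prev = a, curr = b, counter = 0;              // InputMap -> (a,b,a,b,0)
        lines.add(tuple(a, b, prev, curr, counter));
        do {
            int next = prev + curr;                        // Step
            prev = curr;
            curr = next;
            counter++;
            lines.add(tuple(a, b, prev, curr, counter));
        } while (prev < BOUND && curr < BOUND);            // MySelector: "iterate" while below BOUND
        lines.add("((" + a + "," + b + ")," + counter + ")");  // OutputMap
        return lines;
    }

    /** Formats fields as "(f0,f1,...)". */
    private static String tuple(int... f) {
        StringBuilder sb = new StringBuilder("(");
        for (int i = 0; i < f.length; i++) {
            if (i > 0) sb.append(',');
            sb.append(f[i]);
        }
        return sb.append(')').toString();
    }

    public static void main(String[] args) {
        trace(1, 1).forEach(System.out::println);
    }
}
```

For `(1,1)` the trace starts at `(1,1,1,1,0)`, ends with the state `(1,1,89,144,10)`, and yields `((1,1),10)`.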

Termination Logic

Threshold-Based Termination

private static final int BOUND = 100;

@Override
public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
    List<String> output = new ArrayList<>();
    if (value.f2 < BOUND && value.f3 < BOUND) {
        output.add("iterate"); // Continue iteration
    } else {
        output.add("output");  // Produce final result
    }
    return output;
}
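Because the threshold is fixed and the random source only produces values from 1 to 49, the number of passes each pair makes is bounded. This plain-Java sketch (class name hypothetical) enumerates every possible pair and reports the smallest and largest counter `OutputMap` could emit.

```java
import java.util.Arrays;

/** Enumerates every pair the random source can emit and reports the range of loop counts. */
public final class IterationBounds {
    static final int BOUND = 100;

    /** Counter value OutputMap would report for the pair (a, b). */
    static int iterations(int a, int b) {
        int prev = a, curr = b, counter = 0;
        do {
            int next = prev + curr;                 // Step
            prev = curr;
            curr = next;
            counter++;
        } while (prev < BOUND && curr < BOUND);     // MySelector: "iterate" while both < BOUND
        return counter;
    }

    /** Returns {min, max} of iterations(a, b) over all a, b in [lo, hi]. */
    static int[] minMax(int lo, int hi) {
        int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
        for (int a = lo; a <= hi; a++) {
            for (int b = lo; b <= hi; b++) {
                int n = iterations(a, b);
                min = Math.min(min, n);
                max = Math.max(max, n);
            }
        }
        return new int[] {min, max};
    }

    public static void main(String[] args) {
        // rnd.nextInt(BOUND / 2 - 1) + 1 yields values in [1, 49]
        System.out.println(Arrays.toString(minMax(1, BOUND / 2 - 1)));
    }
}
```

It prints `[2, 10]`. After the first step the working value is at most 49 + 49 = 98, so every pair loops at least twice; (1,1) grows slowest and needs ten steps to reach 144.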

Step Calculation

@Override
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
    Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception {
    // Calculate next Fibonacci: f(n+1) = f(n-1) + f(n)
    return new Tuple5<>(
        value.f0,              // Original A (preserved)
        value.f1,              // Original B (preserved) 
        value.f3,              // Previous = current
        value.f2 + value.f3,   // Current = prev + current (Fibonacci)
        ++value.f4             // Increment counter
    );
}

Data Source Patterns

Random Fibonacci Source

// Emits BOUND (100) random pairs with values in [1, 49], one every 50 ms, then finishes
private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
    private Random rnd = new Random();
    private volatile boolean isRunning = true;
    private int counter = 0;
    private static final int BOUND = 100;
    
    @Override
    public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        while (isRunning && counter < BOUND) {
            int first = rnd.nextInt(BOUND / 2 - 1) + 1;
            int second = rnd.nextInt(BOUND / 2 - 1) + 1;
            ctx.collect(new Tuple2<>(first, second));
            counter++;
            Thread.sleep(50L);
        }
    }
    
    @Override
    public void cancel() {
        isRunning = false;
    }
}
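The source follows the usual `SourceFunction` pattern: `run` loops while a `volatile` flag is set, and `cancel` clears the flag from another thread. The sketch below reproduces that contract in plain Java, with a list standing in for `SourceContext.collect`; the class name, the fixed seed, and the `sleepMillis` parameter are assumptions added for testing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Flink-free model of RandomFibonacciSource's run/cancel contract. */
public final class CancellableSourceSketch {
    static final int BOUND = 100;

    private final Random rnd = new Random(7);       // fixed seed: an assumption for repeatability
    private volatile boolean isRunning = true;      // written by cancel(), read by run()
    final List<int[]> emitted = Collections.synchronizedList(new ArrayList<>());

    /** Emits up to BOUND pairs; returns early once cancel() has been called. */
    void run(long sleepMillis) {
        int counter = 0;
        while (isRunning && counter < BOUND) {
            int first = rnd.nextInt(BOUND / 2 - 1) + 1;    // 1..49
            int second = rnd.nextInt(BOUND / 2 - 1) + 1;
            emitted.add(new int[] {first, second});        // stands in for ctx.collect(...)
            counter++;
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        isRunning = false;
    }

    public static void main(String[] args) throws InterruptedException {
        CancellableSourceSketch source = new CancellableSourceSketch();
        Thread runner = new Thread(() -> source.run(50L));   // 50 ms per record, as in the original
        runner.start();
        Thread.sleep(500L);
        source.cancel();                                     // Flink calls cancel() when the job is cancelled
        runner.join();
        System.out.println("emitted before cancel: " + source.emitted.size());
    }
}
```

With the original 50 ms pause, a complete, uncancelled run of 100 pairs takes at least 5 seconds.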

File Input Mapping

private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> {
    @Override
    public Tuple2<Integer, Integer> map(String value) throws Exception {
        // Parse "(a,b)"; parts are not trimmed, so "(3, 5)" fails with NumberFormatException
        String record = value.substring(1, value.length() - 1);
        String[] splitted = record.split(",");
        return new Tuple2<>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
    }
}
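`FibonacciInputMap` only strips the first and last character and splits on the comma, so surrounding or inner whitespace breaks it. If your input files are less strict, a hypothetical, more forgiving parser could look like this (plain Java; it would still need to be wrapped in a `MapFunction` to replace the original).

```java
/** Hypothetical, more forgiving variant of FibonacciInputMap's parsing logic. */
public final class PairParser {

    /** Parses "(a,b)", tolerating surrounding and inner whitespace such as " ( 3 , 5 ) ". */
    static int[] parse(String line) {
        String s = line.trim();
        if (!s.startsWith("(") || !s.endsWith(")")) {
            throw new IllegalArgumentException("Expected (a,b) but got: " + line);
        }
        String[] parts = s.substring(1, s.length() - 1).split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected exactly two values: " + line);
        }
        return new int[] {Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim())};
    }

    public static void main(String[] args) {
        int[] p = parse(" ( 3 , 5 ) ");
        System.out.println(p[0] + "," + p[1]);
    }
}
```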

Key Concepts

Iterative Streams

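  • Iteration Timeout: iterate(5000) makes the iteration head shut down once it has received no feedback records for 5000 ms; without a timeout, the iteration never terminates
  • Feedback Loop: tuples routed to "iterate" are passed to closeWith and re-enter the loop at the iteration head
  • State Preservation: the original input pair travels in fields f0 and f1 of every tuple (no Flink managed state is involved)
  • Counter Tracking: field f4 counts the steps each pair has taken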
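The timeout passed to `iterate` is an idle limit rather than a total run time. As an analogy only, assuming a `BlockingQueue` as a stand-in for the feedback edge (this is not Flink's internal code), the head below keeps consuming records and stops once a poll waits the full timeout without receiving anything; the class name is hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Analogy for iterate(maxWaitTimeMillis): stop after an idle period with no feedback. */
public final class IterationTimeoutSketch {

    /** Consumes feedback until a poll times out; returns how many records were taken. */
    static int consumeUntilIdle(BlockingQueue<Integer> feedback, long maxWaitMillis) {
        int consumed = 0;
        try {
            while (feedback.poll(maxWaitMillis, TimeUnit.MILLISECONDS) != null) {
                consumed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;   // reached after maxWaitMillis without new feedback (or on interrupt)
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> feedback = new LinkedBlockingQueue<>();
        for (int i = 0; i < 3; i++) {
            feedback.add(i);
        }
        long start = System.nanoTime();
        int n = consumeUntilIdle(feedback, 100);
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(n + " records, head stopped after ~" + waitedMs + " ms");
    }
}
```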

Split Streams

  • Channel Selection: Route data to different processing paths
  • Output Selector: Logic determining which channel(s) to use
  • Multiple Outputs: a selector may return several names, sending one element to each of those channels (MySelector always returns exactly one)
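The routing rules above, including sending one element to several channels, can be modelled without Flink. In this sketch a selector function returns channel names and each element is appended to every named channel, which is what an `OutputSelector` returning more than one name does in a `SplitStream`; `SplitSketch` and the extra "audit" channel are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Flink-free model of split/select: each element goes to every channel its selector names. */
public final class SplitSketch {

    static <T> Map<String, List<T>> split(List<T> elements, Function<T, Iterable<String>> selector) {
        Map<String, List<T>> channels = new LinkedHashMap<>();
        for (T e : elements) {
            for (String name : selector.apply(e)) {
                channels.computeIfAbsent(name, k -> new ArrayList<>()).add(e);
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        // Hypothetical selector: values >= 100 go to "output"; multiples of 10 also go to "audit"
        Function<Integer, Iterable<String>> selector = v -> {
            List<String> names = new ArrayList<>();
            names.add(v < 100 ? "iterate" : "output");
            if (v % 10 == 0) {
                names.add("audit");
            }
            return names;
        };
        System.out.println(split(Arrays.asList(5, 40, 144), selector));
    }
}
```

It prints `{iterate=[5, 40], audit=[40], output=[144]}`: the value 40 lands in two channels.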

Stream Topology

Input → InputMap → IterativeStream → Step → Split
                          ↑                   │
                          └── iterate ────────┤
                                              │
                                              └── output → OutputMap → Results

Performance Considerations

Buffer Timeout

// Low latency configuration
env.setBufferTimeout(1); // flush output buffers every 1 ms even if they are not full

Parallelism

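  • Operators inside the loop run with the default parallelism; this example applies no keyBy, so pairs are not partitioned by key
  • The feedback stream passed to closeWith must have the same parallelism as the iteration head; Flink rejects a mismatch
  • Buffer timeout trades latency against throughput: a small value such as 1 ms flushes buffers often, while a larger one lets buffers fill before sending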

Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Required Imports

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10
