Apache Flink streaming examples demonstrating various stream processing patterns and use cases
—
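Streaming iterations with feedback loops and termination conditions for iterative algorithms. Demonstrates iterative streams, output selectors, and split streams for building iterative processing pipelines.
Fibonacci-style iteration: each input pair (a, b) is repeatedly advanced by one Fibonacci step inside a feedback loop until one of the working values reaches a fixed bound; the job then reports the original pair together with the number of steps taken.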
/**
* Example illustrating iterations in Flink streaming.
* For each input pair, an iterative stream repeatedly applies a Fibonacci step and counts
* how many steps are needed until one of the working values reaches a fixed bound.
*/
public class IterateExample {
/**
* Entry point. Reads "(a,b)" pairs from the file given by --input, or generates random pairs
* when --input is absent.
* @param args Command line arguments (--input path, --output path)
*/
public static void main(String[] args) throws Exception;
}Usage Example:
# Run with default random data (no arguments: input pairs are generated randomly)
# NOTE(review): plain `java -cp` also needs the Flink runtime jars on the classpath -- confirm,
# or submit the jar with `bin/flink run -c <main class> <jar>` instead.
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.iteration.IterateExample
# Run with file input; every line of the input file must hold one pair written as (a,b), e.g. (3,5)
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.iteration.IterateExample \
--input /path/to/input.txt --output /path/to/results.txt
Maps input tuples for iteration processing, preparing data for iterative computation.
/**
* Maps an input pair to the initial iteration state.
* Transforms Tuple2<Integer, Integer> into a Tuple5 that carries the original pair,
* the working Fibonacci values, and a step counter.
*/
public static class InputMap
implements MapFunction<Tuple2<Integer, Integer>,
Tuple5<Integer, Integer, Integer, Integer, Integer>> {
/**
* Maps an input tuple to the initial iteration state tuple.
* @param value Input pair (a, b)
* @return Tuple5(a, b, a, b, 0): original values (f0, f1), working values (f2, f3),
*         and the step counter (f4) starting at 0
*/
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
Tuple2<Integer, Integer> value) throws Exception;
}Iteration step function that calculates the next Fibonacci numbers and increments counter.
/**
* Iteration step: advances the working Fibonacci pair (f2, f3) by one position.
* The original pair (f0, f1) is carried through unchanged and the counter (f4) is incremented.
*/
public static class Step
implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
Tuple5<Integer, Integer, Integer, Integer, Integer>> {
/**
* Calculates the next iteration step.
* @param value Current state: (origA, origB, prevFib, currFib, counter)
* @return Next state: (origA, origB, currFib, prevFib + currFib, counter + 1)
*/
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception;
}Output selector determining whether to continue iteration or produce final output.
/**
* OutputSelector deciding whether a state is fed back into the iteration or emitted.
* A tuple is routed to "iterate" while both working values (f2 and f3) are below the
* bound (BOUND), and to "output" as soon as either of them reaches it.
*/
public static class MySelector
implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
/**
* Selects the output channel for one iteration state.
* @param value Current iteration state
* @return a collection holding exactly one channel name: "iterate" or "output"
*/
public Iterable<String> select(
Tuple5<Integer, Integer, Integer, Integer, Integer> value);
}Maps iteration results to final output format for downstream processing.
/**
* Maps a finished iteration state to the final output record.
* Extracts the original input pair (f0, f1) and the final step counter (f4).
*/
public static class OutputMap
implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
Tuple2<Tuple2<Integer, Integer>, Integer>> {
/**
* Extracts the final result from an iteration state.
* @param value Final iteration state
* @return ((origA, origB), counter): the original input pair and the number of steps taken
*/
public Tuple2<Tuple2<Integer, Integer>, Integer> map(
Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception;
}// Create input stream of integer pairs
// Parse the command line (--input, --output); both are optional
final ParameterTool params = ParameterTool.fromArgs(args);

// Set up the execution environment before any stream is created.
// setBufferTimeout(1) flushes output buffers at least every 1 ms, trading throughput for
// latency (a timeout of 0 would flush after every record).
final StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment()
        .setBufferTimeout(1);

// Create input stream of integer pairs: "(a,b)" lines from --input, or random pairs otherwise
DataStream<Tuple2<Integer, Integer>> inputStream;
if (params.has("input")) {
    inputStream = env.readTextFile(params.get("input")).map(new FibonacciInputMap());
} else {
    inputStream = env.addSource(new RandomFibonacciSource());
}

// Create the iterative stream; the iteration head shuts down after waiting 5000 ms
// without receiving any feedback record
IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it =
        inputStream.map(new InputMap()).iterate(5000);

// Apply the step function and split each result into the "iterate" or "output" channel
SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step =
        it.map(new Step()).split(new MySelector());

// Close the iteration loop: "iterate" records are fed back to the iteration head
it.closeWith(step.select("iterate"));

// Extract final results: ((originalA, originalB), iterationCount)
DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers =
        step.select("output").map(new OutputMap());

// Emit the results; without a sink and execute() the job would never run
if (params.has("output")) {
    numbers.writeAsText(params.get("output"));
} else {
    numbers.print();
}

env.execute("Streaming Iteration Example");
// Input pairs for Fibonacci calculation
Tuple2<Integer, Integer> input = new Tuple2<>(first, second); // e.g. first/second as produced by RandomFibonacciSource
// Iteration state: (originalA, originalB, prevValue, currentValue, counter)
Tuple5<Integer, Integer, Integer, Integer, Integer> state;
// f0, f1: Original input values (preserved across every step)
// f2: Previous Fibonacci value (initially originalA)
// f3: Current Fibonacci value (initially originalB)
// f4: Iteration counter (initially 0, incremented once per Step)
// Final output: ((originalA, originalB), iterationCount)
Tuple2<Tuple2<Integer, Integer>, Integer> result;private static final int BOUND = 100; // MySelector emits a state once f2 or f3 reaches this bound
/**
 * Routes one iteration state back into the loop or to the final output.
 *
 * @param value current state (origA, origB, prevFib, currFib, counter)
 * @return a single-element list: "iterate" while both working values (f2, f3) are below
 *     {@code BOUND}, otherwise "output"
 */
@Override
public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
    List<String> output = new ArrayList<>();
    if (value.f2 < BOUND && value.f3 < BOUND) {
        output.add("iterate"); // Continue iteration
    } else {
        output.add("output"); // Produce final result
    }
    return output;
}

/**
 * Advances the Fibonacci state by one step.
 *
 * <p>A new tuple is built and {@code value} is left untouched: incrementing
 * {@code value.f4} in place would modify the incoming record as a hidden side effect.
 *
 * @param value current state (origA, origB, prevFib, currFib, counter)
 * @return next state (origA, origB, currFib, prevFib + currFib, counter + 1)
 */
@Override
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
        Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception {
    // Calculate next Fibonacci: f(n+1) = f(n-1) + f(n)
    return new Tuple5<>(
            value.f0,            // Original A (preserved)
            value.f1,            // Original B (preserved)
            value.f3,            // Previous = current
            value.f2 + value.f3, // Current = prev + current (Fibonacci)
            value.f4 + 1         // Increment counter without mutating the input
    );
}

/**
 * Source emitting at most {@code BOUND} random pairs, pausing 50 ms between pairs.
 * Each value is drawn uniformly from [1, BOUND / 2 - 1]. {@link #cancel()} stops the
 * loop via a volatile flag.
 */
private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
    private final Random rnd = new Random();
    private volatile boolean isRunning = true;
    private int counter = 0;
    private static final int BOUND = 100;

    @Override
    public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        while (isRunning && counter < BOUND) {
            int first = rnd.nextInt(BOUND / 2 - 1) + 1;
            int second = rnd.nextInt(BOUND / 2 - 1) + 1;
            ctx.collect(new Tuple2<>(first, second));
            counter++;
            Thread.sleep(50L);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

/**
 * Parses one text line of the form {@code (a,b)} into a pair of integers.
 *
 * <p>Whitespace around the line (including a trailing carriage return from Windows line
 * endings) and around each number is ignored. Lines too short to hold a pair are rejected
 * with an {@link IllegalArgumentException} naming the offending line.
 */
private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> {
    @Override
    public Tuple2<Integer, Integer> map(String value) throws Exception {
        // Trim first so the enclosing parentheses, not stray whitespace, are stripped below
        String line = value.trim();
        if (line.length() < 2) {
            throw new IllegalArgumentException("Expected a pair like (a,b) but got: '" + value + "'");
        }
        // Parse "(a,b)" format: drop the first and last character (the parentheses)
        String record = line.substring(1, line.length() - 1);
        String[] splitted = record.split(",");
        if (splitted.length < 2) {
            throw new IllegalArgumentException("Expected a pair like (a,b) but got: '" + value + "'");
        }
        return new Tuple2<>(Integer.parseInt(splitted[0].trim()), Integer.parseInt(splitted[1].trim()));
    }
}
Input → InputMap → IterativeStream → Step → Split
                          ↑                   ↓
                          └── iterate ←───────┘
                                              ↓
                                            output → OutputMap → Results
// Low latency configuration
env.setBufferTimeout(1); // flush output buffers at least every 1 ms (0 would flush after every record)<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.3.3</version>
</dependency>
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10