# Streaming Iterations

Streaming iterations with feedback loops and threshold-based convergence criteria for iterative algorithms. Demonstrates iterative streams, output selectors, and split streams for iterative processing patterns.

## Capabilities

### IterateExample

Computes Fibonacci-style sequences seeded by integer pairs in a streaming iteration. Each pair is fed back through the loop until one of its working values reaches a threshold, and the number of steps taken is emitted with the original pair.

```java { .api }
/**
 * Example illustrating iterations in Flink streaming.
 * Sums random numbers and counts the additions needed to reach a threshold iteratively.
 * @param args Command line arguments (--input path, --output path)
 */
public class IterateExample {
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Run with default random data
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.iteration.IterateExample

# Run with file input
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.iteration.IterateExample \
  --input /path/to/input.txt --output /path/to/results.txt
```

## Core Components

### InputMap

Maps each input pair to the five-field iteration state, copying the pair into the working fields and starting the counter at zero.

```java { .api }
/**
 * Maps input pairs to iteration tuples with counter initialization.
 * Transforms Tuple2<Integer, Integer> to Tuple5 for iteration state.
 */
public static class InputMap
        implements MapFunction<Tuple2<Integer, Integer>,
                Tuple5<Integer, Integer, Integer, Integer, Integer>> {

    /**
     * Maps input tuple to iteration state tuple.
     * @param value Input pair (a, b)
     * @return Tuple5(a, b, a, b, 0): original values + working values + counter
     */
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
            Tuple2<Integer, Integer> value) throws Exception;
}
```

### Step

Iteration step function that computes the next Fibonacci number and increments the iteration counter.

```java { .api }
/**
 * Iteration step function calculating the next Fibonacci number.
 * Updates the working values and increments the iteration counter.
 */
public static class Step
        implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
                Tuple5<Integer, Integer, Integer, Integer, Integer>> {

    /**
     * Calculates the next iteration step.
     * @param value Current state: (origA, origB, prevFib, currFib, counter)
     * @return Next state: (origA, origB, currFib, nextFib, counter + 1)
     */
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
            Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception;
}
```

### MySelector

Output selector that decides, after each step, whether a tuple goes around the loop again or leaves it as final output.

```java { .api }
/**
 * OutputSelector determining iteration continuation or termination.
 * Routes tuples to the "iterate" or "output" channel based on the threshold check.
 */
public static class MySelector
        implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {

    /**
     * Selects the output channel based on the working values and the threshold.
     * @param value Current iteration state
     * @return Collection containing the channel name "iterate" or "output"
     */
    public Iterable<String> select(
            Tuple5<Integer, Integer, Integer, Integer, Integer> value);
}
```

### OutputMap

Maps the final iteration state to the output format: the original input pair and the number of steps taken.

```java { .api }
/**
 * Maps iteration results to final output format.
 * Extracts the original input pair and the final iteration counter.
 */
public static class OutputMap
        implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
                Tuple2<Tuple2<Integer, Integer>, Integer>> {

    /**
     * Extracts final results from iteration state.
     * @param value Final iteration state
     * @return Tuple2 containing original input pair and iteration count
     */
    public Tuple2<Tuple2<Integer, Integer>, Integer> map(
            Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception;
}
```

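As a cross-check on the tuple bookkeeping, the four components can be modeled without Flink. This is a sketch under stated assumptions: the `FibonacciStateModel` class and its `State` holder are hypothetical stand-ins for the `Tuple5` fields `f0`..`f4`, and `outputMap` returns a string shaped like Flink's printed nested tuple rather than a real `Tuple2`.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, Flink-free model of InputMap, Step, MySelector and OutputMap.
// State stands in for Tuple5<Integer, Integer, Integer, Integer, Integer>.
final class FibonacciStateModel {

    static final int BOUND = 100; // same threshold as the example

    static final class State {
        final int f0, f1, f2, f3, f4; // origA, origB, prev, curr, counter

        State(int f0, int f1, int f2, int f3, int f4) {
            this.f0 = f0; this.f1 = f1; this.f2 = f2; this.f3 = f3; this.f4 = f4;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + "," + f2 + "," + f3 + "," + f4 + ")";
        }
    }

    // InputMap: (a, b) -> (a, b, a, b, 0)
    static State inputMap(int a, int b) {
        return new State(a, b, a, b, 0);
    }

    // Step: shift the working pair one Fibonacci step forward and count the step
    static State step(State s) {
        return new State(s.f0, s.f1, s.f3, s.f2 + s.f3, s.f4 + 1);
    }

    // MySelector: keep iterating while both working values are below BOUND
    static List<String> select(State s) {
        return Collections.singletonList(s.f2 < BOUND && s.f3 < BOUND ? "iterate" : "output");
    }

    // OutputMap: ((origA, origB), counter), rendered like a printed nested tuple
    static String outputMap(State s) {
        return "((" + s.f0 + "," + s.f1 + ")," + s.f4 + ")";
    }

    public static void main(String[] args) {
        State s = step(inputMap(3, 5));
        System.out.println(s + " -> " + select(s));
    }
}
```

Running `main` prints `(3,5,5,8,1) -> [iterate]`: one step from `(3, 5)` yields the working pair `(5, 8)`, both still below `BOUND`.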
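## Iteration Pipeline

### Complete Iteration Setup

```java
// Runs inside IterateExample.main(String[] args); ParameterTool reads --input/--output
final ParameterTool params = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment()
        .setBufferTimeout(1);

// Create input stream of integer pairs
DataStream<Tuple2<Integer, Integer>> inputStream;
if (params.has("input")) {
    inputStream = env.readTextFile(params.get("input")).map(new FibonacciInputMap());
} else {
    inputStream = env.addSource(new RandomFibonacciSource());
}

// Create the iterative stream; the iteration head terminates if no
// feedback record arrives within 5000 ms
IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it =
        inputStream.map(new InputMap()).iterate(5000);

// Apply the step function and split the result into "iterate"/"output" channels
SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step =
        it.map(new Step()).split(new MySelector());

// Close the loop: tuples routed to "iterate" are fed back to the iteration head
it.closeWith(step.select("iterate"));

// Tuples routed to "output" leave the loop and are mapped to the final result
DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers =
        step.select("output").map(new OutputMap());

// Emit results and run the job
if (params.has("output")) {
    numbers.writeAsText(params.get("output"));
} else {
    numbers.print();
}
env.execute("Streaming Iteration Example");
```
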
### Buffer Timeout Configuration

```java
// Flush output buffers after at most 1 ms, even if they are not full (lowest latency)
StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment()
        .setBufferTimeout(1);
```

## Data Flow Pattern

### Input Data Structure
```java
// Input pair seeding the sequence, e.g. parsed from the line "(3,5)"
Tuple2<Integer, Integer> input = new Tuple2<>(3, 5);
```

### Iteration State Structure
```java
// Iteration state: (originalA, originalB, prevValue, currentValue, counter)
// InputMap turns the input (3, 5) into (3, 5, 3, 5, 0)
Tuple5<Integer, Integer, Integer, Integer, Integer> state = new Tuple5<>(3, 5, 3, 5, 0);
// f0, f1: Original input values (preserved)
// f2: Previous Fibonacci value
// f3: Current Fibonacci value
// f4: Iteration counter
```

### Output Structure
```java
// Final output: ((originalA, originalB), iterationCount), built by OutputMap.
// For the input (3, 5) the loop exits after 7 steps, at working values (89, 144).
Tuple2<Tuple2<Integer, Integer>, Integer> result = new Tuple2<>(new Tuple2<>(3, 5), 7);
```

## Convergence Logic

### Threshold-Based Convergence

A tuple keeps circulating while both working values (`f2` and `f3`) are below `BOUND`; as soon as either reaches it, the tuple is routed to the "output" channel. Because the selector runs after `Step`, every pair takes at least one step.

```java
private static final int BOUND = 100;

@Override
public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
    List<String> output = new ArrayList<>();
    if (value.f2 < BOUND && value.f3 < BOUND) {
        output.add("iterate"); // Continue iteration
    } else {
        output.add("output");  // Produce final result
    }
    return output;
}
```

### Step Calculation
```java
@Override
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
        Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception {
    // Calculate next Fibonacci: f(n+1) = f(n-1) + f(n)
    return new Tuple5<>(
            value.f0,            // Original A (preserved)
            value.f1,            // Original B (preserved)
            value.f3,            // Previous = current
            value.f2 + value.f3, // Current = prev + current (Fibonacci)
            ++value.f4           // Increment counter
    );
}
```

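The selector and step together determine how many times each pair circulates. The following Flink-free sketch replays the loop for a single pair; the `IterationReplay` class is hypothetical, and its `do`/`while` loop stands in for the feedback edge closed with `it.closeWith(step.select("iterate"))`. The `do` form reflects that `Step` always runs before `MySelector` checks the threshold.

```java
// Hypothetical, Flink-free replay of the feedback loop for one input pair.
// The do/while loop plays the role of the "iterate" edge.
final class IterationReplay {

    static final int BOUND = 100; // threshold used by MySelector

    // Returns the counter that OutputMap would emit for the pair (a, b)
    static int iterationsFor(int a, int b) {
        int prev = a, curr = b, counter = 0;    // state right after InputMap
        do {
            int next = prev + curr;             // Step: f(n+1) = f(n-1) + f(n)
            prev = curr;
            curr = next;
            counter++;
        } while (prev < BOUND && curr < BOUND); // MySelector: "iterate" channel
        return counter;                         // tuple leaves via "output"
    }

    public static void main(String[] args) {
        int[][] pairs = {{1, 1}, {2, 3}, {3, 5}, {49, 49}};
        for (int[] p : pairs) {
            System.out.println("((" + p[0] + "," + p[1] + ")," + iterationsFor(p[0], p[1]) + ")");
        }
    }
}
```

`main` prints `((1,1),10)`, `((2,3),8)`, `((3,5),7)` and `((49,49),2)`, one per line. In the streaming job many pairs circulate at once, so results may not arrive in input order.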
## Data Source Patterns

### Random Fibonacci Source

Emits `BOUND` (100) random pairs, one every 50 ms, with both values drawn from 1 to `BOUND / 2 - 1` (that is, 1 to 49), then finishes.

```java
private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
    private static final long serialVersionUID = 1L;
    private static final int BOUND = 100;

    private final Random rnd = new Random();
    private volatile boolean isRunning = true; // cleared by cancel()
    private int counter = 0;

    @Override
    public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        while (isRunning && counter < BOUND) {
            // nextInt(BOUND / 2 - 1) is in [0, 48], so values are in [1, 49]
            int first = rnd.nextInt(BOUND / 2 - 1) + 1;
            int second = rnd.nextInt(BOUND / 2 - 1) + 1;
            ctx.collect(new Tuple2<>(first, second));
            counter++;
            Thread.sleep(50L);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

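To exercise the `--input` path with data similar to what the random source produces, a small helper can write one pair per line. The `SampleInputWriter` class, its fixed seed, and the default file name `input.txt` are assumptions for illustration; the value range reuses the source's `rnd.nextInt(BOUND / 2 - 1) + 1` expression, and the lines use the `(a,b)` form that `FibonacciInputMap` parses.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical helper: writes "(a,b)" lines with the same value range as
// RandomFibonacciSource (1..49), for use with the example's --input option.
final class SampleInputWriter {

    static final int BOUND = 100;

    static List<String> pairLines(int count, long seed) {
        Random rnd = new Random(seed); // fixed seed so runs are reproducible
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            int first = rnd.nextInt(BOUND / 2 - 1) + 1;
            int second = rnd.nextInt(BOUND / 2 - 1) + 1;
            // No spaces: FibonacciInputMap expects exactly "(a,b)"
            lines.add("(" + first + "," + second + ")");
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        Path out = Paths.get(args.length > 0 ? args[0] : "input.txt"); // assumed default name
        Files.write(out, pairLines(BOUND, 42L), StandardCharsets.UTF_8);
        System.out.println("Wrote " + BOUND + " pairs to " + out.toAbsolutePath());
    }
}
```

Pass a target path as the first argument, then point the example's `--input` option at that file.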
### File Input Mapping

```java
private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Integer, Integer> map(String value) throws Exception {
        // Parse lines of the exact form "(a,b)", e.g. "(3,5)";
        // spaces inside the parentheses or empty lines make parsing fail
        String record = value.substring(1, value.length() - 1);
        String[] splitted = record.split(",");
        return new Tuple2<>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
    }
}
```

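`FibonacciInputMap` passes the split parts straight to `Integer.parseInt`, which does not trim whitespace, so a line such as `(3, 5)` fails on `" 5"`. A more tolerant parser could look like the plain-Java sketch below; `PairParser` and `parsePair` are hypothetical names, and returning an `int[]` instead of a `Tuple2` keeps the sketch free of Flink dependencies. Inside the job, the result would be wrapped as `new Tuple2<>(p[0], p[1])`.

```java
// Hypothetical, more tolerant variant of FibonacciInputMap's parsing logic.
final class PairParser {

    // Parses "(a,b)", tolerating whitespace around the line and around each value
    static int[] parsePair(String line) {
        String trimmed = line.trim();
        if (!trimmed.startsWith("(") || !trimmed.endsWith(")")) {
            throw new IllegalArgumentException("Expected \"(a,b)\" but got: " + line);
        }
        String[] parts = trimmed.substring(1, trimmed.length() - 1).split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected two comma-separated values: " + line);
        }
        return new int[] {Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim())};
    }

    public static void main(String[] args) {
        int[] p = parsePair(" (3, 5) ");
        System.out.println(p[0] + " " + p[1]);
    }
}
```

Malformed lines still throw `IllegalArgumentException` (or its subclass `NumberFormatException`), which would fail the job; skipping such lines instead is a separate design choice.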
## Key Concepts

### Iterative Streams
- **Iteration Timeout**: `iterate(5000)` sets how long the iteration head waits for feedback records (5000 ms in the example); if none arrive within that time, the iteration terminates
- **Feedback Loop**: Step results routed to the "iterate" channel are fed back to the iteration head via `closeWith`
- **State Preservation**: Original values (`f0`, `f1`) are carried unchanged through every iteration
- **Counter Tracking**: The number of steps is carried in `f4` and emitted with the result

### Split Streams
- **Channel Selection**: Route data to different processing paths
- **Output Selector**: Logic determining which channel(s) to use
- **Multiple Outputs**: A single element can go to multiple channels, since `select` returns an `Iterable<String>`

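Because `select` returns an `Iterable<String>`, one element can be routed to several channels. The plain-Java sketch below mirrors the example's threshold check and additionally tags every element with a hypothetical `"monitor"` channel, which is not part of IterateExample. In the Flink job, `step.select("monitor")` would then receive every tuple, while the `"iterate"` and `"output"` selections behave as before.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogue of an OutputSelector.select(...) returning several channels.
// The "monitor" channel is hypothetical and only illustrates multi-channel routing.
final class MultiChannelSelector {

    static final int BOUND = 100;

    // prev and curr correspond to f2 and f3 of the iteration state
    static List<String> select(int prev, int curr) {
        List<String> channels = new ArrayList<>();
        channels.add(prev < BOUND && curr < BOUND ? "iterate" : "output");
        channels.add("monitor"); // every element is also routed here
        return channels;
    }

    public static void main(String[] args) {
        System.out.println(select(5, 8));    // [iterate, monitor]
        System.out.println(select(89, 144)); // [output, monitor]
    }
}
```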
### Stream Topology
```
Input → InputMap → IterativeStream → Step → Split ── "output" ──→ OutputMap → Results
                          ↑                   │
                          └──── "iterate" ────┘
```

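## Performance Considerations

### Buffer Timeout
```java
// Low-latency configuration: flush output buffers after at most 1 ms
env.setBufferTimeout(1);
```

### Parallelism
- The feedback stream must have the same parallelism as the iteration head; closing the loop with a stream of different parallelism fails with an exception
- By default, the feedback edge is partitioned the same way as the first input of the iteration head; this example uses no `keyBy`, so each parallel instance iterates the tuples it receives
- Buffer timeout trades latency against throughput: a small timeout flushes partially filled buffers sooner, at the cost of more network overhead
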
## Dependencies

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Required Imports

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
```