# Streaming Iterations

Streaming iterations use a feedback loop to send records back through the same operators until a convergence criterion is met. The example here demonstrates iterative streams, output selectors, and split streams.

## Capabilities

### IterateExample

Computes a Fibonacci-style sequence from each input pair, feeding tuples back through the loop until a value reaches the threshold, and reports how many steps each pair needed.

```java { .api }
/**
 * Example illustrating iterations in Flink streaming.
 * Sums random numbers and counts the additions needed to reach a threshold, iteratively.
 */
public class IterateExample {
    /**
     * @param args Command line arguments (--input path, --output path)
     */
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Run with default random data
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.iteration.IterateExample

# Run with file input
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.iteration.IterateExample \
  --input /path/to/input.txt --output /path/to/results.txt
```

## Core Components

### InputMap

Maps each input pair to the tuple that carries state through the iteration.

```java { .api }
/**
 * Maps input pairs to iteration tuples with counter initialization.
 * Transforms Tuple2<Integer, Integer> to Tuple5 for iteration state.
 */
public static class InputMap
        implements MapFunction<Tuple2<Integer, Integer>,
                Tuple5<Integer, Integer, Integer, Integer, Integer>> {

    /**
     * Maps input tuple to iteration state tuple.
     * @param value Input pair (a, b)
     * @return Tuple5(a, b, a, b, 0) - original values + working values + counter
     */
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
            Tuple2<Integer, Integer> value) throws Exception;
}
```

### Step

Iteration step function that calculates the next Fibonacci number and increments the counter.

```java { .api }
/**
 * Iteration step function calculating next Fibonacci number.
 * Updates working values and increments iteration counter.
 */
public static class Step
        implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
                Tuple5<Integer, Integer, Integer, Integer, Integer>> {

    /**
     * Calculates next iteration step.
     * @param value Current state: (origA, origB, prevFib, currFib, counter)
     * @return Next state: (origA, origB, currFib, nextFib, counter+1)
     */
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
            Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception;
}
```

### MySelector

Output selector determining whether to continue iteration or produce final output.

```java { .api }
/**
 * OutputSelector determining iteration continuation or termination.
 * Routes tuples to 'iterate' or 'output' channels based on convergence.
 */
public static class MySelector
        implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {

    /**
     * Selects output channel based on Fibonacci values and threshold.
     * @param value Current iteration state
     * @return Collection containing "iterate" or "output" channel names
     */
    public Iterable<String> select(
            Tuple5<Integer, Integer, Integer, Integer, Integer> value);
}
```

### OutputMap

Maps iteration results to final output format for downstream processing.

```java { .api }
/**
 * Maps iteration results to final output format.
 * Extracts original input pair and final iteration counter.
 */
public static class OutputMap
        implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
                Tuple2<Tuple2<Integer, Integer>, Integer>> {

    /**
     * Extracts final results from iteration state.
     * @param value Final iteration state
     * @return Tuple2 containing original input pair and iteration count
     */
    public Tuple2<Tuple2<Integer, Integer>, Integer> map(
            Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception;
}
```
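
The listings above give signatures only. As a rough illustration of what the two mapping steps do to the data, here is a minimal plain-Java sketch based on the Javadoc descriptions. It is not the Flink source: `int[]` arrays stand in for `Tuple2` and `Tuple5` so the snippet runs without Flink on the classpath, the nested output tuple is flattened to three values, and the class and method names (`CoreMapsSketch`, `inputMap`, `outputMap`) are made up for this example.

```java
import java.util.Arrays;

// Flink-free sketch: int[] arrays stand in for Tuple2 and Tuple5 (an assumption for illustration).
class CoreMapsSketch {

    // InputMap: (a, b) -> (a, b, a, b, 0), i.e. originals, working copies, counter.
    static int[] inputMap(int a, int b) {
        return new int[] {a, b, a, b, 0};
    }

    // OutputMap: (origA, origB, prev, curr, counter) -> (origA, origB, counter).
    // The real result type nests the pair: ((origA, origB), counter).
    static int[] outputMap(int[] state) {
        return new int[] {state[0], state[1], state[4]};
    }

    public static void main(String[] args) {
        int[] state = inputMap(3, 5);
        System.out.println(Arrays.toString(state));            // [3, 5, 3, 5, 0]
        System.out.println(Arrays.toString(outputMap(state))); // [3, 5, 0]
    }
}
```

Carrying the original pair in the first two slots is what lets the output step report which input a counter belongs to, since the working values have changed by the time a tuple leaves the loop.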

## Iteration Pipeline

### Complete Iteration Setup

```java
// Parse command line arguments (--input, --output)
final ParameterTool params = ParameterTool.fromArgs(args);
// `env` is the StreamExecutionEnvironment (see Buffer Timeout Configuration)

// Create input stream of integer pairs
DataStream<Tuple2<Integer, Integer>> inputStream;
if (params.has("input")) {
    inputStream = env.readTextFile(params.get("input")).map(new FibonacciInputMap());
} else {
    inputStream = env.addSource(new RandomFibonacciSource());
}

// Create iterative data stream with a 5 second maximum wait for feedback
IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it =
        inputStream.map(new InputMap()).iterate(5000);

// Apply step function and split output
SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step =
        it.map(new Step()).split(new MySelector());

// Close iteration loop
it.closeWith(step.select("iterate"));

// Extract final results
DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers =
        step.select("output").map(new OutputMap());
```
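
To make the feedback edge concrete, the following plain-Java sketch simulates the same pipeline without Flink. It is only an approximation under stated assumptions: a `Deque` plays the role of the feedback channel created by `closeWith`, `int[]` arrays replace the tuples, processing is single-threaded, and the class name `FeedbackLoopSketch` is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Flink-free sketch of the iterate/closeWith loop; a Deque plays the feedback edge.
class FeedbackLoopSketch {
    static final int BOUND = 100;

    // Returns one {origA, origB, counter} row per input pair, in the order pairs leave the loop.
    static List<int[]> run(int[][] inputs) {
        Deque<int[]> loop = new ArrayDeque<>();
        for (int[] in : inputs) {
            loop.add(new int[] {in[0], in[1], in[0], in[1], 0}); // InputMap
        }
        List<int[]> results = new ArrayList<>();
        while (!loop.isEmpty()) {
            int[] s = loop.poll();
            int[] next = {s[0], s[1], s[3], s[2] + s[3], s[4] + 1}; // Step
            if (next[2] < BOUND && next[3] < BOUND) {
                loop.add(next); // "iterate" channel: feed the tuple back
            } else {
                results.add(new int[] {next[0], next[1], next[4]}); // "output" channel + OutputMap
            }
        }
        return results;
    }

    public static void main(String[] args) {
        for (int[] r : run(new int[][] {{1, 1}, {10, 20}, {49, 49}})) {
            System.out.println("((" + r[0] + "," + r[1] + ")," + r[2] + ")");
        }
        // Prints ((49,49),2) then ((10,20),4) then ((1,1),10)
    }
}
```

In this simulation results leave the loop in order of convergence rather than input order: the pair that crosses the threshold soonest is emitted first.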

### Buffer Timeout Configuration

```java
// Set buffer timeout for low latency
StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment()
        .setBufferTimeout(1); // flush output buffers at least every 1 ms
```

## Data Flow Pattern

### Input Data Structure
```java
// Input pairs for Fibonacci calculation
Tuple2<Integer, Integer> input = new Tuple2<>(first, second);
```

### Iteration State Structure
```java
// Iteration state: (originalA, originalB, prevValue, currentValue, counter)
Tuple5<Integer, Integer, Integer, Integer, Integer> state;
// f0, f1: Original input values (preserved)
// f2: Previous Fibonacci value
// f3: Current Fibonacci value
// f4: Iteration counter
```

### Output Structure
```java
// Final output: ((originalA, originalB), iterationCount)
Tuple2<Tuple2<Integer, Integer>, Integer> result;
```

## Convergence Logic

### Threshold-Based Convergence
```java
private static final int BOUND = 100;

@Override
public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
    List<String> output = new ArrayList<>();
    if (value.f2 < BOUND && value.f3 < BOUND) {
        output.add("iterate"); // Continue iteration
    } else {
        output.add("output"); // Produce final result
    }
    return output;
}
```

### Step Calculation
```java
@Override
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
        Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception {
    // Calculate next Fibonacci: f(n+1) = f(n-1) + f(n)
    return new Tuple5<>(
            value.f0,            // Original A (preserved)
            value.f1,            // Original B (preserved)
            value.f3,            // Previous = current
            value.f2 + value.f3, // Current = prev + current (Fibonacci)
            ++value.f4           // Increment counter
    );
}
```
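
As a worked example of the step arithmetic, the sketch below records every state one input pair passes through. It is a plain-Java approximation, not Flink code: the helper name `trace` and the class `StepTraceSketch` are hypothetical, and the `do`/`while` shape reflects the pipeline order shown earlier, where the selector runs after `Step`, so the step is applied at least once.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free worked example: the states one input pair passes through.
class StepTraceSketch {

    // Applies the step until prev or curr reaches the bound; records each state as text.
    static List<String> trace(int a, int b, int bound) {
        List<String> states = new ArrayList<>();
        int prev = a;
        int curr = b;
        int counter = 0;
        do {
            int next = prev + curr; // f2 + f3
            prev = curr;            // new f2 = old f3
            curr = next;            // new f3 = old f2 + old f3
            counter++;              // f4 + 1
            states.add("(" + a + ", " + b + ", " + prev + ", " + curr + ", " + counter + ")");
        } while (prev < bound && curr < bound);
        return states;
    }

    public static void main(String[] args) {
        trace(10, 20, 100).forEach(System.out::println);
        // (10, 20, 20, 30, 1)
        // (10, 20, 30, 50, 2)
        // (10, 20, 50, 80, 3)
        // (10, 20, 80, 130, 4)
    }
}
```

For the pair (10, 20) and a bound of 100, the fourth step produces 130, so the tuple leaves the loop with a counter of 4.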

## Data Source Patterns

### Random Fibonacci Source

```java
private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
    private Random rnd = new Random();
    private volatile boolean isRunning = true;
    private int counter = 0;
    private static final int BOUND = 100;

    @Override
    public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        while (isRunning && counter < BOUND) {
            int first = rnd.nextInt(BOUND / 2 - 1) + 1;
            int second = rnd.nextInt(BOUND / 2 - 1) + 1;
            ctx.collect(new Tuple2<>(first, second));
            counter++;
            Thread.sleep(50L);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```
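
The value range produced by the source is easy to misread, so here is a small plain-Java sketch of just the generation step. It uses the same `Random.nextInt` expression as the listing above; the class name `RandomPairSketch`, the helper `nextPair`, and the fixed seed are assumptions added for illustration.

```java
import java.util.Random;

// Flink-free sketch of the value generation inside RandomFibonacciSource.run().
class RandomPairSketch {
    static final int BOUND = 100;

    // nextInt(BOUND / 2 - 1) yields 0..48, so each value lies in 1..49.
    static int[] nextPair(Random rnd) {
        int first = rnd.nextInt(BOUND / 2 - 1) + 1;
        int second = rnd.nextInt(BOUND / 2 - 1) + 1;
        return new int[] {first, second};
    }

    public static void main(String[] args) {
        Random rnd = new Random(42L); // fixed seed only to make runs repeatable
        for (int i = 0; i < 5; i++) {
            int[] p = nextPair(rnd);
            System.out.println("(" + p[0] + "," + p[1] + ")");
        }
    }
}
```

Because both values are at most 49, their first sum is at most 98, which is still below the bound of 100. Every random pair is therefore fed back at least once before it can reach the output channel.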

### File Input Mapping

```java
private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> {
    @Override
    public Tuple2<Integer, Integer> map(String value) throws Exception {
        // Parse "(a,b)" format
        String record = value.substring(1, value.length() - 1);
        String[] splitted = record.split(",");
        return new Tuple2<>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
    }
}
```
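
The mapper above assumes every line is exactly `(a,b)` with no surrounding whitespace. If input files may be less tidy, a more defensive parse could look like the following plain-Java sketch. This is a suggested variant, not part of the example: the class `PairParserSketch` and method `parsePair` are hypothetical, and an `int[]` replaces `Tuple2` so the snippet runs without Flink.

```java
// Flink-free sketch of the "(a,b)" parsing step with basic input validation.
class PairParserSketch {

    // Parses "(a,b)", tolerating surrounding and inner whitespace; rejects anything else.
    static int[] parsePair(String line) {
        String trimmed = line.trim();
        if (trimmed.length() < 2 || !trimmed.startsWith("(") || !trimmed.endsWith(")")) {
            throw new IllegalArgumentException("Expected \"(a,b)\" but got: " + line);
        }
        String[] parts = trimmed.substring(1, trimmed.length() - 1).split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected two integers but got: " + line);
        }
        return new int[] {Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim())};
    }

    public static void main(String[] args) {
        int[] pair = parsePair(" (3, 5) ");
        System.out.println(pair[0] + " " + pair[1]); // 3 5
    }
}
```

Failing fast with a descriptive message is one option; depending on the job, skipping malformed lines may be preferable to failing the whole stream.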

## Key Concepts

### Iterative Streams
- **Iteration Timeout**: Maximum time the iteration head waits for feedback records before the loop shuts down (5000 ms in the example)
- **Feedback Loop**: Tuples routed to the `iterate` channel are fed back to the iteration input
- **State Preservation**: Original input values (`f0`, `f1`) are carried unchanged through every step
- **Counter Tracking**: The counter (`f4`) records how many steps each pair needed
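
One way to picture the iteration timeout is as a bounded wait on the feedback channel: processing continues while records keep arriving and stops once a whole wait period passes with nothing to read. The plain-Java sketch below is an analogy only, built on `java.util.concurrent.BlockingQueue`; it makes no claim about Flink's internal implementation, and the names `FeedbackTimeoutSketch` and `drainUntilIdle` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Flink-free analogy for the iteration timeout: stop once no feedback arrives in time.
class FeedbackTimeoutSketch {

    // Drains the feedback queue until a poll waits maxWaitMillis without receiving a record.
    static List<Integer> drainUntilIdle(BlockingQueue<Integer> feedback, long maxWaitMillis)
            throws InterruptedException {
        List<Integer> seen = new ArrayList<>();
        while (true) {
            Integer next = feedback.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
            if (next == null) {
                return seen; // idle for the whole wait period: the loop shuts down
            }
            seen.add(next);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> feedback = new LinkedBlockingQueue<>();
        feedback.add(1);
        feedback.add(2);
        System.out.println(drainUntilIdle(feedback, 100)); // [1, 2], returned after 100 ms of silence
    }
}
```

Under this reading the timeout limits idle time, not total run time: a loop that keeps receiving feedback keeps running.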

### Split Streams
- **Channel Selection**: Route data to different processing paths
- **Output Selector**: Logic determining which channel(s) to use
- **Multiple Outputs**: Single element can go to multiple channels
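
The selector in this example names exactly one channel per tuple, so the multiple-outputs case is not exercised. A minimal plain-Java sketch of a selector that names two channels per element is shown below. The channel names (`all`, `even`, `odd`) and the class `SplitRoutingSketch` are invented for illustration, and a `Map` of lists stands in for the named streams that `select(...)` would return.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Flink-free sketch of split/select routing; channel names are plain strings.
class SplitRoutingSketch {

    // Hypothetical selector: every value goes to "all", and additionally to "even" or "odd".
    static List<String> select(int value) {
        List<String> channels = new ArrayList<>();
        channels.add("all");
        channels.add(value % 2 == 0 ? "even" : "odd");
        return channels;
    }

    // Routes each value to every channel its selector names.
    static Map<String, List<Integer>> route(int[] values) {
        Map<String, List<Integer>> channels = new LinkedHashMap<>();
        for (int v : values) {
            for (String name : select(v)) {
                channels.computeIfAbsent(name, k -> new ArrayList<>()).add(v);
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        System.out.println(route(new int[] {1, 2, 3, 4}));
        // {all=[1, 2, 3, 4], odd=[1, 3], even=[2, 4]}
    }
}
```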

### Stream Topology
```
Input → InputMap → IterativeStream → Step → Split
                          ↑                   ↓
                          └── iterate ←───────┘

                                            output → OutputMap → Results
```

## Performance Considerations

### Buffer Timeout
```java
// Low latency configuration
env.setBufferTimeout(1); // flush output buffers at least every 1 ms
```

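### Parallelism
- This example does not key the stream, so tuples are not grouped by key inside the loop
- When a loop body partitions by key (`keyBy`), all records for one key are processed by a single parallel subtask, and parallelism comes from the key-based partitioning
- Buffer timeout trades latency against throughput: lower values flush sooner but send smaller buffers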

## Dependencies

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Required Imports

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Used by MySelector and RandomFibonacciSource
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
```