# Iteration Operations

Support for iterative algorithms, including bulk iterations and delta iterations. These operations make it possible to implement machine learning algorithms, graph processing, and other computations that require multiple passes over the data.

## Capabilities

### Bulk Iterations

In a bulk iteration, each step consumes the entire result of the previous step and produces a completely new version of the dataset. The iteration runs until the maximum number of iterations is reached. If a termination criterion is given, it can also stop earlier, once that criterion produces an empty dataset.

```java { .api }
/**
 * Create a bulk iteration
 * @param maxIterations maximum number of iterations
 * @return IterativeDataSet for iteration configuration
 */
public IterativeDataSet<T> iterate(int maxIterations);
```
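The bulk-iteration contract can be illustrated without Flink. The following plain-Java sketch is a model, not the Flink implementation. Each step receives the whole current dataset as a `List` and returns a completely new one. With no termination criterion, the loop always runs `maxIterations` steps. `BulkIterationModel` and `run` are hypothetical names chosen for this illustration; Flink evaluates the same contract in parallel over a distributed dataset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical plain-Java model of a bulk iteration; not part of the Flink API.
final class BulkIterationModel {

    // Applies the step to the WHOLE dataset maxIterations times;
    // each step's output replaces the previous dataset entirely.
    static <T> List<T> run(List<T> initial, int maxIterations, UnaryOperator<List<T>> step) {
        List<T> current = new ArrayList<>(initial);
        for (int i = 0; i < maxIterations; i++) {
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        // Same shape as the doubling example in the usage section: 10 steps starting from 1
        List<Integer> result = run(List.of(1), 10, data -> {
            List<Integer> next = new ArrayList<>();
            for (int v : data) {
                next.add(v * 2);
            }
            return next;
        });
        System.out.println(result); // prints [1024]
    }
}
```

Because every step touches the full dataset, each step costs work proportional to the size of the data, even when few elements change. Delta iterations are aimed at that case.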
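
### IterativeDataSet Operations

`IterativeDataSet` provides `closeWith` to close a bulk iteration. The dataset passed to `closeWith` is fed back as the input of the next step. The dataset returned by `closeWith` holds the final result.
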
```java { .api }
/**
 * Close the iteration with the result of one iteration step
 * @param iterationResult the result of each iteration step (fed back as the next step's input)
 * @return DataSet with the final iteration result
 */
public DataSet<T> closeWith(DataSet<T> iterationResult);

/**
 * Close the iteration with a result and a termination criterion
 * @param iterationResult the result of each iteration step
 * @param terminationCriterion dataset evaluated after every step; the iteration
 *        stops as soon as it is empty (or when maxIterations is reached)
 * @return DataSet with the final iteration result
 */
public DataSet<T> closeWith(DataSet<T> iterationResult, DataSet<?> terminationCriterion);
```
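The two `closeWith` variants differ only in when the loop stops. The plain-Java sketch below models the second variant. It assumes Flink's documented rule that a bulk iteration ends once the termination-criterion dataset is empty. After each step, the criterion is computed from that step's result, and an empty criterion ends the loop early. `TerminationModel`, `Outcome`, and `run` are hypothetical names. The returned step count exists only to make the behavior observable; Flink does not report it this way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical plain-Java model of closeWith(result, terminationCriterion); not Flink API.
final class TerminationModel {

    // Final dataset plus the number of steps that actually ran (for observation only)
    static final class Outcome<T> {
        final List<T> data;
        final int steps;

        Outcome(List<T> data, int steps) {
            this.data = data;
            this.steps = steps;
        }
    }

    static <T> Outcome<T> run(List<T> initial, int maxIterations,
                              UnaryOperator<List<T>> step,
                              Function<List<T>, List<?>> terminationCriterion) {
        List<T> current = new ArrayList<>(initial);
        int steps = 0;
        while (steps < maxIterations) {
            current = step.apply(current);
            steps++;
            // The criterion is derived from this step's result; empty means "stop now"
            if (terminationCriterion.apply(current).isEmpty()) {
                break;
            }
        }
        return new Outcome<>(current, steps);
    }

    public static void main(String[] args) {
        // Same shape as the square-root example: iterate until all values are within 0.001 of 1.0
        Outcome<Double> out = run(List.of(1.0, 2.0, 3.0), 100,
            data -> {
                List<Double> next = new ArrayList<>();
                for (double v : data) {
                    next.add(Math.sqrt(v));
                }
                return next;
            },
            data -> {
                List<Double> notDone = new ArrayList<>();
                for (double v : data) {
                    if (Math.abs(v - 1.0) > 0.001) {
                        notDone.add(v);
                    }
                }
                return notDone;
            });
        System.out.println(out.steps + " steps: " + out.data);
    }
}
```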

**Usage Examples:**

```java
// Simple bulk iteration: compute 2^10 by doubling the value in each of 10 steps
DataSet<Integer> initial = env.fromElements(1);

IterativeDataSet<Integer> powers = initial.iterate(10); // max 10 iterations

DataSet<Integer> doubled = powers.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) {
        return value * 2;
    }
});

// Close without a termination criterion: all 10 steps run, the result is 1024
DataSet<Integer> result = powers.closeWith(doubled);

// Bulk iteration with termination criterion
DataSet<Double> initialValues = env.fromElements(1.0, 2.0, 3.0);

IterativeDataSet<Double> iteration = initialValues.iterate(100);

DataSet<Double> nextValues = iteration
    .map(new MapFunction<Double, Double>() {
        @Override
        public Double map(Double value) {
            return Math.sqrt(value);
        }
    });

// Termination criterion: keeps the values that are still farther than 0.001 from 1.0;
// the iteration stops once this dataset is empty
DataSet<Double> terminationCriterion = nextValues
    .filter(new FilterFunction<Double>() {
        @Override
        public boolean filter(Double value) {
            return Math.abs(value - 1.0) > 0.001;
        }
    });

DataSet<Double> finalResult = iteration.closeWith(nextValues, terminationCriterion);
```
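
### Delta Iterations

Delta iterations are more efficient when only a small part of the data changes in each iteration. A delta iteration maintains a solution set (the complete state, indexed by key) and a workset (the elements to process in the next step). Each step produces two things: a solution set delta, which updates the solution set by key, and a new workset. The iteration ends when the workset is empty or the maximum number of iterations is reached.
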
```java { .api }
/**
 * Create a delta iteration
 * @param workset initial workset (elements to process)
 * @param maxIterations maximum number of iterations
 * @param keyPositions key fields used to index the solution set and join it with the workset
 * @return DeltaIteration for delta iteration configuration
 */
public <R> DeltaIteration<T, R> iterateDelta(DataSet<R> workset, int maxIterations, int... keyPositions);

/**
 * Create a delta iteration with key selector
 * @param workset initial workset (elements to process)
 * @param maxIterations maximum number of iterations
 * @param keyExtractor function to extract keys for joining
 * @return DeltaIteration for delta iteration configuration
 */
public <R, K> DeltaIteration<T, R> iterateDelta(DataSet<R> workset, int maxIterations, KeySelector<T, K> keyExtractor);
```
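The delta-iteration contract is easier to see on its own, before looking at the Flink classes. The following plain-Java sketch is a model, not Flink code; `DeltaIterationModel`, `StepResult`, and `Step` are hypothetical names. It works as follows:

- The solution set is a `Map` keyed by vertex ID.
- A step function produces a delta and a new workset.
- The delta is merged into the solution set by key.
- The loop stops when the workset is empty or `maxIterations` is reached.

The step used here is min-label propagation for connected components. In this workload, only vertices whose label just changed need to be revisited.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical plain-Java model of delta-iteration semantics; not Flink API.
final class DeltaIterationModel {

    // Output of one step: solution set updates plus the keys to process next
    static final class StepResult {
        final Map<Long, Long> delta;
        final Set<Long> nextWorkset;

        StepResult(Map<Long, Long> delta, Set<Long> nextWorkset) {
            this.delta = delta;
            this.nextWorkset = nextWorkset;
        }
    }

    interface Step {
        StepResult apply(Map<Long, Long> solutionSet, Set<Long> workset);
    }

    // Runs steps until the workset is empty or maxIterations is reached
    static Map<Long, Long> run(Map<Long, Long> initialSolution, Set<Long> initialWorkset,
                               int maxIterations, Step step) {
        Map<Long, Long> solution = new HashMap<>(initialSolution);
        Set<Long> workset = new HashSet<>(initialWorkset);
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            StepResult result = step.apply(solution, workset);
            solution.putAll(result.delta); // delta entries replace (or add) entries by key
            workset = result.nextWorkset;
        }
        return solution;
    }

    // Connected components by min-label propagation over an undirected adjacency map
    static Map<Long, Long> connectedComponents(Map<Long, List<Long>> adjacency, int maxIterations) {
        Map<Long, Long> labels = new HashMap<>();
        for (Long v : adjacency.keySet()) {
            labels.put(v, v); // every vertex starts as its own component
        }
        return run(labels, adjacency.keySet(), maxIterations, (solution, workset) -> {
            Map<Long, Long> delta = new HashMap<>();
            for (Long v : workset) {
                long label = solution.get(v);
                for (Long n : adjacency.get(v)) {
                    long current = delta.getOrDefault(n, solution.get(n));
                    if (label < current) {
                        delta.put(n, label); // keep the smallest label offered to n
                    }
                }
            }
            // Only vertices whose label changed are revisited in the next step
            return new StepResult(delta, new HashSet<>(delta.keySet()));
        });
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> graph = new HashMap<>();
        graph.put(1L, List.of(2L));
        graph.put(2L, List.of(1L, 3L));
        graph.put(3L, List.of(2L));
        graph.put(4L, List.of(5L));
        graph.put(5L, List.of(4L));
        // Vertices 1-3 end with label 1, vertices 4-5 with label 4
        System.out.println(connectedComponents(graph, 100));
    }
}
```

In this model the workset holds only keys, and each step reads the current values from the solution set. A Flink workset usually carries the values in its records, which saves a lookup but follows the same update rule.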

### DeltaIteration Operations

Operations for building the step function of a delta iteration from the solution set and workset placeholders, and for closing the iteration with a solution set delta and a new workset.

```java { .api }
/**
 * Delta iteration class for solution set and workset management
 * @param <ST> solution set element type
 * @param <WT> workset element type
 */
public class DeltaIteration<ST, WT> {

    /**
     * Get the solution set placeholder for join operations
     * @return SolutionSetPlaceHolder representing the solution set
     */
    public SolutionSetPlaceHolder<ST> getSolutionSet();

    /**
     * Get the workset placeholder for transformations
     * @return WorksetPlaceHolder representing the workset
     */
    public WorksetPlaceHolder<WT> getWorkset();

    /**
     * Close the delta iteration
     * @param solutionSetDelta updates to the solution set; each element replaces the
     *        solution set element with the same key, or is added if the key is new
     * @param newWorkset new workset for the next iteration; an empty workset ends the iteration
     * @return DataSet with final solution set
     */
    public DataSet<ST> closeWith(DataSet<ST> solutionSetDelta, DataSet<WT> newWorkset);
}
```
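The merge rule of `closeWith` can be stated in a few lines. The sketch below is a plain-Java model, not Flink code, and `SolutionSetMerge` and `merge` are hypothetical names. The solution set is a `Map` from key to record. A delta record replaces the record with the same key, or is added if the key is new. Keys absent from the delta keep their record. If one delta contains several records with the same key, this model keeps the last one in list order. A parallel Flink job gives no such ordering guarantee, so it is safer to reduce a delta to one record per key before closing the iteration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the closeWith merge rule for a solution set delta; not Flink API.
final class SolutionSetMerge {

    // Returns a new solution set: delta records replace records with the same key,
    // records with unseen keys are added, and every other record is kept unchanged.
    // Duplicate keys inside one delta are applied in list order here (last one wins).
    static <K, R> Map<K, R> merge(Map<K, R> solutionSet, List<R> delta, Function<R, K> keyOf) {
        Map<K, R> merged = new LinkedHashMap<>(solutionSet);
        for (R record : delta) {
            merged.put(keyOf.apply(record), record);
        }
        return merged;
    }

    public static void main(String[] args) {
        // (vertexId, distance) records keyed by vertexId, like key field 0 in the examples
        Map<Long, Map.Entry<Long, Double>> solution = new LinkedHashMap<>();
        solution.put(1L, Map.entry(1L, 0.0));
        solution.put(2L, Map.entry(2L, Double.POSITIVE_INFINITY));

        List<Map.Entry<Long, Double>> delta = List.of(Map.entry(2L, 1.0), Map.entry(3L, 2.0));
        Map<Long, Map.Entry<Long, Double>> next = merge(solution, delta, Map.Entry::getKey);
        System.out.println(next.values()); // prints [1=0.0, 2=1.0, 3=2.0]
    }
}
```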

**Usage Examples:**

```java
// Delta iteration example: Single Source Shortest Paths (unweighted edges, each hop costs 1.0)
DataSet<Tuple2<Long, Double>> vertices = env.fromElements(
    new Tuple2<>(1L, 0.0), // source vertex with distance 0
    new Tuple2<>(2L, Double.POSITIVE_INFINITY),
    new Tuple2<>(3L, Double.POSITIVE_INFINITY)
);

DataSet<Tuple2<Long, Long>> edges = env.fromElements(
    new Tuple2<>(1L, 2L), // edge from vertex 1 to vertex 2
    new Tuple2<>(2L, 3L)  // edge from vertex 2 to vertex 3
);

// Initial workset contains only the source vertex
DataSet<Tuple2<Long, Double>> initialWorkset = vertices
    .filter(new FilterFunction<Tuple2<Long, Double>>() {
        @Override
        public boolean filter(Tuple2<Long, Double> vertex) {
            return vertex.f1 == 0.0; // source vertex
        }
    });

// Create delta iteration; the solution set is keyed by vertex ID (field 0)
DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration =
    vertices.iterateDelta(initialWorkset, 100, 0);

// Get current solution set and workset placeholders
SolutionSetPlaceHolder<Tuple2<Long, Double>> solutionSet = iteration.getSolutionSet();
WorksetPlaceHolder<Tuple2<Long, Double>> workset = iteration.getWorkset();

// Join workset with edges to find neighbors, then keep only the shortest candidate
// per target vertex, since several workset vertices can reach the same neighbor
DataSet<Tuple3<Long, Long, Double>> candidateUpdates = workset
    .join(edges)
    .where(0).equalTo(0) // join on source vertex ID
    .with(new JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long>, Tuple3<Long, Long, Double>>() {
        @Override
        public Tuple3<Long, Long, Double> join(Tuple2<Long, Double> vertex, Tuple2<Long, Long> edge) {
            return new Tuple3<>(edge.f1, vertex.f0, vertex.f1 + 1.0); // neighbor, source, new distance
        }
    })
    .groupBy(0)
    .minBy(2);

// Generate solution set updates (shorter paths found). Flink operators should not emit
// null records, so a FlatJoinFunction emits nothing when there is no improvement
DataSet<Tuple2<Long, Double>> solutionSetUpdates = candidateUpdates
    .join(solutionSet)
    .where(0).equalTo(0) // join on target vertex ID
    .with(new FlatJoinFunction<Tuple3<Long, Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>>() {
        @Override
        public void join(Tuple3<Long, Long, Double> candidate, Tuple2<Long, Double> current,
                         Collector<Tuple2<Long, Double>> out) {
            if (candidate.f2 < current.f1) {
                out.collect(new Tuple2<>(candidate.f0, candidate.f2)); // shorter path found
            }
        }
    });

// New workset contains vertices with updated distances
DataSet<Tuple2<Long, Double>> newWorkset = solutionSetUpdates;

// Close the iteration; it ends when no distance improves (empty workset) or after 100 iterations
DataSet<Tuple2<Long, Double>> shortestPaths = iteration.closeWith(solutionSetUpdates, newWorkset);
```
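As a sanity check of what the shortest-paths job above should produce, the following plain-Java sketch replays the same supersteps on the same three-vertex graph without Flink. In each step it:

1. Joins the workset with the edges.
2. Keeps the smallest candidate per target vertex.
3. Emits only the candidates that improve the solution set.
4. Uses those improvements as the next workset.

`ShortestPathsModel` is a hypothetical name, and edges are modeled as `long[] {source, target}` pairs for brevity.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java replay of the SSSP delta iteration; not Flink API.
final class ShortestPathsModel {

    static Map<Long, Double> run(Map<Long, Double> vertices, List<long[]> edges,
                                 long source, int maxIterations) {
        Map<Long, Double> solution = new HashMap<>(vertices);   // solution set keyed by vertex ID
        Map<Long, Double> workset = new HashMap<>();
        workset.put(source, solution.get(source));              // initial workset: the source only
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            // workset JOIN edges, then the minimum candidate per target (groupBy + minBy)
            Map<Long, Double> candidates = new HashMap<>();
            for (long[] e : edges) {
                Double d = workset.get(e[0]);
                if (d != null) {
                    candidates.merge(e[1], d + 1.0, Math::min);
                }
            }
            // candidates JOIN solution set, keeping only shorter paths
            Map<Long, Double> delta = new HashMap<>();
            for (Map.Entry<Long, Double> c : candidates.entrySet()) {
                if (c.getValue() < solution.get(c.getKey())) {
                    delta.put(c.getKey(), c.getValue());
                }
            }
            solution.putAll(delta); // closeWith: merge the delta by key
            workset = delta;        // next workset: vertices whose distance improved
        }
        return solution;
    }

    public static void main(String[] args) {
        Map<Long, Double> vertices = new HashMap<>();
        vertices.put(1L, 0.0);
        vertices.put(2L, Double.POSITIVE_INFINITY);
        vertices.put(3L, Double.POSITIVE_INFINITY);
        List<long[]> edges = List.of(new long[] {1L, 2L}, new long[] {2L, 3L});
        // Expected distances: vertex 1 -> 0.0, vertex 2 -> 1.0, vertex 3 -> 2.0
        System.out.println(run(vertices, edges, 1L, 100));
    }
}
```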

### DeltaIterationResultSet

The `DataSet` returned by `DeltaIteration.closeWith` is a `DeltaIterationResultSet`. It represents the final solution set itself and also keeps references to the iteration head and to the datasets that were passed to `closeWith`.

```java { .api }
/**
 * Result of a delta iteration; the DataSet itself is the final solution set
 * @param <ST> solution set element type
 * @param <WT> workset element type
 */
public class DeltaIterationResultSet<ST, WT> extends DataSet<ST> {

    /**
     * Get the head of the iteration that this result set closes
     * @return the DeltaIteration that was closed
     */
    public DeltaIteration<ST, WT> getIterationHead();

    /**
     * Get the solution set delta that was passed to closeWith
     * @return DataSet with the per-step solution set delta
     */
    public DataSet<ST> getNextSolutionSet();

    /**
     * Get the new workset that was passed to closeWith
     * @return DataSet with the per-step workset
     */
    public DataSet<WT> getNextWorkset();
}
```

### Iteration Configuration

Additional configuration options for bulk iterations. `IterativeDataSet` is an operator, so its name and parallelism are set through the regular operator methods. `DeltaIteration` provides the equivalent `name(String)` and `parallelism(int)` methods.

```java { .api }
/**
 * Set custom name for the iteration
 * @param name name for the iteration
 * @return configured iteration
 */
public IterativeDataSet<T> name(String name);

/**
 * Set parallelism for the iteration
 * @param parallelism parallelism level
 * @return configured iteration
 */
public IterativeDataSet<T> setParallelism(int parallelism);
```

### Termination Criteria

Besides an empty termination-criterion dataset (bulk iterations) or an empty workset (delta iterations), an iteration can stop on a convergence criterion. The criterion is evaluated over an aggregator. An aggregator combines the values emitted by all parallel instances during one step, and the convergence criterion checks the aggregated value after each step. Both `IterativeDataSet` and `DeltaIteration` provide these methods; the `DeltaIteration` variants are shown.

```java { .api }
/**
 * Register aggregator for iteration
 * @param name name of the aggregator
 * @param aggregator the aggregator function
 * @return configured iteration
 */
public DeltaIteration<ST, WT> registerAggregator(String name, Aggregator<?> aggregator);

/**
 * Register convergence criterion with aggregator
 * @param name name of the convergence aggregator
 * @param aggregator the aggregator function
 * @param convergenceCheck convergence criterion implementation
 * @return configured iteration
 */
public <X extends Value> DeltaIteration<ST, WT> registerAggregationConvergenceCriterion(
    String name,
    Aggregator<X> aggregator,
    ConvergenceCriterion<X> convergenceCheck);
```
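The interplay between an aggregator and a convergence criterion can be modeled in plain Java. This sketch is not Flink code; `ConvergenceModel`, `Criterion`, `Outcome`, and `run` are hypothetical names. It works as follows:

- The sum starts from zero in every step, matching the per-step reset of Flink aggregators.
- During a step, every element adds its absolute change to a sum, which plays the role of a `DoubleSumAggregator`.
- After the step, the criterion inspects the sum together with the step number.

The run ends when the criterion reports convergence or `maxIterations` is reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleUnaryOperator;

// Hypothetical plain-Java model of an aggregator-based convergence check; not Flink API.
final class ConvergenceModel {

    // Stand-in for a convergence criterion: sees the step number and the aggregated value
    interface Criterion {
        boolean isConverged(int step, double aggregate);
    }

    static final class Outcome {
        final List<Double> data;
        final int steps;

        Outcome(List<Double> data, int steps) {
            this.data = data;
            this.steps = steps;
        }
    }

    static Outcome run(List<Double> initial, int maxIterations,
                       DoubleUnaryOperator step, Criterion criterion) {
        List<Double> current = new ArrayList<>(initial);
        int steps = 0;
        while (steps < maxIterations) {
            double sum = 0.0; // the aggregate starts from zero in every step
            List<Double> next = new ArrayList<>();
            for (double v : current) {
                double nv = step.applyAsDouble(v);
                sum += Math.abs(nv - v); // aggregate this element's change
                next.add(nv);
            }
            current = next;
            steps++;
            if (criterion.isConverged(steps, sum)) {
                break;
            }
        }
        return new Outcome(current, steps);
    }

    public static void main(String[] args) {
        // Halve every value; stop once the total change in a step drops below 1e-6
        Outcome out = run(List.of(10.0, 20.0, 30.0), 50, v -> v * 0.5,
            (step, total) -> total < 1e-6);
        System.out.println("converged after " + out.steps + " steps");
    }
}
```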

**Advanced Usage Examples:**

```java
// Bulk iteration with a broadcast variable and a convergence aggregator
DataSet<Double> initialData = env.fromElements(10.0, 20.0, 30.0);
DataSet<Double> parameters = env.fromElements(0.5); // single scaling parameter, broadcast to every map instance

IterativeDataSet<Double> iteration = initialData.iterate(50);
iteration.name("Scaling Iteration");

// Stop early once the summed change over all elements drops below 1e-6
iteration.registerAggregationConvergenceCriterion(
    "convergence",
    new DoubleSumAggregator(),
    new ConvergenceCriterion<DoubleValue>() {
        @Override
        public boolean isConverged(int superstep, DoubleValue totalChange) {
            return totalChange.getValue() < 1e-6;
        }
    });

DataSet<Double> nextData = iteration
    .map(new RichMapFunction<Double, Double>() {
        private double scale;
        private DoubleSumAggregator convergenceAgg;

        @Override
        public void open(Configuration config) {
            List<Double> params = getRuntimeContext().getBroadcastVariable("parameters");
            scale = params.get(0);
            convergenceAgg = getIterationRuntimeContext().getIterationAggregator("convergence");
        }

        @Override
        public Double map(Double value) {
            double newValue = value * scale; // use broadcast parameter
            convergenceAgg.aggregate(Math.abs(newValue - value)); // track convergence
            return newValue;
        }
    })
    .withBroadcastSet(parameters, "parameters"); // broadcast sets attach to the operator that reads them

DataSet<Double> result = iteration.closeWith(nextData);
```
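
## Types

```java { .api }
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
import org.apache.flink.api.java.operators.DeltaIteration.WorksetPlaceHolder;
import org.apache.flink.api.java.operators.DeltaIterationResultSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import java.util.List;
```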