# Iterations

The Apache Flink Scala API supports iterative algorithms through bulk iteration and delta iteration, enabling efficient implementation of graph algorithms, machine learning models, and other iterative computations.

## Bulk Iteration

Bulk iteration repeatedly applies a step function to the entire dataset until a maximum number of iterations is reached or a termination criterion is met.
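
Conceptually, a bulk iteration with a fixed iteration count behaves like folding the step function over the iteration indices. As a plain-Scala sketch of that semantics (no Flink involved, a `List` standing in for a `DataSet`):

```scala
// Plain-Scala model of a 10-step bulk iteration: the step function
// (here: double every element) is applied to the whole "dataset" each round.
val initial = List(1)

val result = (1 to 10).foldLeft(initial) { (dataset, _) =>
  dataset.map(_ * 2) // step function applied to the entire dataset
}

println(result) // List(1024)
```

The powers-of-2 usage example further below expresses exactly this dataflow with `DataSet.iterate`.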

### Basic Bulk Iteration

```scala { .api }
class DataSet[T] {
  // Simple iteration with a fixed number of iterations
  def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]
}
```

### Bulk Iteration with Termination

```scala { .api }
class DataSet[T] {
  // Iteration with a termination criterion: the iteration stops as soon as the
  // termination DataSet returned by the step function is empty, or after maxIterations
  def iterateWithTermination(maxIterations: Int)(
    stepFunction: DataSet[T] => (DataSet[T], DataSet[_])
  ): DataSet[T]
}
```

## Delta Iteration

Delta iteration is optimized for scenarios where only a small portion of the data changes in each iteration. It maintains a solution set (the complete state, indexed by key) and a workset (the elements that may trigger updates). In each superstep the step function returns a solution set delta, whose records replace solution set entries with the same key, and the workset for the next superstep; the iteration terminates when the workset becomes empty or the maximum number of iterations is reached.
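
The runtime behaves like the following plain-Scala loop (a sketch of the semantics only, not Flink API): keyed solution state, a shrinking workset, and a hypothetical `step` function returning the delta and the next workset. The loop ends when the workset is empty or the iteration budget is exhausted:

```scala
// Plain-Scala model of delta iteration semantics.
// solution: keyed state; workset: elements that may still trigger updates.
def iterateDelta[K, V](
    initialSolution: Map[K, V],
    initialWorkset: List[(K, V)],
    maxIterations: Int
)(step: (Map[K, V], List[(K, V)]) => (List[(K, V)], List[(K, V)])): Map[K, V] = {
  var solution = initialSolution
  var workset = initialWorkset
  var i = 0
  while (workset.nonEmpty && i < maxIterations) {
    val (delta, nextWorkset) = step(solution, workset)
    solution = solution ++ delta // the delta replaces entries with the same key
    workset = nextWorkset
    i += 1
  }
  solution
}

// Toy use: propagate the minimum value rightward along a line graph.
val result = iterateDelta(Map(1 -> 3, 2 -> 5, 3 -> 7), List(1 -> 3), 10) {
  (solution, workset) =>
    val updates = workset
      .map { case (k, v) => (k + 1, v) }                       // candidate for right neighbor
      .filter { case (k, v) => solution.get(k).exists(v < _) } // keep only improvements
    (updates, updates)
}
// result: Map(1 -> 3, 2 -> 3, 3 -> 3)
```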

```scala { .api }
class DataSet[T] {
  // Delta iteration with string-based key fields
  def iterateDelta[R: TypeInformation: ClassTag](
    workset: DataSet[R],
    maxIterations: Int,
    keyFields: Array[String]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]

  // Delta iteration with integer-based key fields
  def iterateDelta[R: TypeInformation: ClassTag](
    workset: DataSet[R],
    maxIterations: Int,
    keyFields: Array[Int]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
}
```

## Usage Examples

### Simple Bulk Iteration - Powers of 2

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Start with the initial value
val initial = env.fromElements(1)

// Iterate to compute powers of 2
val result = initial.iterate(10) { current =>
  current.map(_ * 2)
}

// Result will be 1 * 2^10 = 1024
result.print()
```

### Bulk Iteration with Termination - Convergence

```scala
import org.apache.flink.api.scala._

case class Point(x: Double, y: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Initial points
val initialPoints = env.fromElements(
  Point(1.0, 1.0),
  Point(2.0, 2.0),
  Point(3.0, 3.0)
)

// Iterate until all points converge to the origin (0, 0)
val convergedPoints = initialPoints.iterateWithTermination(100) { points =>
  // Step function: move points closer to the origin
  val newPoints = points.map { p =>
    Point(p.x * 0.9, p.y * 0.9)
  }

  // Termination criterion: points still too far from the origin.
  // The iteration stops as soon as this DataSet is empty.
  val terminationCriterion = newPoints.filter { p =>
    math.sqrt(p.x * p.x + p.y * p.y) > 0.01
  }

  (newPoints, terminationCriterion)
}
```
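
Since each step scales every coordinate by 0.9, it is easy to verify off-line that the 100-iteration budget suffices: the farthest initial point, (3.0, 3.0), needs 58 shrink steps to enter the 0.01 radius. A plain-Scala check:

```scala
// Count how many 0.9-shrink steps the farthest initial point needs
// to reach the 0.01 convergence radius.
var norm = math.sqrt(3.0 * 3.0 + 3.0 * 3.0) // ≈ 4.243
var steps = 0
while (norm > 0.01) {
  norm *= 0.9
  steps += 1
}
println(steps) // 58, comfortably below maxIterations = 100
```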

### Delta Iteration - Single Source Shortest Path

```scala
import org.apache.flink.api.scala._

case class Vertex(id: Long, distance: Double)
case class Edge(src: Long, dst: Long, weight: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Graph vertices with initial distances (infinity except the source)
val vertices = env.fromElements(
  Vertex(1L, 0.0), // Source vertex
  Vertex(2L, Double.PositiveInfinity),
  Vertex(3L, Double.PositiveInfinity),
  Vertex(4L, Double.PositiveInfinity)
)

// Graph edges
val edges = env.fromElements(
  Edge(1L, 2L, 1.0),
  Edge(1L, 3L, 4.0),
  Edge(2L, 3L, 2.0),
  Edge(2L, 4L, 5.0),
  Edge(3L, 4L, 1.0)
)

// Initial workset: only the source vertex
val initialWorkset = env.fromElements(Vertex(1L, 0.0))

// Run the delta iteration
val shortestPaths = vertices.iterateDelta(initialWorkset, 10, Array("id")) {
  (solutionSet, workset) =>

    // Join the workset with the edges to find candidate distance updates
    val candidates = workset
      .join(edges)
      .where(_.id)
      .equalTo(_.src)
      .apply { (vertex, edge) =>
        Vertex(edge.dst, vertex.distance + edge.weight)
      }

    // Keep only the smallest candidate distance per vertex
    val minCandidates = candidates
      .groupBy(_.id)
      .reduce((a, b) => if (a.distance < b.distance) a else b)

    // Keep only candidates that improve on the current solution
    val updates = minCandidates
      .join(solutionSet)
      .where(_.id)
      .equalTo(_.id)
      .flatMap { case (candidate, current) =>
        if (candidate.distance < current.distance) Some(candidate) else None
      }

    // The first DataSet is the solution set delta: its records replace the
    // solution set entries with the same id. The improved vertices also
    // form the next workset.
    (updates, updates)
}
```
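
As a cross-check on the delta-iteration result, a plain-Scala Bellman-Ford pass over the same edge list yields the distances the iteration should converge to:

```scala
// Reference shortest-path computation for the same 4-vertex graph.
val edges = List((1L, 2L, 1.0), (1L, 3L, 4.0), (2L, 3L, 2.0), (2L, 4L, 5.0), (3L, 4L, 1.0))
var dist = Map(1L -> 0.0, 2L -> Double.PositiveInfinity,
               3L -> Double.PositiveInfinity, 4L -> Double.PositiveInfinity)

// Relax all edges |V| - 1 times (Bellman-Ford)
for (_ <- 1 until 4; (src, dst, w) <- edges) {
  if (dist(src) + w < dist(dst)) dist = dist.updated(dst, dist(src) + w)
}

// dist(3L) == 3.0 via 1 -> 2 -> 3; dist(4L) == 4.0 via 1 -> 2 -> 3 -> 4
```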

### PageRank Algorithm

```scala
import org.apache.flink.api.scala._

case class Page(id: Long, rank: Double)
case class Link(src: Long, dst: Long)

val env = ExecutionEnvironment.getExecutionEnvironment

// Initial page ranks (uniform)
val pages = env.fromElements(
  Page(1L, 1.0),
  Page(2L, 1.0),
  Page(3L, 1.0),
  Page(4L, 1.0)
)

// Page links
val links = env.fromElements(
  Link(1L, 2L),
  Link(1L, 3L),
  Link(2L, 3L),
  Link(3L, 1L),
  Link(3L, 4L),
  Link(4L, 1L)
)

val dampingFactor = 0.85
val numPages = 4

// Out-degree of each page as (pageId, degree)
val outDegrees = links
  .map(link => (link.src, 1))
  .groupBy(_._1)
  .reduce((a, b) => (a._1, a._2 + b._2))

// Run the PageRank iteration
val pageRanks = pages.iterate(10) { currentRanks =>

  // Each page sends rank / outDegree along each of its outgoing links
  val contributions = currentRanks
    .join(outDegrees)
    .where(_.id)
    .equalTo(_._1)
    .apply { (page, outDegree) =>
      (page.id, page.rank / outDegree._2)
    }
    .join(links)
    .where(_._1)
    .equalTo(_.src)
    .apply { (contrib, link) =>
      (link.dst, contrib._2)
    }

  // Sum the contributions per page and apply the PageRank formula.
  // Note: pages without incoming links drop out here; a complete
  // implementation would co-group with the page set to keep them.
  contributions
    .groupBy(_._1)
    .reduce((a, b) => (a._1, a._2 + b._2))
    .map { contrib =>
      Page(contrib._1, (1.0 - dampingFactor) / numPages + dampingFactor * contrib._2)
    }
}
```
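
The same update rule can be replayed in plain Scala to sanity-check the formula. With this link graph every page has outgoing links, so the total rank obeys total' = (1 - d) + d * total and converges toward 1.0:

```scala
// Plain-Scala PageRank on the same 4-page link graph.
val links = List((1, 2), (1, 3), (2, 3), (3, 1), (3, 4), (4, 1))
val d = 0.85
val n = 4
val outDegree = links.groupBy(_._1).map { case (src, ls) => src -> ls.size }

var ranks = Map(1 -> 1.0, 2 -> 1.0, 3 -> 1.0, 4 -> 1.0)
for (_ <- 1 to 50) {
  // Each page sends rank / outDegree to every link target
  val contribs = links
    .map { case (src, dst) => dst -> ranks(src) / outDegree(src) }
    .groupBy(_._1)
    .map { case (dst, cs) => dst -> cs.map(_._2).sum }
  ranks = ranks.map { case (page, _) =>
    page -> ((1.0 - d) / n + d * contribs.getOrElse(page, 0.0))
  }
}

val total = ranks.values.sum // converges toward 1.0
```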

### Connected Components with Delta Iteration

```scala
import org.apache.flink.api.scala._

case class ComponentVertex(id: Long, componentId: Long)

val env = ExecutionEnvironment.getExecutionEnvironment

// Initial vertices (each vertex starts in its own component)
val vertices = env.fromElements(
  ComponentVertex(1L, 1L),
  ComponentVertex(2L, 2L),
  ComponentVertex(3L, 3L),
  ComponentVertex(4L, 4L),
  ComponentVertex(5L, 5L)
)

// Undirected edges, stored as (src, dst) pairs
val edges = env.fromElements(
  (1L, 2L), (2L, 3L), (4L, 5L)
)

// Initial workset: all vertices
val initialWorkset = vertices

val components = vertices.iterateDelta(initialWorkset, 10, Array("id")) {
  (solutionSet, workset) =>

    // Propagate component IDs to neighbors in both edge directions
    val candidates = workset
      .join(edges)
      .where(_.id)
      .equalTo(_._1)
      .apply { (vertex, edge) =>
        ComponentVertex(edge._2, vertex.componentId)
      }
      .union(
        workset
          .join(edges)
          .where(_.id)
          .equalTo(_._2)
          .apply { (vertex, edge) =>
            ComponentVertex(edge._1, vertex.componentId)
          }
      )

    // Keep the minimum candidate component ID per vertex
    val minCandidates = candidates
      .groupBy(_.id)
      .reduce((a, b) => if (a.componentId < b.componentId) a else b)

    // Keep only candidates that improve on the current solution
    val updates = minCandidates
      .join(solutionSet)
      .where(_.id)
      .equalTo(_.id)
      .flatMap { case (candidate, current) =>
        if (candidate.componentId < current.componentId) Some(candidate) else None
      }

    // The updates are both the solution set delta and the next workset
    (updates, updates)
}
```
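
The expected labeling can be checked with a plain-Scala minimum-label propagation loop over the same edge list: vertices 1-3 collapse onto label 1 and vertices 4-5 onto label 4:

```scala
// Plain-Scala minimum-label propagation on the same undirected graph.
val edges = List((1L, 2L), (2L, 3L), (4L, 5L))
var labels = Map(1L -> 1L, 2L -> 2L, 3L -> 3L, 4L -> 4L, 5L -> 5L)

var changed = true
while (changed) {
  changed = false
  for ((a, b) <- edges) {
    val m = math.min(labels(a), labels(b))
    if (labels(a) != m || labels(b) != m) {
      labels = labels.updated(a, m).updated(b, m)
      changed = true
    }
  }
}

// labels: vertices 1, 2, 3 share component 1; vertices 4, 5 share component 4
```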

### K-Means Clustering

The assignment step must see every current centroid for every point, so the centroids are distributed to the map function as a broadcast set and read in a `RichMapFunction`.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

case class Point2D(x: Double, y: Double)
case class Centroid(id: Int, x: Double, y: Double)

// Assigns each point to the nearest centroid from the broadcast set,
// emitting (centroidId, point, count) for the averaging step
class SelectNearestCentroid extends RichMapFunction[Point2D, (Int, Point2D, Int)] {
  private var centroids: Seq[Centroid] = _

  override def open(parameters: Configuration): Unit = {
    centroids = getRuntimeContext
      .getBroadcastVariable[Centroid]("centroids")
      .asScala
      .toSeq
  }

  override def map(p: Point2D): (Int, Point2D, Int) = {
    val nearest = centroids.minBy { c =>
      val dx = p.x - c.x
      val dy = p.y - c.y
      dx * dx + dy * dy
    }
    (nearest.id, p, 1)
  }
}

val env = ExecutionEnvironment.getExecutionEnvironment

// Data points
val points = env.fromElements(
  Point2D(1.0, 1.0), Point2D(1.5, 1.5), Point2D(2.0, 2.0),
  Point2D(8.0, 8.0), Point2D(8.5, 8.5), Point2D(9.0, 9.0)
)

// Initial centroids
val initialCentroids = env.fromElements(
  Centroid(0, 0.0, 0.0),
  Centroid(1, 5.0, 5.0)
)

// K-means iteration
val finalCentroids = initialCentroids.iterate(20) { centroids =>

  // Assign each point to its nearest centroid
  val assignments = points
    .map(new SelectNearestCentroid)
    .withBroadcastSet(centroids, "centroids")

  // Average the assigned points to get the new centroids
  assignments
    .groupBy(_._1)
    .reduce { (a, b) =>
      (a._1, Point2D(a._2.x + b._2.x, a._2.y + b._2.y), a._3 + b._3)
    }
    .map { case (id, sum, count) =>
      Centroid(id, sum.x / count, sum.y / count)
    }
}
```
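
The expected fixed point can be verified with a plain-Scala run of the same update rule: with these initial centroids the six points split into the two visible clusters on the first assignment, giving centroids (1.5, 1.5) and (8.5, 8.5):

```scala
// Plain-Scala k-means on the same six points and two initial centroids.
val points = List((1.0, 1.0), (1.5, 1.5), (2.0, 2.0), (8.0, 8.0), (8.5, 8.5), (9.0, 9.0))
var centroids = Map(0 -> (0.0, 0.0), 1 -> (5.0, 5.0))

for (_ <- 1 to 20) {
  centroids = points
    .groupBy { case (px, py) =>
      // assign each point to the nearest centroid (squared Euclidean distance)
      centroids.minBy { case (_, (cx, cy)) =>
        (px - cx) * (px - cx) + (py - cy) * (py - cy)
      }._1
    }
    .map { case (id, pts) =>
      id -> (pts.map(_._1).sum / pts.size, pts.map(_._2).sum / pts.size)
    }
}

// centroids: 0 -> (1.5, 1.5), 1 -> (8.5, 8.5)
```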

## Best Practices

### Performance Considerations

1. **Use Delta Iteration for Sparse Updates**: When only a small fraction of the data changes per iteration
2. **Choose Appropriate Termination Conditions**: Balance accuracy against the cost of extra iterations
3. **Use Broadcast Variables**: Distribute small lookup tables to the step function with `withBroadcastSet`
4. **Consider Checkpointing**: For long-running iterations, to recover from failures

### Common Patterns

1. **Graph Algorithms**: Use delta iteration with a vertex-centric approach
2. **Machine Learning**: Use bulk iteration for gradient descent and other fixed-point algorithms
3. **Convergence Detection**: Implement custom termination criteria based on convergence metrics
4. **State Management**: Use the solution set in delta iteration to maintain complete state