# Iterative Algorithms

Complete API for implementing custom graph algorithms using different iteration patterns and the algorithm framework.

## Iteration Patterns Overview

Flink Gelly Scala provides three main iteration patterns for implementing graph algorithms:

1. **Scatter-Gather** - Vertices scatter messages along edges and gather/apply updates
2. **Gather-Sum-Apply (GSA)** - Three-phase pattern for vertex-centric computations
3. **Vertex-Centric (Pregel)** - Pregel-style vertex-centric iterations with message passing

## Scatter-Gather Iterations

### Basic Scatter-Gather

```scala { .api }
def runScatterGatherIteration[M](
  scatterFunction: ScatterFunction[K, VV, M, EV],
  gatherFunction: GatherFunction[K, VV, M],
  maxIterations: Int
): Graph[K, VV, EV]
```

Runs a scatter-gather iteration on the graph with the default configuration.

**Parameters:**
- `scatterFunction` - Function that scatters messages from vertices to neighbors
- `gatherFunction` - Function that gathers messages and updates vertex values
- `maxIterations` - Maximum number of iterations to perform

**Returns:** The updated graph, once the iteration converges or `maxIterations` is reached

### Configured Scatter-Gather

```scala { .api }
def runScatterGatherIteration[M](
  scatterFunction: ScatterFunction[K, VV, M, EV],
  gatherFunction: GatherFunction[K, VV, M],
  maxIterations: Int,
  parameters: ScatterGatherConfiguration
): Graph[K, VV, EV]
```

Runs a scatter-gather iteration with the given configuration.

**Parameters:**
- `scatterFunction` - Message scattering function
- `gatherFunction` - Message gathering and vertex update function
- `maxIterations` - Maximum number of iterations
- `parameters` - Iteration configuration parameters

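To make the scatter and gather phases concrete, the following is a minimal plain-Scala sketch of the superstep loop, using single-source shortest paths as the workload. It does not use Flink and is not how Gelly executes the iteration; `ScatterGatherSketch`, `SimpleEdge`, and `shortestPaths` are hypothetical names introduced only for illustration. The sketch assumes that a vertex whose value did not change in a superstep sends no messages in the next one.

```scala
object ScatterGatherSketch {
  // Hypothetical in-memory edge type; not part of Gelly
  final case class SimpleEdge(source: Long, target: Long, weight: Double)

  /** Runs at most maxIterations supersteps, each a scatter phase followed by a gather phase. */
  def shortestPaths(
      initial: Map[Long, Double],
      edges: Seq[SimpleEdge],
      maxIterations: Int): Map[Long, Double] = {
    var values = initial
    var active: Set[Long] = initial.keySet // every vertex is active in the first superstep
    var superstep = 1
    while (superstep <= maxIterations && active.nonEmpty) {
      // Scatter: each active, already-reached vertex sends (distance + weight) along its out-edges
      val messages: Map[Long, Seq[Double]] = edges
        .filter(e => active.contains(e.source) && values(e.source) < Double.MaxValue)
        .map(e => e.target -> (values(e.source) + e.weight))
        .groupBy(_._1)
        .map { case (target, msgs) => target -> msgs.map(_._2) }

      // Gather: a vertex adopts the minimum received distance only if it improves its value
      val updates = messages.collect {
        case (id, msgs) if msgs.min < values(id) => id -> msgs.min
      }
      values = values ++ updates
      active = updates.keySet // only updated vertices scatter in the next superstep
      superstep += 1
    }
    values
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq(SimpleEdge(1, 2, 1.0), SimpleEdge(2, 3, 2.0), SimpleEdge(1, 3, 5.0))
    val start = Map(1L -> 0.0, 2L -> Double.MaxValue, 3L -> Double.MaxValue)
    println(shortestPaths(start, edges, maxIterations = 10))
  }
}
```

The loop stops early when no vertex changed its value, which is why `maxIterations` acts as an upper bound rather than a fixed iteration count.
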
## Gather-Sum-Apply Iterations

### Basic GSA

```scala { .api }
def runGatherSumApplyIteration[M](
  gatherFunction: GatherFunction[VV, EV, M],
  sumFunction: SumFunction[VV, EV, M],
  applyFunction: ApplyFunction[K, VV, M],
  maxIterations: Int
): Graph[K, VV, EV]
```

Runs a Gather-Sum-Apply iteration without configuration options.

**Parameters:**
- `gatherFunction` - Collects information about adjacent vertices and edges
- `sumFunction` - Aggregates the gathered information
- `applyFunction` - Updates vertex values with the aggregated data
- `maxIterations` - Maximum number of iterations

### Configured GSA

```scala { .api }
def runGatherSumApplyIteration[M](
  gatherFunction: GatherFunction[VV, EV, M],
  sumFunction: SumFunction[VV, EV, M],
  applyFunction: ApplyFunction[K, VV, M],
  maxIterations: Int,
  parameters: GSAConfiguration
): Graph[K, VV, EV]
```

Runs a GSA iteration with configuration parameters.

**Parameters:**
- `gatherFunction` - Information gathering function
- `sumFunction` - Aggregation function
- `applyFunction` - Vertex update function
- `maxIterations` - Maximum number of iterations
- `parameters` - GSA configuration parameters

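The three GSA phases can be illustrated with a small plain-Scala sketch that labels connected components by propagating the minimum vertex ID. It does not use Flink and is not Gelly's implementation; `GsaSketch` and `connectedComponents` are hypothetical names. As a simplifying assumption, each edge `(source, target)` delivers the source's value to the target, so an undirected graph is modelled by listing both directions.

```scala
object GsaSketch {
  /** Runs at most maxIterations rounds of gather, sum, and apply over an edge list. */
  def connectedComponents(
      initial: Map[Long, Long],
      edges: Seq[(Long, Long)],
      maxIterations: Int): Map[Long, Long] = {
    var values = initial
    var changed = true
    var superstep = 1
    while (superstep <= maxIterations && changed) {
      // Gather: one partial value per (neighbor, edge) pair, here the neighbor's component ID
      val gathered: Seq[(Long, Long)] = edges.map { case (src, trg) => trg -> values(src) }

      // Sum: reduce the partial values of each vertex pairwise with an associative function
      val summed: Map[Long, Long] = gathered
        .groupBy(_._1)
        .map { case (id, parts) => id -> parts.map(_._2).reduce((a, b) => math.min(a, b)) }

      // Apply: compare the aggregate with the current value and update only on improvement
      val updates = summed.filter { case (id, candidate) => candidate < values(id) }
      values = values ++ updates
      changed = updates.nonEmpty
      superstep += 1
    }
    values
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L), (2L, 1L), (2L, 3L), (3L, 2L))
    val start = Map(1L -> 1L, 2L -> 2L, 3L -> 3L, 4L -> 4L)
    println(connectedComponents(start, edges, maxIterations = 10))
  }
}
```

Because the sum step is a pairwise reduction, it can be applied to partial groups of gathered values in any order, which is the property that separates GSA from a gather function that sees all messages at once.
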
## Vertex-Centric Iterations (Pregel)

### Basic Vertex-Centric

```scala { .api }
def runVertexCentricIteration[M](
  computeFunction: ComputeFunction[K, VV, EV, M],
  combineFunction: MessageCombiner[K, M],
  maxIterations: Int
): Graph[K, VV, EV]
```

Runs a vertex-centric iteration without configuration options.

**Parameters:**
- `computeFunction` - Vertex compute function that processes messages and updates values
- `combineFunction` - Optional message combiner function
- `maxIterations` - Maximum number of iterations

### Configured Vertex-Centric

```scala { .api }
def runVertexCentricIteration[M](
  computeFunction: ComputeFunction[K, VV, EV, M],
  combineFunction: MessageCombiner[K, M],
  maxIterations: Int,
  parameters: VertexCentricConfiguration
): Graph[K, VV, EV]
```

Runs a vertex-centric iteration with configuration parameters.

**Parameters:**
- `computeFunction` - Vertex computation function
- `combineFunction` - Message combiner for reducing messages
- `maxIterations` - Maximum number of iterations
- `parameters` - Vertex-centric configuration parameters

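The roles of the compute function and the message combiner can be seen in a plain-Scala sketch of a Pregel-style loop, again using single-source shortest paths. It does not use Flink and is not Gelly's implementation; `VertexCentricSketch`, `SimpleEdge`, and `shortestPaths` are hypothetical names. The sketch assumes that every vertex computes in the first superstep and that afterwards only vertices with incoming messages do.

```scala
import scala.collection.mutable.ArrayBuffer

object VertexCentricSketch {
  // Hypothetical in-memory edge type; not part of Gelly
  final case class SimpleEdge(source: Long, target: Long, weight: Double)

  def shortestPaths(
      sourceId: Long,
      vertexIds: Set[Long],
      edges: Seq[SimpleEdge],
      maxIterations: Int): Map[Long, Double] = {
    val outEdges = edges.groupBy(_.source)
    var values: Map[Long, Double] = vertexIds.map(id => id -> Double.MaxValue).toMap
    var inbox: Map[Long, Double] = Map.empty // after combining: at most one message per vertex
    var superstep = 1
    var done = false
    while (superstep <= maxIterations && !done) {
      val active = if (superstep == 1) vertexIds else inbox.keySet
      val outgoing = ArrayBuffer.empty[(Long, Double)]
      for (id <- active) {
        // Compute: a single step both updates the vertex value and sends messages
        val candidate = if (id == sourceId) 0.0 else inbox.getOrElse(id, Double.MaxValue)
        if (candidate < values(id)) {
          values = values.updated(id, candidate)
          for (e <- outEdges.getOrElse(id, Seq.empty)) {
            outgoing += (e.target -> (candidate + e.weight))
          }
        }
      }
      // Combine: collapse all messages for the same target into their minimum
      inbox = outgoing.groupBy(_._1).map { case (target, msgs) => target -> msgs.map(_._2).min }
      done = inbox.isEmpty
      superstep += 1
    }
    values
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq(SimpleEdge(1, 2, 1.0), SimpleEdge(2, 3, 2.0), SimpleEdge(1, 3, 5.0))
    println(shortestPaths(1L, Set(1L, 2L, 3L), edges, maxIterations = 10))
  }
}
```

Combining is safe here because the compute step only needs the minimum of its incoming messages; a combiner is appropriate whenever the compute logic depends on an associative, commutative reduction of the messages rather than on each message individually.
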
## Algorithm Framework Integration

### Algorithm Execution

```scala { .api }
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T
```

Executes a graph algorithm that implements the `GraphAlgorithm` interface.

**Parameters:**
- `algorithm` - Graph algorithm implementation

**Returns:** Algorithm result of type `T`

### Analytics Execution

```scala { .api }
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]
```

Executes a graph analytic. Analytics are terminal operations whose results are retrieved via accumulators, so several analytics and algorithms can be composed into a single program.

**Parameters:**
- `analytic` - Graph analytic implementation

**Returns:** The analytic instance, from which the result is retrieved

## Function Interfaces

### Scatter Function

```scala { .api }
abstract class ScatterFunction[K, VV, M, EV] {
  def sendMessages(vertex: Vertex[K, VV]): Unit
  def sendMessageTo(target: K, message: M): Unit
  def sendMessageToAllNeighbors(message: M): Unit
  def getEdges: java.lang.Iterable[Edge[K, EV]]
  def getSuperstepNumber: Int
}
```

Base class for scatter functions that send messages from vertices to their neighbors. Implement `sendMessages`; the other methods are helpers to call from inside it.

### Gather Function (Scatter-Gather)

```scala { .api }
abstract class GatherFunction[K, VV, M] {
  def updateVertex(vertex: Vertex[K, VV], inMessages: MessageIterator[M]): Unit
  def setNewVertexValue(newValue: VV): Unit
  def getSuperstepNumber: Int
}
```

Base class for gather functions in scatter-gather iterations. Implement `updateVertex` and call `setNewVertexValue` to change the vertex value; the method itself returns nothing.

### Gather Function (GSA)

```scala { .api }
abstract class GatherFunction[VV, EV, M] {
  def gather(neighbor: Neighbor[VV, EV]): M
}
```

Base class for gather functions in GSA iterations. A `Neighbor` pairs the neighboring vertex value with the value of the connecting edge.

### Sum Function

```scala { .api }
abstract class SumFunction[VV, EV, M] {
  def sum(newValue: M, currentValue: M): M
}
```

Base class for sum functions that aggregate gathered information in GSA iterations.

### Apply Function

```scala { .api }
abstract class ApplyFunction[K, VV, M] {
  def apply(newValue: M, currentValue: VV): Unit
  def setResult(result: VV): Unit
  def getSuperstepNumber: Int
}
```

Base class for apply functions that update vertex values in GSA iterations. Implement `apply` and call `setResult` to change the vertex value.

### Compute Function

```scala { .api }
abstract class ComputeFunction[K, VV, EV, M] {
  def compute(vertex: Vertex[K, VV], messages: MessageIterator[M]): Unit
  def sendMessageTo(target: K, message: M): Unit
  def sendMessageToAllNeighbors(message: M): Unit
  def setNewVertexValue(newValue: VV): Unit
  def getSuperstepNumber: Int
  def getEdges: java.lang.Iterable[Edge[K, EV]]
}
```

Base class for compute functions in vertex-centric iterations.

### Message Combiner

```scala { .api }
abstract class MessageCombiner[K, M] {
  def combineMessages(messages: MessageIterator[M]): Unit
  def sendCombinedMessage(combinedMessage: M): Unit
}
```

Base class for combining multiple messages sent to the same vertex. Implement `combineMessages` and emit the combined value with `sendCombinedMessage`.

## Configuration Classes

### ScatterGatherConfiguration

```scala { .api }
class ScatterGatherConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanaged: Boolean): Unit
  def setOptNumVertices(optNumVertices: Boolean): Unit
}
```

Configuration for scatter-gather iterations. The setters return `Unit`, so they are called one at a time rather than chained. `setOptNumVertices` is a flag that makes the vertex count available inside the iteration functions; it does not take the count itself.

### GSAConfiguration

```scala { .api }
class GSAConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanaged: Boolean): Unit
  def setOptNumVertices(optNumVertices: Boolean): Unit
}
```

Configuration for gather-sum-apply iterations.

### VertexCentricConfiguration

```scala { .api }
class VertexCentricConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanaged: Boolean): Unit
  def setOptNumVertices(optNumVertices: Boolean): Unit
}
```

Configuration for vertex-centric iterations.

## Usage Examples

### Single Source Shortest Path (Scatter-Gather)

```scala
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.spargel.{GatherFunction, MessageIterator, ScatterFunction}
import scala.collection.JavaConverters._

// Scatter function: sends the current distance plus the edge weight to each out-neighbor
class DistanceScatter extends ScatterFunction[Long, Double, Double, Double] {
  override def sendMessages(vertex: Vertex[Long, Double]): Unit = {
    // Vertices still at Double.MaxValue have not been reached and send nothing
    if (vertex.getValue < Double.MaxValue) {
      for (edge <- getEdges.asScala) {
        sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
      }
    }
  }
}

// Gather function: adopts the minimum received distance if it improves the current value
class DistanceGather extends GatherFunction[Long, Double, Double] {
  override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]): Unit = {
    var min = Double.MaxValue
    while (inMessages.hasNext) {
      min = Math.min(min, inMessages.next())
    }
    if (min < vertex.getValue) {
      setNewVertexValue(min)
    }
  }
}

// Run the algorithm on a Graph[Long, Double, Double] whose source vertex
// starts at 0.0 and whose other vertices start at Double.MaxValue
val shortestPaths = graph.runScatterGatherIteration(
  new DistanceScatter(),
  new DistanceGather(),
  maxIterations = 10
)
```

### PageRank (Vertex-Centric)

```scala
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.pregel.{ComputeFunction, MessageIterator}
import scala.collection.JavaConverters._

// The vertex count is passed in through the constructor
class PageRankCompute(numVertices: Long, maxIterations: Int)
    extends ComputeFunction[Long, Double, Double, Double] {

  override def compute(vertex: Vertex[Long, Double], messages: MessageIterator[Double]): Unit = {
    val dampingFactor = 0.85

    // No messages arrive in the first superstep, so start from the initial rank
    val rank =
      if (getSuperstepNumber == 1) {
        vertex.getValue
      } else {
        var sum = 0.0
        while (messages.hasNext) {
          sum += messages.next()
        }
        (1.0 - dampingFactor) / numVertices + dampingFactor * sum
      }
    setNewVertexValue(rank)

    // Send rank shares to out-neighbors, except in the last superstep.
    // The edges are read once and only the target IDs are kept.
    if (getSuperstepNumber < maxIterations) {
      val targets = getEdges.asScala.map(_.getTarget).toList
      for (target <- targets) {
        sendMessageTo(target, rank / targets.size)
      }
    }
  }
}

// Run PageRank
val maxIterations = 10
val pageRankResult = graph.runVertexCentricIteration(
  new PageRankCompute(graph.numberOfVertices(), maxIterations),
  null, // No message combiner
  maxIterations
)
```

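For reference, the per-superstep update in the PageRank example above corresponds to the standard PageRank recurrence. The notation is introduced here and does not appear in the API: $d$ is the damping factor (0.85 in the example), $N$ the number of vertices, $\mathrm{in}(v)$ the set of vertices with an edge to $v$, and $\mathrm{outdeg}(u)$ the out-degree of $u$.

```latex
PR_{t+1}(v) = \frac{1 - d}{N} + d \sum_{u \in \mathrm{in}(v)} \frac{PR_t(u)}{\mathrm{outdeg}(u)}
```

Each term of the sum is one message: a vertex $u$ sends $PR_t(u) / \mathrm{outdeg}(u)$ along every out-edge, and the receiving vertex adds up what arrives.
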
### Connected Components (GSA)

```scala
import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction}
import org.apache.flink.types.NullValue

// Gather: read the component ID of each neighbor
class ComponentIdGather extends GatherFunction[Long, NullValue, Long] {
  override def gather(neighbor: Neighbor[Long, NullValue]): Long = {
    neighbor.getNeighborValue
  }
}

// Sum: keep the smallest gathered component ID
class ComponentIdSum extends SumFunction[Long, NullValue, Long] {
  override def sum(newValue: Long, currentValue: Long): Long = {
    Math.min(newValue, currentValue)
  }
}

// Apply: adopt the gathered ID only if it is lower than the current one
class ComponentIdApply extends ApplyFunction[Long, Long, Long] {
  override def apply(newValue: Long, currentValue: Long): Unit = {
    if (newValue < currentValue) {
      setResult(newValue)
    }
  }
}

// Run connected components
val components = graph.runGatherSumApplyIteration(
  new ComponentIdGather(),
  new ComponentIdSum(),
  new ComponentIdApply(),
  maxIterations = 10
)
```

### Using Built-in Algorithms

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.library.PageRank

// Use the pre-implemented PageRank algorithm.
// The arguments (damping factor, maximum iterations) are positional:
// PageRank is a Java class, and Scala cannot pass named arguments to Java constructors.
val pageRankAlgorithm = new PageRank[Long](0.85, 10)
val pageRankResult = graph.run(pageRankAlgorithm)

// The result is a DataSet of vertices, each holding a vertex ID and its PageRank score
val topVertices = pageRankResult.first(10)
```