# Graph Algorithms

Iterative computation models for implementing graph algorithms with the scatter-gather, gather-sum-apply, and vertex-centric paradigms, along with a framework for executing pre-built algorithms and analytics.

## Capabilities

### Algorithm Execution Framework

Execute pre-built algorithms and analytics on graphs with type-safe result handling.

```scala { .api }
/**
 * Run a graph algorithm on the graph.
 * @param algorithm the algorithm to run on the Graph
 * @return the result of the graph algorithm
 */
def run[T](algorithm: GraphAlgorithm[K, VV, EV, T]): T

/**
 * Run a graph analytic on the graph.
 * A GraphAnalytic is similar to a GraphAlgorithm but is terminal, and its results
 * are retrieved via accumulators. A Flink program has a single point of
 * execution; a GraphAnalytic defers execution to the user so that multiple
 * analytics and algorithms can be composed into a single program.
 * @param analytic the analytic to run on the Graph
 * @return the analytic itself; read its result with getResult after the
 *         program has executed
 */
def run[T](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]
```
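
The two `run` overloads differ in when results become available. The plain-Scala model below sketches that contract only and is not Gelly code: `Plan`, `Analytic`, `VertexCount`, `EdgeCount`, and `AnalyticDemo` are hypothetical names, and a mutable field stands in for a Flink accumulator.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a Flink program: work is registered first and runs on execute()
final class Plan {
  private val actions = ArrayBuffer.empty[() => Unit]
  def register(action: () => Unit): Unit = actions += action
  def execute(): Unit = actions.foreach(action => action())
}

// Hypothetical analytic base: run() only registers work; getResult is valid after execution
abstract class Analytic[T] {
  private var result: Option[T] = None
  protected def compute(): T
  def run(plan: Plan): this.type = {
    plan.register(() => result = Some(compute()))
    this
  }
  def getResult: T =
    result.getOrElse(throw new IllegalStateException("execute the plan before reading results"))
}

// Counts vertices that appear in at least one edge
final class VertexCount(edges: Seq[(Long, Long)]) extends Analytic[Int] {
  protected def compute(): Int = edges.flatMap { case (s, t) => Seq(s, t) }.distinct.size
}

final class EdgeCount(edges: Seq[(Long, Long)]) extends Analytic[Int] {
  protected def compute(): Int = edges.size
}

object AnalyticDemo {
  val edges = Seq(1L -> 2L, 2L -> 3L, 3L -> 1L, 3L -> 4L)
  val plan = new Plan
  // Both analytics are composed into one plan...
  val vertexCount = new VertexCount(edges).run(plan)
  val edgeCount = new EdgeCount(edges).run(plan)
  plan.execute() // ...and computed by a single execution
}
```

Reading `getResult` before `plan.execute()` fails, which mirrors the rule that a GraphAnalytic's result is only available after the program has run; registering both analytics first lets one execution serve them all.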

### Scatter-Gather Iterations

Iterative computation model, also known as signal/collect, in which vertices scatter messages along their edges and then gather the messages sent to them to update their values.

```scala { .api }
/**
 * Runs a scatter-gather iteration on the graph.
 * No configuration options are provided.
 * @param scatterFunction the scatter function
 * @param gatherFunction the gather function
 * @param maxIterations maximum number of iterations to perform
 * @return the updated Graph after the scatter-gather iteration has converged or
 *         after maxIterations iterations
 */
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
                                 gatherFunction: GatherFunction[K, VV, M],
                                 maxIterations: Int): Graph[K, VV, EV]

/**
 * Runs a scatter-gather iteration on the graph with configuration options.
 * @param scatterFunction the scatter function
 * @param gatherFunction the gather function
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @return the updated Graph after the scatter-gather iteration has converged or
 *         after maxIterations iterations
 */
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
                                 gatherFunction: GatherFunction[K, VV, M],
                                 maxIterations: Int,
                                 parameters: ScatterGatherConfiguration): Graph[K, VV, EV]
```
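
To make the superstep structure concrete, here is a dependency-free simulation of scatter-gather single source shortest paths over an in-memory edge list. It sketches the model, not Gelly's distributed runtime: `ScatterGatherSketch` and `sssp` are hypothetical names, only vertices updated in the previous superstep scatter in this sketch, and the loop ends when no value improves or after `maxIterations` supersteps.

```scala
// Hypothetical, dependency-free model of scatter-gather supersteps (not the Gelly API)
object ScatterGatherSketch {
  type VertexId = Long
  final case class Edge(source: VertexId, target: VertexId, weight: Double)

  /** Single source shortest paths; assumes every edge endpoint is in `vertices`. */
  def sssp(vertices: Set[VertexId], edges: Seq[Edge], source: VertexId,
           maxIterations: Int): Map[VertexId, Double] = {
    val outEdges: Map[VertexId, Seq[Edge]] = edges.groupBy(_.source)
    // Initial values: 0 for the source, infinity for every other vertex
    var values: Map[VertexId, Double] =
      vertices.map(v => v -> (if (v == source) 0.0 else Double.PositiveInfinity)).toMap
    var updated: Set[VertexId] = Set(source) // vertices whose value changed last superstep
    var superstep = 0
    while (updated.nonEmpty && superstep < maxIterations) {
      // Scatter: each updated vertex sends (its distance + edge weight) along its out-edges
      val messages: Seq[(VertexId, Double)] = for {
        v <- updated.toSeq
        e <- outEdges.getOrElse(v, Seq.empty[Edge])
      } yield e.target -> (values(v) + e.weight)
      // Gather: a vertex adopts the smallest incoming distance if it beats its current value
      val improved: Map[VertexId, Double] = messages
        .groupBy(_._1)
        .map { case (v, msgs) => v -> msgs.map(_._2).min }
        .filter { case (v, d) => d < values(v) }
      values = values ++ improved
      updated = improved.keySet
      superstep += 1
    }
    values
  }
}
```

For example, `ScatterGatherSketch.sssp(Set(1L, 2L, 3L), Seq(ScatterGatherSketch.Edge(1L, 2L, 1.0), ScatterGatherSketch.Edge(2L, 3L, 2.0)), 1L, 10)` yields distances 0.0, 1.0, and 3.0; unreachable vertices stay at infinity.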

### Gather-Sum-Apply Iterations

Three-phase iterative computation model for graph algorithms that need to collect neighborhood information, aggregate it, and update vertex values.

```scala { .api }
/**
 * Runs a Gather-Sum-Apply iteration on the graph.
 * No configuration options are provided.
 * @param gatherFunction the gather function collects information about adjacent
 *                       vertices and edges
 * @param sumFunction the sum function aggregates the gathered information
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maxIterations maximum number of iterations to perform
 * @tparam M the intermediate type used between gather, sum and apply
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 *         after maxIterations iterations
 */
def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M],
                                  sumFunction: SumFunction[VV, EV, M],
                                  applyFunction: ApplyFunction[K, VV, M],
                                  maxIterations: Int): Graph[K, VV, EV]

/**
 * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
 * @param gatherFunction the gather function collects information about adjacent
 *                       vertices and edges
 * @param sumFunction the sum function aggregates the gathered information
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @tparam M the intermediate type used between gather, sum and apply
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 *         after maxIterations iterations
 */
def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M],
                                  sumFunction: SumFunction[VV, EV, M],
                                  applyFunction: ApplyFunction[K, VV, M],
                                  maxIterations: Int,
                                  parameters: GSAConfiguration): Graph[K, VV, EV]
```
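
The three phases can be traced in a dependency-free sketch of connected components by minimum-label propagation. The names (`GatherSumApplySketch`, `connectedComponents`, `applyLabel`) are hypothetical, edges are treated as undirected, and `min` plays the role of the sum function because it is associative and commutative.

```scala
// Hypothetical, dependency-free model of gather-sum-apply supersteps (not the Gelly API)
object GatherSumApplySketch {
  type VertexId = Long

  /** Connected components: every vertex starts with its own id and ends with the
    * smallest id in its component. Edges are treated as undirected. */
  def connectedComponents(vertices: Set[VertexId], edges: Seq[(VertexId, VertexId)],
                          maxIterations: Int): Map[VertexId, VertexId] = {
    val neighbors: Map[VertexId, Seq[VertexId]] =
      (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }

    // Gather: the value contributed by one neighbor (here, simply its label)
    def gather(neighborLabel: VertexId): VertexId = neighborLabel
    // Sum: combine two gathered values; min is associative and commutative
    def sum(a: VertexId, b: VertexId): VertexId = math.min(a, b)
    // Apply: return the new vertex value, or None to leave the vertex unchanged
    def applyLabel(candidate: VertexId, current: VertexId): Option[VertexId] =
      if (candidate < current) Some(candidate) else None

    var labels: Map[VertexId, VertexId] = vertices.map(v => v -> v).toMap
    var changed = true
    var superstep = 0
    while (changed && superstep < maxIterations) {
      // All vertices read labels from the previous superstep (bulk-synchronous update)
      val updates = for {
        v <- vertices.toSeq
        ns <- neighbors.get(v).toSeq
        newLabel <- applyLabel(ns.map(n => gather(labels(n))).reduce((a, b) => sum(a, b)),
                               labels(v)).toSeq
      } yield v -> newLabel
      labels = labels ++ updates
      changed = updates.nonEmpty
      superstep += 1
    }
    labels
  }
}
```

Isolated vertices gather nothing and keep their own id as label.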

### Vertex-Centric Iterations

Pregel-style iterative computation model in which, in every superstep, a single compute function receives a vertex's messages, updates its value, and sends messages to other vertices.

```scala { .api }
/**
 * Runs a vertex-centric iteration on the graph.
 * No configuration options are provided.
 * @param computeFunction the compute function
 * @param combineFunction the optional message combiner function
 * @param maxIterations maximum number of iterations to perform
 * @return the updated Graph after the vertex-centric iteration has converged or
 *         after maxIterations iterations
 */
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M],
                                 combineFunction: MessageCombiner[K, M],
                                 maxIterations: Int): Graph[K, VV, EV]

/**
 * Runs a vertex-centric iteration on the graph with configuration options.
 * @param computeFunction the compute function
 * @param combineFunction the optional message combiner function
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @return the updated Graph after the vertex-centric iteration has converged or
 *         after maxIterations iterations
 */
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M],
                                 combineFunction: MessageCombiner[K, M],
                                 maxIterations: Int,
                                 parameters: VertexCentricConfiguration): Graph[K, VV, EV]
```
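
In the vertex-centric model one compute step both consumes and produces messages, and a combiner can collapse the messages addressed to a vertex before delivery. The plain-Scala sketch below propagates the maximum vertex value along directed edges; `VertexCentricSketch` and `propagateMax` are hypothetical names, and halting once no messages are sent (or after `maxIterations`) is this sketch's own termination rule.

```scala
// Hypothetical, dependency-free model of Pregel-style supersteps (not the Gelly API)
object VertexCentricSketch {
  type VertexId = Long

  /** Maximum-value propagation; assumes every edge endpoint has an entry in `initial`. */
  def propagateMax(initial: Map[VertexId, Int], edges: Seq[(VertexId, VertexId)],
                   maxIterations: Int): Map[VertexId, Int] = {
    val outNeighbors: Map[VertexId, Seq[VertexId]] =
      edges.groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }
    def sendToNeighbors(v: VertexId, value: Int): Seq[(VertexId, Int)] =
      outNeighbors.getOrElse(v, Seq.empty[VertexId]).map(n => n -> value)
    // Combiner: collapse all messages addressed to one vertex into a single message
    def combine(messages: Seq[Int]): Int = messages.max

    var values = initial
    // First superstep: every vertex sends its value to its out-neighbors
    var messages: Seq[(VertexId, Int)] =
      values.toSeq.flatMap { case (v, x) => sendToNeighbors(v, x) }
    var superstep = 0
    while (messages.nonEmpty && superstep < maxIterations) {
      val inbox = messages.groupBy(_._1).map { case (v, ms) => v -> combine(ms.map(_._2)) }
      // Compute: adopt a larger incoming value and forward it; otherwise stay silent
      val changed = inbox.filter { case (v, m) => m > values(v) }
      values = values ++ changed
      messages = changed.toSeq.flatMap { case (v, x) => sendToNeighbors(v, x) }
      superstep += 1
    }
    values
  }
}
```

Because the combiner reduces each vertex's inbox to a single maximum, the compute logic only ever compares one incoming value against the current one.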

### Graph Validation

Validate graph properties and constraints using custom validators.

```scala { .api }
/**
 * Validate the graph using a GraphValidator.
 * @param validator the validator to apply
 * @return true if the graph is valid according to the validator, false otherwise
 */
def validate(validator: GraphValidator[K, VV, EV]): Boolean
```
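
A validator encodes one structural rule about a graph. As an illustration, the dependency-free sketch below checks that every edge endpoint refers to an existing vertex, similar in spirit to Gelly's `InvalidVertexIdsValidator`; `ValidationSketch`, `SimpleGraph`, `Validator`, and `EdgeEndpointsExist` are hypothetical stand-ins, not Gelly types.

```scala
// Hypothetical stand-ins for Gelly's Graph and GraphValidator (not the Gelly API)
object ValidationSketch {
  final case class SimpleGraph[K](vertexIds: Set[K], edges: Seq[(K, K)])

  trait Validator[K] {
    def validate(graph: SimpleGraph[K]): Boolean
  }

  /** Valid if the source and target of every edge are known vertex ids. */
  final class EdgeEndpointsExist[K] extends Validator[K] {
    override def validate(graph: SimpleGraph[K]): Boolean =
      graph.edges.forall { case (s, t) => graph.vertexIds(s) && graph.vertexIds(t) }
  }
}
```

With Gelly itself, a `GraphValidator` is passed to `graph.validate(...)`, which returns the validator's verdict as a `Boolean`.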
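
## Algorithm Types and Patterns

### Function Interfaces (From Java Gelly)

The iteration methods use these function interfaces from the underlying Java Gelly library. They are abstract classes in Java; the traits below show only the method a user function overrides, with key inherited helpers noted in comments:
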
```scala { .api }
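// Scatter-Gather functions (org.apache.flink.graph.spargel)
trait ScatterFunction[K, VV, M, EV] {
  // Send messages with sendMessageTo(target, message); getEdges returns the vertex's edges
  def sendMessages(vertex: Vertex[K, VV]): Unit
}

trait GatherFunction[K, VV, M] {
  // Update the vertex value by calling setNewVertexValue(newValue)
  def updateVertex(vertex: Vertex[K, VV], inMessages: MessageIterator[M]): Unit
}

// Gather-Sum-Apply functions (org.apache.flink.graph.gsa); GSAGatherFunction stands for
// gsa.GatherFunction, to distinguish it from the scatter-gather GatherFunction
trait GSAGatherFunction[VV, EV, M] {
  def gather(neighbor: Neighbor[VV, EV]): M
}

trait SumFunction[VV, EV, M] {
  def sum(arg0: M, arg1: M): M
}

trait ApplyFunction[K, VV, M] {
  // Update the vertex value by calling setResult(newVertexValue)
  def apply(newValue: M, currentValue: VV): Unit
}

// Vertex-Centric functions (org.apache.flink.graph.pregel)
trait ComputeFunction[K, VV, EV, M] {
  // Typically uses getEdges, sendMessageTo(target, message) and setNewVertexValue(newValue)
  def compute(vertex: Vertex[K, VV], messages: MessageIterator[M]): Unit
}

trait MessageCombiner[K, M] {
  // Emit the combined result with sendCombinedMessage(message)
  def combineMessages(messages: MessageIterator[M]): Unit
}

// Validation (org.apache.flink.graph.validation)
trait GraphValidator[K, VV, EV] {
  // Receives the underlying Java Gelly graph (org.apache.flink.graph.Graph)
  def validate(graph: Graph[K, VV, EV]): Boolean
}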
```

### Configuration Types

Each iteration model has its own configuration class for customizing algorithm behavior:

```scala { .api }
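// Configuration classes (from Java Gelly)
class ScatterGatherConfiguration // org.apache.flink.graph.spargel
class GSAConfiguration           // org.apache.flink.graph.gsa
class VertexCentricConfiguration // org.apache.flink.graph.pregel
// All three extend IterationConfiguration, which provides setName, setParallelism,
// setSolutionSetUnmanagedMemory and registerAggregator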
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.scala._
import org.apache.flink.graph.spargel.{GatherFunction, MessageIterator, ScatterFunction}
import org.apache.flink.graph.gsa.{ApplyFunction, Neighbor, SumFunction, GatherFunction => GSAGatherFunction}
import scala.collection.JavaConverters._

// Assumes an existing `graph: Graph[Long, Double, Double]` whose edge values are weights.
// For the custom SSSP, the source vertex must start with value 0.0 and every other
// vertex with Double.PositiveInfinity.

// Example: Single Source Shortest Paths using Scatter-Gather
class SSSPScatter extends ScatterFunction[Long, Double, Double, Double] {
  override def sendMessages(vertex: Vertex[Long, Double]): Unit = {
    // Only vertices with a known (finite) distance propagate it to their targets
    if (vertex.getValue < Double.PositiveInfinity) {
      for (edge <- getEdges.asScala) {
        sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
      }
    }
  }
}

class SSSPGather extends GatherFunction[Long, Double, Double] {
  override def updateVertex(vertex: Vertex[Long, Double],
                            inMessages: MessageIterator[Double]): Unit = {
    var minDistance = Double.PositiveInfinity
    while (inMessages.hasNext) {
      val msg = inMessages.next()
      if (msg < minDistance) {
        minDistance = msg
      }
    }
    // Update only on improvement; the iteration converges once no vertex changes
    if (minDistance < vertex.getValue) {
      setNewVertexValue(minDistance)
    }
  }
}

// Run SSSP
val result = graph.runScatterGatherIteration(new SSSPScatter, new SSSPGather, 10)

// Example: PageRank using Gather-Sum-Apply
// Assumes each edge value already holds 1 / outDegree of its source vertex and that
// ranks are gathered along incoming links
class PageRankGather extends GSAGatherFunction[Double, Double, Double] {
  override def gather(neighbor: Neighbor[Double, Double]): Double =
    neighbor.getNeighborValue * neighbor.getEdgeValue
}

class PageRankSum extends SumFunction[Double, Double, Double] {
  override def sum(arg0: Double, arg1: Double): Double = arg0 + arg1
}

class PageRankApply(dampingFactor: Double) extends ApplyFunction[Long, Double, Double] {
  override def apply(newValue: Double, currentValue: Double): Unit = {
    // Unnormalized PageRank update: (1 - d) + d * (sum of incoming contributions)
    setResult((1.0 - dampingFactor) + dampingFactor * newValue)
  }
}

// Run PageRank
val pageRankResult = graph.runGatherSumApplyIteration(
  new PageRankGather, new PageRankSum, new PageRankApply(0.85), 10)

// Example: Using built-in algorithms
import org.apache.flink.graph.library.SingleSourceShortestPaths
// PageRank's package differs across Flink releases
// (org.apache.flink.graph.library.linkanalysis in recent ones)
import org.apache.flink.graph.library.linkanalysis.PageRank

val sourceId = 1L
val ssspAlgorithm = new SingleSourceShortestPaths[Long, Double](sourceId, 10)
val distances = graph.run(ssspAlgorithm)

val pageRankAlgorithm = new PageRank[Long, Double, Double](0.85, 10)
val pageRankValues = graph.run(pageRankAlgorithm)
```

### Algorithm Design Patterns

#### Scatter-Gather Pattern
Best for algorithms where:
- Vertices need to send information to their neighbors
- Each vertex processes messages from its neighbors independently
- Examples: Single Source Shortest Path, Connected Components

#### Gather-Sum-Apply Pattern
Best for algorithms where:
- You need to collect and aggregate information from neighbors
- The aggregation can be expressed as an associative and commutative operation
- Examples: PageRank, Single Source Shortest Paths, Connected Components
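
Associativity matters because partial aggregates may be computed separately (for example, per partition) and merged afterwards. This plain-Scala check with made-up gathered values compares reducing per partition and then merging against a single pass: `min` and `+` agree under both groupings, while a pairwise average, a deliberately non-associative choice, does not. `SumAssociativity` and its members are hypothetical names.

```scala
// Hypothetical check of how a sum function behaves under different groupings
object SumAssociativity {
  // Reduce each partition first, then merge the partial results
  def partitioned[M](partitions: Seq[Seq[M]], sum: (M, M) => M): M =
    partitions.map(_.reduce(sum)).reduce(sum)

  // Reduce all gathered values in one pass
  def singlePass[M](partitions: Seq[Seq[M]], sum: (M, M) => M): M =
    partitions.flatten.reduce(sum)

  // Made-up gathered values, split across three partitions
  val gathered: Seq[Seq[Double]] = Seq(Seq(4.0, 8.0), Seq(2.0), Seq(6.0, 10.0))

  val minSum: (Double, Double) => Double = (a, b) => math.min(a, b)
  val plusSum: (Double, Double) => Double = (a, b) => a + b
  val pairwiseAverage: (Double, Double) => Double = (a, b) => (a + b) / 2
}
```

Here the pairwise average yields 6.0 when partitions are reduced first and 7.5 in a single pass, so it would not be a valid sum function.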

#### Vertex-Centric Pattern
Best for algorithms where:
- Vertices need fine-grained control over message sending
- Complex message processing is required
- Examples: Graph Coloring, Community Detection

### Performance Considerations

- **Convergence**: Every iteration method takes a `maxIterations` limit and stops earlier if the iteration converges
- **Configuration**: Use the configuration objects to tune execution, for example the iteration name, parallelism, solution-set memory, aggregators, and (for scatter-gather and gather-sum-apply) edge direction
- **Message Types**: Choose compact message types (`M`) to minimize serialization overhead; in vertex-centric iterations, a `MessageCombiner` can further reduce message volume
- **Partitioning**: Consider graph partitioning strategies for large graphs
- **Fault Tolerance**: Gelly iterations run as DataSet (batch) jobs, which recover from failures by restarting rather than through streaming-style checkpoints, so account for restart cost in long-running iterative algorithms