# Iterative Processing Models

Gelly provides three iterative computation models for implementing graph algorithms on distributed Flink clusters: vertex-centric (Pregel), scatter-gather, and gather-sum-apply (GSA). All three run as a sequence of synchronized supersteps and pass information between neighboring vertices. They differ in how each superstep is divided into user-defined functions.

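## Capabilities

### Vertex-Centric (Pregel) Model

The vertex-centric model follows Google's Pregel paradigm. In each superstep, a user-defined `ComputeFunction` runs at every active vertex. It reads the messages sent to that vertex in the previous superstep, may update the vertex value, and may send messages that are delivered in the next superstep. All vertices are active in the first superstep; after that, only vertices that received messages are active. The iteration ends when no messages are sent or after `maxIterations` supersteps.
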
```java { .api }
public <M> Graph<K, VV, EV> runVertexCentricIteration(
    ComputeFunction<K, VV, EV, M> computeFunction,
    MessageCombiner<K, M> combiner,
    int maxIterations)

public <M> Graph<K, VV, EV> runVertexCentricIteration(
    ComputeFunction<K, VV, EV, M> computeFunction,
    MessageCombiner<K, M> combiner,
    int maxIterations,
    VertexCentricConfiguration parameters)
```
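
The following minimal single-JVM simulation makes the message-passing loop concrete without a cluster. It is a sketch under stated assumptions, not Gelly's runtime. It has no partitioning, combining, or fault tolerance, and the class name `PregelMaxSketch` and its map-based graph encoding are hypothetical. It implements the classic Pregel "propagate the maximum value" example:

- every vertex runs in the first superstep;
- after that, only vertices that received messages run;
- a vertex forwards its value only when that value grew;
- the loop stops once no messages are in flight or `maxIterations` is reached.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Single-JVM simulation of vertex-centric supersteps (not Gelly's runtime).
 * Each vertex keeps the largest value it has seen and forwards it only when
 * its own value changed, so the iteration stops once no messages are sent.
 */
final class PregelMaxSketch {

    static Map<Integer, Integer> run(Map<Integer, Integer> initialValues,
                                     Map<Integer, List<Integer>> outNeighbors,
                                     int maxIterations) {
        Map<Integer, Integer> values = new HashMap<>(initialValues);
        Map<Integer, List<Integer>> inbox = new HashMap<>();

        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            // Superstep 1 runs every vertex; later supersteps only run vertices with messages.
            List<Integer> active = new ArrayList<>(superstep == 1 ? values.keySet() : inbox.keySet());
            Map<Integer, List<Integer>> outbox = new HashMap<>();

            for (Integer v : active) {
                int current = values.get(v);
                int best = current;
                for (int msg : inbox.getOrDefault(v, Collections.emptyList())) {
                    best = Math.max(best, msg);
                }
                if (superstep == 1 || best > current) {
                    values.put(v, best); // plays the role of setNewVertexValue
                    for (int target : outNeighbors.getOrDefault(v, Collections.emptyList())) {
                        // plays the role of sendMessageTo: delivered in the next superstep
                        outbox.computeIfAbsent(target, k -> new ArrayList<>()).add(best);
                    }
                }
            }

            inbox = outbox;
            if (inbox.isEmpty()) {
                break; // no messages in flight: the iteration has converged
            }
        }
        return values;
    }
}
```

Consider the path 1-2-3-4 with values 3, 6, 2, 1 and edges in both directions. All four vertices end with 6 after four supersteps. Capping the run at two supersteps leaves vertex 4 at 2, because the maximum has not reached it yet.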
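
#### ComputeFunction<K, VV, EV, M>

Defines the computation performed at each active vertex in each superstep. It also provides the methods for reading the vertex's edges, sending messages, and setting the new vertex value.
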
```java { .api }
public abstract class ComputeFunction<K, VV, EV, M> implements Serializable {
    public abstract void compute(
        Vertex<K, VV> vertex,
        MessageIterator<M> messages) throws Exception;

    // Message sending; messages are delivered in the next superstep.
    // getEdges() and sendMessageToAllNeighbors() are mutually exclusive
    // and may be used at most once per compute() call.
    public void sendMessageToAllNeighbors(M message)
    public void sendMessageTo(K target, M message)
    public Iterable<Edge<K, EV>> getEdges()

    // Vertex value update (at most once per compute() call)
    public void setNewVertexValue(VV newValue)

    // Utility methods
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public <T extends Value> T getPreviousIterationAggregate(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}
```
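
#### MessageCombiner<K, M>

Optional combiner that merges messages addressed to the same target vertex before they are sent. This reduces network traffic. It is only safe when the receiving vertex depends on an associative and commutative aggregate of its messages, such as their minimum or sum.

```java { .api }
public abstract class MessageCombiner<K, M> implements Serializable {
    public abstract void combineMessages(MessageIterator<M> messages) throws Exception;

    // Emits the combined message in place of the originals
    public final void sendCombinedMessage(M combinedMessage)
}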
```
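
A combiner such as the `MinMessageCombiner` in the SSSP usage example pays off when many candidate distances target the same vertex. The plain-Java sketch below uses hypothetical `MinCombinerSketch` and `Msg` classes and has no Flink dependency. It folds messages bound for the same target into one by taking the minimum. That is only valid because the receiving vertex uses nothing but the minimum. A real combiner may see only a subset of a target's messages at a time, so the operation it applies must be associative and commutative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch of a min-combiner (not Gelly code): messages bound for the
 * same target vertex are folded into one before they would cross the network.
 */
final class MinCombinerSketch {

    /** A (target vertex, candidate distance) pair, as produced by sendMessageTo. */
    static final class Msg {
        final long target;
        final double value;

        Msg(long target, double value) {
            this.target = target;
            this.value = value;
        }
    }

    /** Keeps the smallest message per target, in first-seen target order. */
    static List<Msg> combine(List<Msg> outgoing) {
        Map<Long, Double> minPerTarget = new LinkedHashMap<>();
        for (Msg m : outgoing) {
            minPerTarget.merge(m.target, m.value, Math::min);
        }
        List<Msg> combined = new ArrayList<>();
        for (Map.Entry<Long, Double> e : minPerTarget.entrySet()) {
            combined.add(new Msg(e.getKey(), e.getValue()));
        }
        return combined;
    }
}
```

Four messages to two targets become two messages: the minimum 2.5 for vertex 7 and 1.0 for vertex 9.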

#### MessageIterator<M>

Iterator over the messages delivered to a vertex in the current superstep. It also implements `Iterable`, so it can be used in a for-each loop.

```java { .api }
public class MessageIterator<M> implements Iterator<M>, Iterable<M> {
    public boolean hasNext()
    public M next()
    public Iterator<M> iterator()
}
```

#### VertexCentricConfiguration

Configuration options for vertex-centric iterations.

```java { .api }
public class VertexCentricConfiguration {
    public void setName(String name)
    public void setParallelism(int parallelism)
    public void setSolutionSetUnmanagedMemory(boolean unmanaged)
    public void registerAggregator(String name, Aggregator<?> aggregator)
}
```

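**Usage Example:**

```java
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.pregel.MessageCombiner;
import org.apache.flink.graph.pregel.MessageIterator;

// Single Source Shortest Path using a vertex-centric iteration.
// Assumes vertex 1 is the source and every vertex value starts at
// Double.POSITIVE_INFINITY.
public class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
    @Override
    public void compute(Vertex<Long, Double> vertex,
                        MessageIterator<Double> messages) throws Exception {

        double minDistance = (vertex.getId().equals(1L)) ? 0.0 : Double.POSITIVE_INFINITY;

        // Update distance with the minimum of the incoming messages
        while (messages.hasNext()) {
            minDistance = Math.min(minDistance, messages.next());
        }

        // If the distance improved, store it and propagate it to neighbors
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
            // Send updated distance + edge weight along every out-edge
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
            }
        }
    }
}

// Keeps only the smallest candidate distance per target vertex
public class MinMessageCombiner extends MessageCombiner<Long, Double> {
    @Override
    public void combineMessages(MessageIterator<Double> messages) throws Exception {
        double minMessage = Double.POSITIVE_INFINITY;
        while (messages.hasNext()) {
            minMessage = Math.min(minMessage, messages.next());
        }
        sendCombinedMessage(minMessage);
    }
}

// Run the algorithm
Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
    new SSSPComputeFunction(),
    new MinMessageCombiner(),
    maxIterations);
```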

### Scatter-Gather Model

The scatter-gather model splits each superstep into two phases with separate user functions. In the scatter phase, a `ScatterFunction` sends messages from vertices to other vertices. In the gather phase, a `GatherFunction` updates each vertex value from the messages that vertex received. Only the gather phase changes vertex values. The iteration ends when no vertex value changes or after `maxIterations` supersteps.

```java { .api }
public <M> Graph<K, VV, EV> runScatterGatherIteration(
    ScatterFunction<K, VV, M, EV> scatterFunction,
    GatherFunction<K, VV, M> gatherFunction,
    int maxIterations)

public <M> Graph<K, VV, EV> runScatterGatherIteration(
    ScatterFunction<K, VV, M, EV> scatterFunction,
    GatherFunction<K, VV, M> gatherFunction,
    int maxIterations,
    ScatterGatherConfiguration parameters)
```
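
#### ScatterFunction<K, VV, M, EV>

Defines the messages each vertex sends in the scatter phase. Messages are emitted with `sendMessageTo` or `sendMessageToAllNeighbors`.

```java { .api }
public abstract class ScatterFunction<K, VV, M, EV> implements Serializable {
    public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;

    // Message sending (messages are consumed by the gather phase)
    public void sendMessageTo(K target, M message)
    public void sendMessageToAllNeighbors(M message)

    // Edges in the configured direction (outgoing by default). May be called
    // at most once per sendMessages() call, and not together with
    // sendMessageToAllNeighbors().
    public Iterable<Edge<K, EV>> getEdges()

    // Utility methods
    public long getNumberOfVertices()   // requires setOptNumVertices(true)
    public long getInDegree()           // requires setOptDegrees(true)
    public long getOutDegree()          // requires setOptDegrees(true)
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}
```
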
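#### GatherFunction<K, VV, M>

Defines how a vertex value is updated from the messages it received in the gather phase. The new value is set with `setNewVertexValue`, not returned. If it is not called, the vertex keeps its previous value.

```java { .api }
public abstract class GatherFunction<K, VV, M> implements Serializable {
    public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<M> inMessages) throws Exception;

    // Vertex value update (at most once per updateVertex() call)
    public void setNewVertexValue(VV newValue)

    // Utility methods
    public long getNumberOfVertices()   // requires setOptNumVertices(true)
    public long getInDegree()           // requires setOptDegrees(true)
    public long getOutDegree()          // requires setOptDegrees(true)
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}
```
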
#### ScatterGatherConfiguration

Configuration options for scatter-gather iterations.

```java { .api }
public class ScatterGatherConfiguration {
    public void setName(String name)
    public void setParallelism(int parallelism)
    public void setSolutionSetUnmanagedMemory(boolean unmanaged)
    public void registerAggregator(String name, Aggregator<?> aggregator)
    public void setOptDegrees(boolean optDegrees)         // enables getInDegree()/getOutDegree()
    public void setOptNumVertices(boolean optNumVertices) // enables getNumberOfVertices()
    public void setDirection(EdgeDirection direction)     // OUT (default), IN, or ALL
}
```

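**Usage Example:**

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.types.NullValue;

// PageRank using Scatter-Gather
public class PageRankScatter extends ScatterFunction<Long, Double, Double, NullValue> {
    @Override
    public void sendMessages(Vertex<Long, Double> vertex) {
        // getEdges() may be called only once per invocation, so buffer the targets
        List<Long> targets = new ArrayList<>();
        for (Edge<Long, NullValue> edge : getEdges()) {
            targets.add(edge.getTarget());
        }
        if (targets.isEmpty()) {
            return; // dangling vertex: nothing to send (avoids dividing by zero)
        }

        double rankToSend = vertex.getValue() / targets.size();
        for (Long target : targets) {
            sendMessageTo(target, rankToSend);
        }
    }
}

public class PageRankGather extends GatherFunction<Long, Double, Double> {
    private static final double DAMPING_FACTOR = 0.85;

    @Override
    public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
        double sum = 0.0;
        while (inMessages.hasNext()) {
            sum += inMessages.next();
        }
        // getNumberOfVertices() needs setOptNumVertices(true) on the configuration
        setNewVertexValue((1.0 - DAMPING_FACTOR) / getNumberOfVertices() + DAMPING_FACTOR * sum);
    }
}

// Run PageRank
ScatterGatherConfiguration config = new ScatterGatherConfiguration();
config.setOptNumVertices(true);

Graph<Long, Double, NullValue> result = graph.runScatterGatherIteration(
    new PageRankScatter(),
    new PageRankGather(),
    maxIterations,
    config);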
```
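
The arithmetic in `PageRankScatter` and `PageRankGather` can be checked in isolation. The plain-Java sketch below uses a hypothetical `PageRankSuperstepSketch` class with no Flink dependency. It runs one scatter phase and one gather phase over arrays indexed by vertex id. It assumes vertex ids `0..n-1` and that every vertex has at least one out-edge, so no rank is lost at dangling vertices.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch of one scatter-gather PageRank superstep, mirroring the
 * arithmetic of PageRankScatter/PageRankGather (not Gelly code). Vertex ids are
 * 0..n-1 and every vertex is assumed to have at least one out-edge.
 */
final class PageRankSuperstepSketch {

    static final double DAMPING_FACTOR = 0.85;

    static double[] superstep(double[] ranks, Map<Integer, List<Integer>> outEdges) {
        int n = ranks.length;

        // Scatter: every vertex splits its rank evenly over its out-edges.
        double[] received = new double[n];
        for (int v = 0; v < n; v++) {
            List<Integer> targets = outEdges.get(v);
            double share = ranks[v] / targets.size();
            for (int t : targets) {
                received[t] += share;
            }
        }

        // Gather: random-jump term plus the damped sum of incoming shares.
        double[] next = new double[n];
        for (int v = 0; v < n; v++) {
            next[v] = (1.0 - DAMPING_FACTOR) / n + DAMPING_FACTOR * received[v];
        }
        return next;
    }
}
```

Start from uniform ranks of 1/3 on the graph 0→1, 0→2, 1→2, 2→0. One superstep gives ranks of about 0.3333, 0.1917 and 0.475, which still sum to 1. The random-jump terms add up to 1 - d, and the damped shares add up to d times the previous total.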

### Gather-Sum-Apply (GSA) Model

The GSA model splits each superstep into three phases:

- **Gather** computes a partial value for every neighbor from the neighbor's value and the connecting edge's value.
- **Sum** combines the partial values for each vertex into one.
- **Apply** uses that result and the current vertex value to decide the vertex's new value.

Unlike the other two models, the user functions never send messages explicitly.

```java { .api }
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
    GatherFunction<VV, EV, M> gatherFunction,
    SumFunction<VV, EV, M> sumFunction,
    ApplyFunction<K, VV, M> applyFunction,
    int maxIterations)

public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
    GatherFunction<VV, EV, M> gatherFunction,
    SumFunction<VV, EV, M> sumFunction,
    ApplyFunction<K, VV, M> applyFunction,
    int maxIterations,
    GSAConfiguration parameters)
```
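
The three-phase contract can be simulated in a few lines. In the plain-Java sketch below (hypothetical `GsaSketch`, no Flink dependency), three lambdas stand in for `GatherFunction`, `SumFunction` and `ApplyFunction`. `apply` receives the summed value first and the current value second; returning the current value means "no change". The sketch makes these simplifying assumptions:

- the target of each edge gathers from its source;
- every edge is re-gathered in every superstep;
- the loop stops when no vertex changes or after `maxIterations` supersteps.

Because `sum` folds partial values in whatever order the edges arrive, it should be associative and commutative.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

/**
 * Single-JVM sketch of gather-sum-apply supersteps (not Gelly's runtime).
 * gather: (neighborValue, edgeValue) -> partial; sum: pairwise fold of partials;
 * apply: (summedValue, currentValue) -> new value.
 */
final class GsaSketch {

    /** Directed weighted edge; in this sketch the target gathers from the source. */
    static final class Edge {
        final int source;
        final int target;
        final double value;

        Edge(int source, int target, double value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    static Map<Integer, Double> run(Map<Integer, Double> initialValues,
                                    List<Edge> edges,
                                    BiFunction<Double, Double, Double> gather,
                                    BinaryOperator<Double> sum,
                                    BiFunction<Double, Double, Double> apply,
                                    int maxIterations) {
        Map<Integer, Double> values = new HashMap<>(initialValues);
        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            // Gather + sum: one partial value per edge, folded pairwise per target vertex.
            Map<Integer, Double> sums = new HashMap<>();
            for (Edge e : edges) {
                Double partial = gather.apply(values.get(e.source), e.value);
                sums.merge(e.target, partial, sum);
            }
            // Apply: new values are computed from this superstep's snapshot.
            Map<Integer, Double> next = new HashMap<>(values);
            boolean changed = false;
            for (Map.Entry<Integer, Double> s : sums.entrySet()) {
                Double current = values.get(s.getKey());
                Double updated = apply.apply(s.getValue(), current);
                if (!updated.equals(current)) {
                    next.put(s.getKey(), updated);
                    changed = true;
                }
            }
            values = next;
            if (!changed) {
                break; // no vertex value changed: converged
            }
        }
        return values;
    }
}
```

This computes single-source shortest paths when you set the source to 0, every other vertex to infinity, gather to `neighborValue + edgeValue`, and both sum and apply to `Math.min`. On the edges 1→2 (1.0), 2→3 (2.0), 1→3 (5.0), 3→4 (1.0) from source vertex 1, the distances settle at 0, 1, 3 and 4.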

#### GatherFunction<VV, EV, M>

Computes a partial value from a single neighbor, given the neighbor's vertex value and the value of the connecting edge. This class is `org.apache.flink.graph.gsa.GatherFunction`. It is not the scatter-gather `GatherFunction` in `org.apache.flink.graph.spargel`.

```java { .api }
public abstract class GatherFunction<VV, EV, M> implements Serializable {
    public abstract M gather(Neighbor<VV, EV> neighbor) throws Exception;

    // Utility methods
    public long getNumberOfVertices()   // requires setOptNumVertices(true)
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}
```

#### SumFunction<VV, EV, M>

Combines two partial values gathered for the same vertex into one. It is applied pairwise in no guaranteed order, so it should be associative and commutative (for example, minimum or sum).

```java { .api }
public abstract class SumFunction<VV, EV, M> implements Serializable {
    public abstract M sum(M value1, M value2) throws Exception;
}
```

#### ApplyFunction<K, VV, M>

Decides the new vertex value from the summed result and the current value. The new value is set with `setResult`, not returned. If `setResult` is not called, the vertex keeps its current value.

```java { .api }
public abstract class ApplyFunction<K, VV, M> implements Serializable {
    // newValue is the summed result; currentValue is the vertex's current value
    public abstract void apply(M newValue, VV currentValue) throws Exception;

    // Sets the new vertex value
    public void setResult(VV newValue)

    // Utility methods
    public long getNumberOfVertices()   // requires setOptNumVertices(true)
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}
```

#### Neighbor<VV, EV>

Represents a neighbor vertex and connecting edge in GSA iterations.

```java { .api }
public class Neighbor<VV, EV> {
    public VV getNeighborValue()
    public EV getEdgeValue()
}
```

#### GSAConfiguration

Configuration options for GSA iterations.

```java { .api }
public class GSAConfiguration {
    public void setName(String name)
    public void setParallelism(int parallelism)
    public void setSolutionSetUnmanagedMemory(boolean unmanaged)
    public void registerAggregator(String name, Aggregator<?> aggregator)
    public void setOptNumVertices(boolean optNumVertices) // enables getNumberOfVertices()
    public void setDirection(EdgeDirection direction)     // neighborhood used by the gather phase
}
```

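**Usage Example:**

```java
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.types.NullValue;

// Connected Components using GSA.
// Assumes each vertex value starts as its own ID and that the graph is
// undirected (e.g. built with graph.getUndirected()), so labels flow both ways.
public class CCGather extends GatherFunction<Long, NullValue, Long> {
    @Override
    public Long gather(Neighbor<Long, NullValue> neighbor) {
        // Candidate label: the neighbor's current component ID
        return neighbor.getNeighborValue();
    }
}

public class CCSum extends SumFunction<Long, NullValue, Long> {
    @Override
    public Long sum(Long value1, Long value2) {
        // Keep the smallest candidate label
        return Math.min(value1, value2);
    }
}

public class CCApply extends ApplyFunction<Long, Long, Long> {
    @Override
    public void apply(Long newValue, Long currentValue) {
        // Only set a result when the label decreases; otherwise the value is kept
        if (newValue < currentValue) {
            setResult(newValue);
        }
    }
}

// Run Connected Components
Graph<Long, Long, NullValue> result = graph.runGatherSumApplyIteration(
    new CCGather(),
    new CCSum(),
    new CCApply(),
    maxIterations);
```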
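
## Common Patterns

### Convergence Detection

All three models stop on their own once the computation goes quiet, or after `maxIterations` supersteps. In the vertex-centric model, that means no messages are sent; in scatter-gather and GSA, it means no vertex value is updated. To track global progress, such as the number of vertices that changed in a superstep, register an aggregator on the iteration configuration and update it from the user functions. The total aggregated in one superstep can be read in the next through `getPreviousIterationAggregate`:

```java
// LongSumAggregator: org.apache.flink.api.common.aggregators; LongValue: org.apache.flink.types

// On the configuration, before running the iteration
configuration.registerAggregator("changed", new LongSumAggregator());

// Inside compute/updateVertex/apply: count vertices whose value changed
LongSumAggregator changedVertices = getIterationAggregator("changed");
if (valueChanged) { // hypothetical flag set by the update logic
    changedVertices.aggregate(1L);
}

// In the following superstep: the total from the previous superstep
LongValue changedLastStep = getPreviousIterationAggregate("changed");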
```
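
The aggregator contract behind this pattern can be sketched without Flink: values accumulated during one superstep become readable in the next. `ChangedCounterSketch` below is a hypothetical stand-in for a `LongSumAggregator` combined with the driver loop. Its per-vertex update (halving each value) is purely illustrative. The loop stops after the first superstep in which no vertex changed.

```java
/**
 * Plain-Java stand-in for a per-superstep sum aggregator such as Flink's
 * LongSumAggregator (not Flink code). The total aggregated during one superstep
 * is published at the barrier, like getPreviousIterationAggregate.
 */
final class ChangedCounterSketch {

    private long current;  // aggregated during the running superstep
    private long previous; // total of the last completed superstep

    void aggregate(long delta) {
        current += delta;
    }

    /** Called at the superstep barrier: publish the total and reset. */
    void endSuperstep() {
        previous = current;
        current = 0;
    }

    long previousAggregate() {
        return previous;
    }

    /**
     * Runs a hypothetical per-vertex update (halving each value) until a
     * superstep changes nothing; returns the number of supersteps executed.
     */
    static int runUntilStable(long[] values, int maxIterations) {
        ChangedCounterSketch changed = new ChangedCounterSketch();
        int superstep = 0;
        while (superstep < maxIterations) {
            superstep++;
            for (int i = 0; i < values.length; i++) {
                long updated = values[i] / 2;
                if (updated != values[i]) {
                    values[i] = updated;
                    changed.aggregate(1L); // like changedVertices.aggregate(1L)
                }
            }
            changed.endSuperstep();
            if (changed.previousAggregate() == 0) {
                break; // nothing changed in this superstep: converged
            }
        }
        return superstep;
    }
}
```

For the values {8, 3}, the changed counts per superstep are 2, 2, 1, 1, 0. The loop therefore runs five supersteps and ends with both values at 0. With `maxIterations = 2`, the array {8} stops at 2.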
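
### Performance Optimization

- **Message combiners**: In the vertex-centric model, a `MessageCombiner` merges messages addressed to the same vertex before they are sent, reducing network traffic.
- **Degree option**: `setOptDegrees(true)` on `ScatterGatherConfiguration` makes `getInDegree()` and `getOutDegree()` available in scatter and gather functions, so degree-based algorithms do not have to count edges themselves.
- **Solution set memory**: By default, the solution set (the vertex values) is kept in Flink managed memory in serialized form. `setSolutionSetUnmanagedMemory(true)` keeps it as a plain object map on the JVM heap instead.
- **Parallelism**: `setParallelism` sets the parallelism of the iteration operators independently of the job's default.
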
### Error Handling

Exceptions thrown from any user-defined function propagate to the Flink runtime and fail the job, subject to the job's restart strategy. Validating assumptions early gives clearer failure messages:

```java
@Override
public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages)
        throws Exception {

    if (vertex.getValue() < 0) {
        throw new IllegalArgumentException("Negative vertex value: " + vertex.getValue());
    }
    // ... computation logic
}
```