# Pregel API

A vertex-centric programming framework for implementing custom iterative graph algorithms with the Pregel computational model. Computation proceeds in synchronous supersteps: in each superstep, vertices update their state from the messages they received, then send messages to their neighbors for the next superstep.

## Capabilities

### Core Pregel Framework

The main Pregel computation API for implementing vertex-centric iterative algorithms. It is available both as the `pregel` method called on a graph and as the standalone `Pregel` object.

```scala { .api }
/**
 * Execute a Pregel computation on the graph. Runs until no messages are sent
 * or maxIterations supersteps have run.
 * @param initialMsg message every vertex receives in the first superstep (vprog runs on all vertices with it)
 * @param maxIterations maximum number of supersteps (default: no limit)
 * @param activeDirection which edges of the vertices that received a message in the previous
 *                        superstep run sendMsg (default: Either)
 * @param vprog vertex program: (VertexId, VertexData, Message) => NewVertexData
 * @param sendMsg send-message function: EdgeTriplet => Iterator[(VertexId, Message)]
 * @param mergeMsg merges two messages for the same vertex; must be commutative and associative
 * @return new graph with updated vertex attributes
 */
def pregel[A: ClassTag](
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either
)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A
): Graph[VD, ED]

object Pregel {
  /**
   * Execute a Pregel computation (standalone object version, same semantics as graph.pregel)
   * @param graph input graph
   * @param initialMsg message every vertex receives in the first superstep
   * @param maxIterations maximum number of supersteps
   * @param activeDirection which edges of active vertices run sendMsg
   * @param vprog vertex program function
   * @param sendMsg message sending function
   * @param mergeMsg message merging function (commutative and associative)
   * @return updated graph
   */
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](
      graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either
  )(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A
  ): Graph[VD, ED]
}
```
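
The Pregel scaladoc requires `mergeMsg` to be commutative and associative, because GraphX combines the messages for a vertex in no guaranteed order. A quick local check on sample values can flag an order-dependent merge before it produces inconsistent results. The sketch below is plain Scala with no Spark dependency; the helper `looksCommutativeAndAssociative` is hypothetical, and passing it on a few samples is evidence rather than proof.

```scala
// Hypothetical test helper: checks commutativity and associativity of a merge
// function on every combination of the given sample values
def looksCommutativeAndAssociative[A](merge: (A, A) => A, samples: Seq[A]): Boolean = {
  val commutative =
    (for (a <- samples; b <- samples) yield merge(a, b) == merge(b, a)).forall(identity)
  val associative =
    (for (a <- samples; b <- samples; c <- samples)
      yield merge(merge(a, b), c) == merge(a, merge(b, c))).forall(identity)
  commutative && associative
}

val distances = Seq(0.0, 1.5, 3.0, Double.PositiveInfinity)

// Valid mergeMsg: minimum (used by shortest paths and connected components)
val minOk = looksCommutativeAndAssociative[Double](math.min(_, _), distances)
// Valid mergeMsg: sum of integer counts
val sumOk = looksCommutativeAndAssociative[Long](_ + _, Seq(0L, 1L, 7L))
// Invalid mergeMsg: subtraction depends on the order in which messages are combined
val subtractOk = looksCommutativeAndAssociative[Double](_ - _, distances)
```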

### Edge Direction Control

After the first superstep, `sendMsg` runs only on edges adjacent to vertices that received a message in the previous superstep. `activeDirection` selects which of those edges qualify.

```scala { .api }
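object EdgeDirection {
  /** Edges arriving at a vertex (the vertex is the destination).
    * In Pregel: edges whose destination received a message run sendMsg */
  val In: EdgeDirection

  /** Edges originating from a vertex (the vertex is the source).
    * In Pregel: edges whose source received a message run sendMsg */
  val Out: EdgeDirection

  /** Edges originating from *or* arriving at a vertex of interest.
    * In Pregel (the default): edges where either endpoint received a message run sendMsg */
  val Either: EdgeDirection

  /** Edges originating from *and* arriving at a vertex of interest.
    * In Pregel: an edge runs sendMsg only if both endpoints received a message */
  val Both: EdgeDirection
}

class EdgeDirection {
  /** Reverse the direction: In <-> Out; Either and Both are unchanged */
  def reverse: EdgeDirection
}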
```
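
The edge-selection rule can be modeled locally. This is a plain-Scala sketch, not the GraphX API: `ActiveDirection`, `SimpleEdge`, and `edgesThatRun` are hypothetical names, and `active` stands for the vertices that received a message in the previous superstep. In the very first message round GraphX runs `sendMsg` on every edge, so the rule applies from the second round on.

```scala
// Plain-Scala model (not the GraphX API) of which edges run sendMsg in a superstep
sealed trait ActiveDirection
object ActiveDirection {
  case object In extends ActiveDirection     // edge runs if its destination is active
  case object Out extends ActiveDirection    // edge runs if its source is active
  case object Either extends ActiveDirection // edge runs if either endpoint is active
  case object Both extends ActiveDirection   // edge runs only if both endpoints are active
}

final case class SimpleEdge(src: Long, dst: Long)

def edgesThatRun(edges: Seq[SimpleEdge], active: Set[Long], dir: ActiveDirection): Seq[SimpleEdge] =
  dir match {
    case ActiveDirection.In     => edges.filter(e => active(e.dst))
    case ActiveDirection.Out    => edges.filter(e => active(e.src))
    case ActiveDirection.Either => edges.filter(e => active(e.src) || active(e.dst))
    case ActiveDirection.Both   => edges.filter(e => active(e.src) && active(e.dst))
  }

// Path 1 -> 2 -> 3 -> 4 where only vertex 2 received a message in the previous superstep
val path = Seq(SimpleEdge(1, 2), SimpleEdge(2, 3), SimpleEdge(3, 4))
val active = Set(2L)
```

With only vertex 2 active, `Out` selects edge 2 -> 3, `In` selects 1 -> 2, `Either` selects both, and `Both` selects none.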
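
### Message Aggregation Integration

The Pregel API builds on the same message-passing machinery as the lower-level `aggregateMessages` operator. Internally, Pregel uses a package-private variant of it that runs `sendMsg` only on edges adjacent to active vertices.
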
```scala { .api }
/**
 * Lower-level message aggregation (a variant restricted to active vertices is used internally by Pregel)
 * @param sendMsg function that sends messages along each edge through the EdgeContext
 * @param mergeMsg function combining two messages sent to the same vertex (commutative and associative)
 * @param tripletFields hint declaring which triplet fields sendMsg reads, so unused vertex attributes are not shipped
 * @return VertexRDD of aggregated messages; vertices that received no message are omitted
 */
def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All
): VertexRDD[A]

/**
 * Context for sending messages in aggregateMessages
 */
abstract class EdgeContext[VD, ED, A] {
  def srcId: VertexId
  def dstId: VertexId
  def srcAttr: VD
  def dstAttr: VD
  def attr: ED

  /** Send message to source vertex */
  def sendToSrc(msg: A): Unit

  /** Send message to destination vertex */
  def sendToDst(msg: A): Unit

  /** Convert the edge and its vertex attributes into an EdgeTriplet */
  def toEdgeTriplet: EdgeTriplet[VD, ED]
}
```
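
A few properties of `aggregateMessages` are easy to miss: `sendMsg` is called once per edge, messages addressed to the same vertex are reduced with `mergeMsg`, and vertices that receive no message are simply absent from the result. The plain-Scala model below makes these concrete by counting, for each user, the followers who are older. It has no Spark dependency; `LocalEdgeContext` and `localAggregateMessages` are hypothetical stand-ins for `EdgeContext` and the real operator, and `groupMapReduce` needs Scala 2.13+.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for GraphX's EdgeContext that records messages in an outbox
final class LocalEdgeContext[VD, ED, A](
    val srcId: Long,
    val dstId: Long,
    val srcAttr: VD,
    val dstAttr: VD,
    val attr: ED
) {
  val outbox: ArrayBuffer[(Long, A)] = ArrayBuffer.empty
  def sendToSrc(msg: A): Unit = outbox += (srcId -> msg)
  def sendToDst(msg: A): Unit = outbox += (dstId -> msg)
}

def localAggregateMessages[VD, ED, A](vertices: Map[Long, VD], edges: Seq[(Long, Long, ED)])(
    sendMsg: LocalEdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A
): Map[Long, A] =
  edges
    .flatMap { case (src, dst, attr) =>
      val ctx = new LocalEdgeContext[VD, ED, A](src, dst, vertices(src), vertices(dst), attr)
      sendMsg(ctx) // sendMsg sees each edge exactly once
      ctx.outbox
    }
    .groupMapReduce(_._1)(_._2)(mergeMsg) // vertices without messages get no entry

// Vertex attribute = age; edge = follower -> followee
val ages = Map(1L -> 30, 2L -> 20, 3L -> 40)
val follows = Seq((2L, 1L, "follows"), (3L, 1L, "follows"), (3L, 2L, "follows"), (1L, 2L, "follows"))
val olderFollowers = localAggregateMessages[Int, String, Int](ages, follows)(
  ctx => if (ctx.srcAttr > ctx.dstAttr) ctx.sendToDst(1),
  _ + _
)
```

Vertex 3 has no older followers, so it does not appear in `olderFollowers`; a `VertexRDD` returned by GraphX likewise contains only vertices that received a message.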
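
## Pregel Algorithm Patterns

### Single Source Shortest Path (SSSP)

Classic single-source shortest paths using Pregel message passing. Each superstep relaxes edges as in Bellman-Ford, and the computation terminates as long as the graph has no negative-weight cycle.
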
```scala
import org.apache.spark.graphx._

def shortestPaths(graph: Graph[Long, Double], sourceId: VertexId): Graph[Double, Double] = {
  // Initialize distances: 0 for the source, infinity for all other vertices
  val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity
  )

  // Pregel computation; the initial message (infinity) leaves every distance unchanged
  initialGraph.pregel(Double.PositiveInfinity)(
    // Vertex program: keep the shorter of the current and the offered distance
    vprog = (id, dist, newDist) => math.min(dist, newDist),

    // Send message: offer a shorter path to the destination if one exists
    sendMsg = triplet => {
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      } else {
        Iterator.empty
      }
    },

    // Merge messages: take minimum distance
    mergeMsg = (a, b) => math.min(a, b)
  )
}

// Usage (assumes an existing graph: Graph[Long, Double] whose edge attributes are weights)
val sourceVertex = 1L
val distances = shortestPaths(graph, sourceVertex).vertices
distances.collect().foreach { case (id, dist) =>
  println(s"Distance from $sourceVertex to $id: $dist")
}
```
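
To see how the relaxation unfolds, the following plain-Scala model applies the same vertex program, send rule, and min-merge to a small graph and records the distances after every superstep. It has no Spark dependency; the four-vertex graph and the name `ssspTrace` are made up for illustration, and `groupMapReduce` needs Scala 2.13+. For simplicity it evaluates the send rule on every edge in each round instead of only on edges next to active vertices; since a message is only sent when it improves a distance, the results are the same.

```scala
import scala.collection.mutable.ArrayBuffer

// Edges (src, dst, weight) for a four-vertex example graph
val weightedEdges = Seq((1L, 2L, 1.0), (1L, 3L, 4.0), (2L, 3L, 2.0), (3L, 4L, 1.0))
val vertexIds = Seq(1L, 2L, 3L, 4L)

// Returns the distance map after each superstep (index 0 = initial distances)
def ssspTrace(sourceId: Long): Seq[Map[Long, Double]] = {
  var dist = vertexIds.map(id => id -> (if (id == sourceId) 0.0 else Double.PositiveInfinity)).toMap
  val history = ArrayBuffer(dist)
  var changed = true
  while (changed) {
    // sendMsg: offer a shorter distance along each edge; mergeMsg: keep the minimum per target
    val messages = weightedEdges
      .collect { case (s, d, w) if dist(s) + w < dist(d) => d -> (dist(s) + w) }
      .groupMapReduce(_._1)(_._2)(math.min(_, _))
    // vprog: take the smaller of the current and the offered distance
    dist = dist.map { case (id, current) => id -> messages.get(id).fold(current)(math.min(current, _)) }
    changed = messages.nonEmpty
    if (changed) history += dist
  }
  history.toSeq
}
```

Vertex 3 first receives 4.0 over its direct edge and then 3.0 via vertex 2, which is why `vprog` must keep the minimum instead of overwriting. Three rounds are needed here because the longest shortest path, 1 -> 2 -> 3 -> 4, has three hops.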

### Connected Components with Pregel

Find connected components by iterative label propagation: every vertex ends up labeled with the smallest vertex ID in its component.

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

def connectedComponents[VD, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
  // Initialize each vertex with its own ID as component label
  val initialGraph = graph.mapVertices((id, _) => id)

  initialGraph.pregel(Long.MaxValue)(
    // Vertex program: adopt the smaller component ID
    vprog = (id, oldLabel, newLabel) => math.min(oldLabel, newLabel),

    // Send message: push the smaller label across the edge, in whichever direction it is needed
    sendMsg = triplet => {
      if (triplet.srcAttr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr))
      } else if (triplet.dstAttr < triplet.srcAttr) {
        Iterator((triplet.srcId, triplet.dstAttr))
      } else {
        Iterator.empty
      }
    },

    // Merge messages: take minimum component ID
    mergeMsg = (a, b) => math.min(a, b)
  )
}
```
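
On small test graphs it helps to know what the labels should converge to: the minimum vertex ID in each component. The plain-Scala helper below (hypothetical name `expectedComponents`, no Spark, Scala 2.13+) computes that with union-find, so its output can be compared against the collected Pregel result. It assumes every edge endpoint appears in `vertexIds`. Isolated vertices keep their own ID in both versions, since `vprog` with the initial message `Long.MaxValue` returns the vertex's own label.

```scala
import scala.collection.mutable

// Hypothetical test helper: expected Pregel labels (minimum vertex ID per component)
def expectedComponents(vertexIds: Seq[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
  val parent = mutable.Map.from(vertexIds.map(id => (id, id)))

  // Find the root of x, compressing the path along the way
  def find(x: Long): Long = {
    val p = parent(x)
    if (p == x) x
    else {
      val root = find(p)
      parent(x) = root
      root
    }
  }

  // Union: hang the larger root under the smaller one, so each root is its component's minimum ID
  for ((a, b) <- edges) {
    val ra = find(a)
    val rb = find(b)
    if (ra != rb) parent(math.max(ra, rb)) = math.min(ra, rb)
  }
  vertexIds.map(id => (id, find(id))).toMap
}
```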

### PageRank with Pregel

Implement fixed-iteration PageRank using the Pregel framework.

```scala
import org.apache.spark.graphx._

def pageRank(graph: Graph[Double, Double], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = {
  // Attach each vertex's out-degree. The rank slot is only a placeholder: the first
  // superstep applies vprog with initialMsg = 0.0, which sets every rank to resetProb.
  val graphWithDegrees = graph.outerJoinVertices(graph.outDegrees)((id, _, degOpt) =>
    (0.0, degOpt.getOrElse(0))
  )

  // Run Pregel for a fixed number of iterations; messages are Double rank contributions.
  // Vertices without in-links never receive a message after the first superstep,
  // so they keep the rank resetProb, which is also their correct value.
  graphWithDegrees.pregel(0.0, numIter)(
    // Vertex program: update PageRank score from the summed contributions
    vprog = (id, attr, msgSum) => {
      val (_, outDegree) = attr
      val newRank = resetProb + (1.0 - resetProb) * msgSum
      (newRank, outDegree)
    },

    // Send message: send rank contribution to neighbors
    sendMsg = triplet => {
      val (srcRank, srcOutDegree) = triplet.srcAttr
      if (srcOutDegree > 0) {
        Iterator((triplet.dstId, srcRank / srcOutDegree))
      } else {
        Iterator.empty
      }
    },

    // Merge messages: sum all incoming rank contributions
    mergeMsg = (a, b) => a + b
  ).mapVertices((id, attr) => attr._1) // Extract just the rank
}
```
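
The same update rule can be written as a plain-Scala reference for checking the Pregel version on small graphs. This sketch (hypothetical name `referencePageRank`, no Spark, Scala 2.13+) assumes every rank starts at `resetProb`, which is what a first superstep with an initial message of `0.0` produces, and then applies `rank(v) = resetProb + (1 - resetProb) * sum(rank(u) / outDegree(u))` over the in-edges `u -> v` for `numIter` rounds.

```scala
def referencePageRank(edges: Seq[(Long, Long)], numIter: Int, resetProb: Double = 0.15): Map[Long, Double] = {
  val ids = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
  val outDegree = edges.groupMapReduce(_._1)(_ => 1)(_ + _)
  // Superstep 0: every rank starts at resetProb
  var rank = ids.map(id => (id, resetProb)).toMap
  for (_ <- 1 to numIter) {
    // Sum of rank(u) / outDegree(u) over all in-edges u -> v, keyed by v
    val incoming = edges.groupMapReduce(_._2)(e => rank(e._1) / outDegree(e._1))(_ + _)
    // Vertices with no in-edges get no entry and fall back to resetProb
    rank = ids.map(id => (id, resetProb + (1.0 - resetProb) * incoming.getOrElse(id, 0.0))).toMap
  }
  rank
}
```

On a two-vertex cycle the ranks approach 1.0; in a star where vertices 1 and 2 both link to 3, vertex 3 settles at 0.15 + 0.85 * (0.15 + 0.15) = 0.405 while 1 and 2 stay at 0.15.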
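
### Collaborative Filtering with Pregel

A skeleton of how alternating least squares (ALS) matrix factorization can be organized as alternating Pregel supersteps over a bipartite user-item rating graph. The least-squares update itself is left as a placeholder, so this code does not train a usable model; Spark MLlib provides a complete `ALS` implementation.
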
```scala
import scala.util.Random
import org.apache.spark.graphx._

case class Factor(features: Array[Double], bias: Double)

// Assumptions (hypothetical): user IDs are < userIdBound, item IDs are >= userIdBound,
// and every rating edge points from a user (src) to an item (dst).
def alternatingLeastSquares(
    graph: Graph[Double, Double], // ratings graph: edge attribute = rating
    rank: Int,
    numIter: Int,
    userIdBound: VertexId = 1000000L
): Graph[Factor, Double] = {

  // Initialize vertex features randomly; seeding per vertex keeps the result
  // deterministic across executors and task retries
  val initialGraph = graph.mapVertices { (id, _) =>
    val random = new Random(42L + id)
    Factor(Array.fill(rank)(random.nextGaussian() * 0.1), 0.0)
  }

  // One superstep that refreshes either the user side or the item side.
  // maxIterations = 1 is essential: sendMsg always emits messages, so without
  // a bound this Pregel call would never terminate.
  def updateSide(g: Graph[Factor, Double], updateUsers: Boolean): Graph[Factor, Double] =
    g.pregel(Factor(Array.empty, 0.0), maxIterations = 1)(
      // The empty initial message means "keep the old factor"
      vprog = (id, oldFactor, msg) =>
        if ((id < userIdBound) == updateUsers && msg.features.nonEmpty) msg else oldFactor,
      // Send item factors to users, or user factors to items
      sendMsg = triplet =>
        if (updateUsers) Iterator((triplet.srcId, triplet.dstAttr))
        else Iterator((triplet.dstId, triplet.srcAttr)),
      // Placeholder merge: a real ALS update would sum the normal-equation terms
      // (y * y^T and rating * y) here and solve the resulting system in vprog
      mergeMsg = (f1, _) => f1
    )

  // Alternate between updating user and item factors
  var currentGraph = initialGraph
  for (_ <- 0 until numIter) {
    currentGraph = updateSide(currentGraph, updateUsers = true)
    currentGraph = updateSide(currentGraph, updateUsers = false)
  }
  currentGraph
}
```
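
What the placeholder merge stands in for: in ALS, each vertex solves a regularized least-squares problem over its neighbors' factors, and the inputs to that solve are sums, so they can be merged with plain addition (commutative and associative, as `mergeMsg` requires). The sketch below shows this for rank 1, an illustrative simplification in which every factor is a single number; `AlsStats`, `alsMessage`, `mergeStats`, and `solve` are hypothetical names. For rank `k`, `yy` would become the `k x k` matrix of summed `y * y^T` terms, `ry` a vector, and `solve` a linear-system solve.

```scala
// Sufficient statistics a user receives from its rated items (rank-1 case):
// sum of y_i * y_i and sum of r_i * y_i, where y_i is an item factor and r_i the rating
final case class AlsStats(yy: Double, ry: Double)

// sendMsg side: one message per rating edge
def alsMessage(itemFactor: Double, rating: Double): AlsStats =
  AlsStats(itemFactor * itemFactor, rating * itemFactor)

// mergeMsg side: plain addition, so it is commutative and associative
def mergeStats(a: AlsStats, b: AlsStats): AlsStats = AlsStats(a.yy + b.yy, a.ry + b.ry)

// vprog side: closed-form minimizer of sum_i (r_i - x * y_i)^2 + lambda * x^2
def solve(stats: AlsStats, lambda: Double): Double = stats.ry / (stats.yy + lambda)

// A user who rated two items (factors 1.0 and 2.0) with ratings 2.0 and 4.0
val stats = Seq(alsMessage(1.0, 2.0), alsMessage(2.0, 4.0)).reduce(mergeStats)
val userFactor = solve(stats, lambda = 0.0)
```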

## Advanced Pregel Patterns

### Multi-Phase Algorithms

Some algorithms need several Pregel phases with different logic (and possibly different message types), where each phase starts from the vertex state left by the previous one.

```scala
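import scala.reflect.ClassTag
import org.apache.spark.graphx._

// Each phase supplies its own initial message and Pregel functions; the message
// types (A1, A2) may differ, but both phases share the vertex type VD.
def twoPhaseAlgorithm[VD: ClassTag, ED: ClassTag, A1: ClassTag, A2: ClassTag](
    graph: Graph[VD, ED],
    initialMsg1: A1,
    vprog1: (VertexId, VD, A1) => VD,
    sendMsg1: EdgeTriplet[VD, ED] => Iterator[(VertexId, A1)],
    mergeMsg1: (A1, A1) => A1,
    initialMsg2: A2,
    vprog2: (VertexId, VD, A2) => VD,
    sendMsg2: EdgeTriplet[VD, ED] => Iterator[(VertexId, A2)],
    mergeMsg2: (A2, A2) => A2
): Graph[VD, ED] = {
  // Phase 1: Forward pass
  val phase1Result = graph.pregel(initialMsg1)(vprog1, sendMsg1, mergeMsg1)

  // Phase 2: Backward pass with different logic, starting from phase 1's vertex state
  val phase2Result = phase1Result.pregel(initialMsg2)(vprog2, sendMsg2, mergeMsg2)

  phase2Result
}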
```

### Convergence Detection

Implement custom convergence checking between Pregel supersteps: the driver runs one superstep at a time and stops once the largest per-vertex change falls below a tolerance.

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

def convergedPregelAlgorithm[VD: ClassTag, ED: ClassTag, A: ClassTag](
    graph: Graph[VD, ED],
    tolerance: Double,
    initialMsg: A,
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A,
    computeChange: (VD, VD) => Double, // non-negative size of the change between two states
    maxRounds: Int = 100
): Graph[VD, ED] = {

  var currentGraph = graph
  var converged = false
  var iteration = 0

  while (!converged && iteration < maxRounds) {
    val previousGraph = currentGraph

    // Run exactly one message round so the driver can inspect progress in between.
    // Each call first re-applies vprog with initialMsg to every vertex, so
    // vprog(id, attr, initialMsg) should leave attr unchanged.
    currentGraph = previousGraph.pregel(initialMsg, maxIterations = 1)(vprog, sendMsg, mergeMsg)

    // Check convergence by comparing vertex attributes (0.0 for an empty graph)
    val maxChange = previousGraph.vertices
      .innerJoin(currentGraph.vertices)((_, oldAttr, newAttr) => computeChange(oldAttr, newAttr))
      .map(_._2)
      .fold(0.0)((a, b) => math.max(a, b))

    converged = maxChange < tolerance
    // Release intermediate graphs, but never the caller's input graph
    if (previousGraph ne graph) previousGraph.unpersist(blocking = false)
    iteration += 1
  }

  currentGraph
}
```
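
Pregel also halts on its own once a superstep sends no messages, so convergence can instead be detected inside the computation: store the last change in the vertex state and let `sendMsg` stay silent once that change drops below the tolerance. The plain-Scala model below (no Spark; `deltaPageRank` is a hypothetical name, Scala 2.13+) applies this idea to PageRank by propagating rank *changes* rather than full ranks, so a vertex whose neighbors fall silent keeps the mass it has already accumulated. As with `activeDirection = EdgeDirection.Out`, only out-edges of vertices updated in the previous superstep are used.

```scala
// Returns (final ranks, number of supersteps that sent messages)
def deltaPageRank(edges: Seq[(Long, Long)], resetProb: Double, tol: Double): (Map[Long, Double], Int) = {
  val ids = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
  val outDegree = edges.groupMapReduce(_._1)(_ => 1)(_ + _)

  // Superstep 0: every vertex starts with rank = resetProb and lastDelta = resetProb
  var state: Map[Long, (Double, Double)] = ids.map(id => (id, (resetProb, resetProb))).toMap
  var active: Set[Long] = ids.toSet
  var supersteps = 0
  var done = false

  while (!done) {
    // sendMsg + mergeMsg: active vertices whose lastDelta exceeds tol push
    // (1 - resetProb) * lastDelta / outDegree along each out-edge; contributions are summed
    val messages = edges
      .collect { case (s, d) if active(s) && state(s)._2 > tol =>
        (d, (1.0 - resetProb) * state(s)._2 / outDegree(s))
      }
      .groupMapReduce(_._1)(_._2)(_ + _)

    done = messages.isEmpty
    if (!done) {
      // vprog: add the incoming mass to the rank and keep it as the new lastDelta
      state = state ++ messages.map { case (id, m) => (id, (state(id)._1 + m, m)) }
      active = messages.keySet
      supersteps += 1
    }
  }
  (state.map { case (id, (rank, _)) => (id, rank) }, supersteps)
}
```

Rank mass that is never forwarded (changes at or below `tol`) is the approximation error, so a smaller `tol` gives more accurate ranks at the cost of more supersteps.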

### Performance Optimization Patterns

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

// Optimize Pregel with proper caching and partitioning
def optimizedPregelAlgorithm[VD: ClassTag, ED: ClassTag, A: ClassTag](
    graph: Graph[VD, ED],
    initialMessage: A,
    vertexProgram: (VertexId, VD, A) => VD,
    messageFunction: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeFunction: (A, A) => A
): Graph[VD, ED] = {

  val optimizedGraph = graph
    .partitionBy(PartitionStrategy.EdgePartition2D) // bounds vertex replication at about 2 * sqrt(numPartitions)
    .cache()                                        // reused in every superstep

  val result = optimizedGraph.pregel(
    initialMsg = initialMessage,
    maxIterations = 50 // Upper bound in case messages never stop
  )(
    vprog = vertexProgram,
    sendMsg = messageFunction,
    mergeMsg = mergeFunction
  )

  // Clean up
  optimizedGraph.unpersist(blocking = false)
  result.cache() // Cache result if it will be reused
}

// Use TripletFields for better performance
def efficientMessagePassing[ED](graph: Graph[Double, ED]): VertexRDD[Double] = {
  graph.aggregateMessages[Double](
    sendMsg = ctx => {
      // Only the source attribute is read, matching TripletFields.Src below
      ctx.sendToDst(ctx.srcAttr)
    },
    mergeMsg = (a, b) => a + b,
    tripletFields = TripletFields.Src // ship only source attributes to the edges
  )
}
```

## Pregel vs. Other GraphX Operations

### When to Use Pregel

- **Iterative algorithms** that require multiple passes through the graph
- **Custom algorithms** not available in the GraphX library
- **Vertex-centric computation** where logic is naturally expressed per vertex
- **Message-passing patterns** where vertices communicate with neighbors

### When to Use Alternatives

```scala
import org.apache.spark.graphx._

// Assumes an existing graph: Graph[Double, Int] (numeric vertex values, integer edge attributes)

// Use aggregateMessages for single-pass message aggregation (same result as graph.degrees)
val degrees = graph.aggregateMessages[Int](
  sendMsg = ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) },
  mergeMsg = (a, b) => a + b,
  tripletFields = TripletFields.None // no vertex attributes are read
)

// Use mapVertices/mapTriplets for simple transformations
val maxValue = graph.vertices.map(_._2).max()
val normalizedGraph = graph.mapVertices((id, attr) => attr / maxValue)

// Use the built-in GraphOps methods for common operations
val components = graph.connectedComponents() // built-in; itself implemented on top of Pregel
val pageRanks = graph.pageRank(0.001)        // dynamic PageRank with convergence tolerance 0.001
```

### Pregel Execution Model

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

// Pregel execution phases:
// 0. Setup: vprog runs on every vertex with initialMsg, then sendMsg runs on every edge
// Then, in each iteration:
// 1. Vertex Program: vertices that received a message update their state with vprog
// 2. Send Messages: sendMsg runs on edges next to those vertices (filtered by activeDirection)
// 3. Message Aggregation: mergeMsg combines all messages sent to the same vertex
// 4. Termination: stop when no messages were sent or maxIterations is reached

// Simplified model of phases 1-3 for one iteration. Unlike GraphX's implementation, it runs
// sendMsg on every edge, because the active-set variant of aggregateMessages is internal to GraphX.
def pregelIteration[VD: ClassTag, ED: ClassTag, A: ClassTag](
    graph: Graph[VD, ED],
    messages: VertexRDD[A],
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A
): (Graph[VD, ED], VertexRDD[A]) = {

  // Phase 1: Apply the vertex program only to vertices that received a message
  val newGraph = graph.joinVertices(messages)(vprog)

  // Phases 2 and 3: Send messages for the next iteration and merge them per vertex
  val newMessages = newGraph.aggregateMessages[A](
    sendMsg = ctx => sendMsg(ctx.toEdgeTriplet).foreach { case (vid, msg) =>
      if (vid == ctx.srcId) ctx.sendToSrc(msg)
      else {
        require(vid == ctx.dstId, s"message target $vid is not an endpoint of this edge")
        ctx.sendToDst(msg)
      }
    },
    mergeMsg = mergeMsg
  )

  (newGraph, newMessages)
}
```
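
The phases above can be modeled end to end on a single machine. The following plain-Scala sketch is not GraphX code (`LocalEdge` and `localPregel` are hypothetical names, and `groupMapReduce` needs Scala 2.13+): it mirrors the loop structure, runs `vprog` with `initialMsg` on every vertex before the first message round, and restricts `sendMsg` to edges touching a vertex that received a message, as `activeDirection = EdgeDirection.Either` does. Partitioning, caching, and checkpointing are left out.

```scala
final case class LocalEdge[ED](src: Long, dst: Long, attr: ED)

// Returns (final vertex states, number of loop iterations executed)
def localPregel[VD, ED, A](
    vertices: Map[Long, VD],
    edges: Seq[LocalEdge[ED]],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue
)(
    vprog: (Long, VD, A) => VD,
    sendMsg: (LocalEdge[ED], VD, VD) => Iterator[(Long, A)],
    mergeMsg: (A, A) => A
): (Map[Long, VD], Int) = {
  // Phases 2 + 3: run sendMsg on edges touching an active vertex, then merge per target
  def sendAndMerge(state: Map[Long, VD], active: Long => Boolean): Map[Long, A] =
    edges
      .filter(e => active(e.src) || active(e.dst))
      .flatMap(e => sendMsg(e, state(e.src), state(e.dst)))
      .groupMapReduce(_._1)(_._2)(mergeMsg)

  // Setup: every vertex runs vprog with initialMsg, and every edge may send
  var state = vertices.map { case (id, attr) => (id, vprog(id, attr, initialMsg)) }
  var messages = sendAndMerge(state, _ => true)
  var iterations = 0

  // Phase 4: stop when no messages were sent or maxIterations is reached
  while (messages.nonEmpty && iterations < maxIterations) {
    // Phase 1: only vertices that received a message run vprog
    state = state ++ messages.map { case (id, msg) => (id, vprog(id, state(id), msg)) }
    val received = messages.keySet
    messages = sendAndMerge(state, received)
    iterations += 1
  }
  (state, iterations)
}

// Usage: label-propagation connected components on two components {1, 2, 3} and {4, 5}
val ccEdges = Seq(LocalEdge(2L, 1L, ()), LocalEdge(3L, 2L, ()), LocalEdge(5L, 4L, ()))
val (labels, iterations) = localPregel(
  vertices = (1L to 5L).map(id => (id, id)).toMap,
  edges = ccEdges,
  initialMsg = Long.MaxValue
)(
  vprog = (_, label, msg) => math.min(label, msg),
  sendMsg = (e, srcLabel, dstLabel) =>
    if (srcLabel < dstLabel) Iterator((e.dst, srcLabel))
    else if (dstLabel < srcLabel) Iterator((e.src, dstLabel))
    else Iterator.empty,
  mergeMsg = (a, b) => math.min(a, b)
)
```

Vertex 3 only learns label 1 in the second iteration, one hop after vertex 2 does, so the loop stops after two iterations.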