# Message Passing and Pregel

A bulk-synchronous message-passing framework for implementing custom graph algorithms with the vertex-centric programming model, executed as a distributed computation.

## Capabilities

### Core Message Aggregation

`aggregateMessages` is the fundamental message-passing operation. A user-supplied function runs on every edge and may send messages to either endpoint, and all messages arriving at the same vertex are combined with a merge function.

```scala { .api }
/**
 * Aggregates values from the neighborhood of each vertex.
 *
 * `sendMsg` is invoked once per edge and may send messages to the edge's source
 * and/or destination vertex. All messages addressed to the same vertex are combined
 * with `mergeMsg`. This is the core primitive on which message-passing graph
 * algorithms are built.
 *
 * @tparam A the message type
 * @param sendMsg runs on each edge and sends messages via `sendToSrc` / `sendToDst`
 * @param mergeMsg combines two messages addressed to the same vertex; it should be
 *                 commutative and associative, because the merge order is not defined
 * @param tripletFields which fields of the [[EdgeContext]] `sendMsg` reads; declaring a
 *                      narrower set than [[TripletFields.All]] can improve performance
 * @return the merged message for every vertex that received at least one message;
 *         vertices that received no messages are absent from the result
 */
def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All
): VertexRDD[A]
```

**Usage Examples:**

```scala
import org.apache.spark.graphx._

// Degree of every vertex: each edge contributes 1 to both of its endpoints
// (parallel edges count separately). Vertices without edges receive no message
// and are therefore absent from the result.
val neighborCounts = graph.aggregateMessages[Int](
  ctx => {
    ctx.sendToSrc(1)
    ctx.sendToDst(1)
  },
  _ + _
)

// Attributes of all adjacent vertices, one list entry per incident edge.
// The message type is List[String], so this assumes String vertex attributes.
val neighborAttributes = graph.aggregateMessages[List[String]](
  ctx => {
    ctx.sendToSrc(List(ctx.dstAttr))
    ctx.sendToDst(List(ctx.srcAttr))
  },
  _ ++ _
)

// Sum of incident edge weights. The edge attribute is parsed from its string form,
// so it must look like a number (otherwise toDouble throws NumberFormatException).
val weightedSums = graph.aggregateMessages[Double](
  ctx => {
    val weight = ctx.attr.toString.toDouble
    ctx.sendToSrc(weight)
    ctx.sendToDst(weight)
  },
  _ + _
)
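```

### EdgeContext API

`EdgeContext` is the object passed to the `sendMsg` function of `aggregateMessages`. It exposes the IDs and attributes of the edge and its two endpoints, and provides methods for sending messages to either endpoint.

```scala { .api }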
/**
 * Per-edge view handed to the `sendMsg` function of `aggregateMessages`.
 *
 * Gives read access to the edge, its two endpoint vertices and their attributes,
 * and lets `sendMsg` address messages of type `A` to either endpoint.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 * @tparam A  message type
 */
abstract class EdgeContext[VD, ED, A] {
  // --- Endpoint identities ---

  /** ID of the edge's source vertex. */
  def srcId: VertexId

  /** ID of the edge's destination vertex. */
  def dstId: VertexId

  // --- Attributes ---

  /** Attribute stored on the source vertex. */
  def srcAttr: VD

  /** Attribute stored on the destination vertex. */
  def dstAttr: VD

  /** Attribute stored on the edge itself. */
  def attr: ED

  // --- Messaging ---

  /** Addresses `msg` to the source vertex. */
  def sendToSrc(msg: A): Unit

  /** Addresses `msg` to the destination vertex. */
  def sendToDst(msg: A): Unit

  /** Copies this context into an [[EdgeTriplet]] for APIs that expect triplets. */
  def toEdgeTriplet: EdgeTriplet[VD, ED]
}
```

**Usage Examples:**

```scala
// Use every part of the EdgeContext: both endpoint IDs, both vertex attributes and
// the edge attribute. Each vertex ends up with (joined descriptions, summed weight,
// number of incident edges).
val complexMessages = graph.aggregateMessages[(String, Double, Int)](
  ctx => {
    val description = s"${ctx.srcAttr}-${ctx.attr}-${ctx.dstAttr}"
    // Illustrative "weight": the length of the edge attribute's string form.
    val weight = ctx.attr.toString.length.toDouble

    // Each endpoint learns which vertex sits on the other side of the edge.
    ctx.sendToSrc((s"From dst ${ctx.dstId}: $description", weight, 1))
    ctx.sendToDst((s"From src ${ctx.srcId}: $description", weight, 1))
  },
  // Combine field by field: join the texts, add the weights and the counts.
  { case ((textA, weightA, countA), (textB, weightB, countB)) =>
    (s"$textA; $textB", weightA + weightB, countA + countB)
  }
)
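```

### Pregel Framework

An implementation of the Pregel bulk-synchronous parallel (BSP) model for vertex-centric graph computation. Every vertex first applies the vertex program to the initial message. The computation then proceeds in supersteps: edges send messages, messages addressed to the same vertex are merged, and each receiving vertex updates its attribute with the vertex program. It stops when no messages are sent or the iteration limit is reached.

```scala { .api }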
/**
 * Pregel-style bulk-synchronous parallel (BSP) computation over a graph.
 *
 * Every vertex first runs `vprog` on `initialMsg`. After that, each superstep runs
 * `sendMsg` on the active edges, combines messages addressed to the same vertex with
 * `mergeMsg`, and runs `vprog` on every vertex that received a message. Iteration
 * stops once no messages are produced or `maxIterations` supersteps have run.
 */
object Pregel {

  /**
   * Runs a Pregel computation.
   *
   * @param graph           the input graph
   * @param initialMsg      message every vertex receives before the first superstep
   * @param maxIterations   upper bound on the number of supersteps (unbounded by default)
   * @param activeDirection which edges adjacent to a vertex that received a message in
   *                        the previous superstep run `sendMsg` in the next one
   * @param vprog           vertex program: new attribute from (vertex ID, current
   *                        attribute, merged incoming message)
   * @param sendMsg         edge program: (target vertex ID, message) pairs to send; each
   *                        target must be an endpoint of the triplet
   * @param mergeMsg        combines two messages addressed to the same vertex
   * @return the graph as it stands after the final superstep
   */
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](
      graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either
  )(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A
  ): Graph[VD, ED]
}
```

**Usage Examples:**

```scala
// Single Source Shortest Path using Pregel
/**
 * Single-source shortest paths computed with Pregel.
 *
 * Each vertex holds its best known distance from `sourceId`. Along every out-edge,
 * a vertex offers `distance + edgeLength` to the destination whenever that improves
 * the destination's distance.
 *
 * @param graph         graph whose edge attributes are the edge lengths
 * @param sourceId      vertex to measure distances from
 * @param maxIterations superstep cap (default 30, as before). Shortest paths with
 *                      more hops than this may not be fully settled when the cap is
 *                      reached, so raise it for graphs with long paths.
 * @return the graph with each vertex attribute replaced by its distance from
 *         `sourceId` (`Double.PositiveInfinity` if it was not reached)
 */
def shortestPaths(
    graph: Graph[_, Double],
    sourceId: VertexId,
    maxIterations: Int = 30
): Graph[Double, Double] = {
  // The source starts at distance 0; every other vertex starts unreachable.
  val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity
  )

  Pregel(
    initialGraph,
    // Neutral for min(): the initial vprog pass leaves every distance unchanged.
    initialMsg = Double.PositiveInfinity,
    maxIterations = maxIterations,
    activeDirection = EdgeDirection.Out
  )(
    // Vertex program: keep the shorter of the current and the offered distance.
    vprog = (_, dist, newDist) => math.min(dist, newDist),

    // Send message: offer the path through this edge only if it is an improvement.
    // An unreachable (infinite) source never passes the test, since Inf + w is not < dstAttr.
    sendMsg = triplet => {
      val candidate = triplet.srcAttr + triplet.attr
      if (candidate < triplet.dstAttr) Iterator((triplet.dstId, candidate))
      else Iterator.empty
    },

    // Merge messages: take the minimum distance.
    mergeMsg = (a, b) => math.min(a, b)
  )
}

// Connected Components using Pregel
/**
 * Labels each vertex with the smallest vertex ID in its connected component.
 * Edge direction is ignored.
 *
 * @param graph input graph
 * @return the graph with each vertex attribute replaced by its component ID
 */
def connectedComponents[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED]
): Graph[VertexId, ED] = {
  // Every vertex starts in its own component, labelled by its own ID.
  val initialGraph = graph.mapVertices((id, _) => id)

  Pregel(
    initialGraph,
    // Neutral for min(): the initial vprog pass leaves every label unchanged.
    initialMsg = Long.MaxValue,
    activeDirection = EdgeDirection.Either
  )(
    // Vertex program: adopt the smaller of the current and the offered component ID.
    vprog = (_, oldComp, newComp) => math.min(oldComp, newComp),

    // Send message: push the smaller label across the edge to whichever endpoint it
    // would lower. The two conditions are mutually exclusive, so at most one message
    // is sent per edge, and none once both ends agree.
    sendMsg = triplet =>
      if (triplet.srcAttr < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
      else if (triplet.dstAttr < triplet.srcAttr) Iterator((triplet.srcId, triplet.dstAttr))
      else Iterator.empty,

    // Merge messages: take the minimum component ID.
    mergeMsg = (a, b) => math.min(a, b)
  )
}
```

### Advanced Message Passing Patterns

Patterns that chain several rounds of message passing to build more sophisticated algorithms.

```scala { .api }
// Pattern: Multi-phase message passing
/**
 * Two chained rounds of message passing that annotate every vertex with statistics
 * about its own edges and about its in-neighbors.
 *
 * Phase 1 gives each vertex "degree" (number of incident edges) and "edgeWeight"
 * (sum of incident edge weights). Phase 2 sends half of each source vertex's phase-1
 * statistics along its out-edges. The destination sums them and stores the result
 * under "neighbor."-prefixed keys, so they cannot overwrite the vertex's own values.
 *
 * Edge attributes must look like numbers in their `toString` form; otherwise
 * `toDouble` throws NumberFormatException.
 *
 * @return the graph with each vertex attribute paired with its statistics map
 */
def multiPhaseAlgorithm[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED]
): Graph[(VD, Map[String, Double]), ED] = {

  // Merges two statistic maps, summing values that share a key. This is a function
  // value rather than a local def, so the Spark closures capture only this function
  // and not an enclosing instance.
  val sumByKey: (Map[String, Double], Map[String, Double]) => Map[String, Double] =
    (a, b) => a ++ b.map { case (k, v) => k -> (v + a.getOrElse(k, 0.0)) }

  // Phase 1: collect local neighborhood information. sendMsg reads only the edge attribute.
  val localInfo = graph.aggregateMessages[Map[String, Double]](
    triplet => {
      val info = Map("degree" -> 1.0, "edgeWeight" -> triplet.attr.toString.toDouble)
      triplet.sendToSrc(info)
      triplet.sendToDst(info)
    },
    sumByKey,
    TripletFields.EdgeOnly
  )

  // Attach the phase-1 statistics; vertices without edges get an empty map.
  val enrichedGraph = graph.outerJoinVertices(localInfo) { (_, attr, infoOpt) =>
    (attr, infoOpt.getOrElse(Map.empty[String, Double]))
  }

  // Phase 2: propagate half of each source's statistics to its destination.
  // sendMsg reads only the source attribute.
  val globalInfo = enrichedGraph.aggregateMessages[Map[String, Double]](
    triplet => {
      val processedInfo = triplet.srcAttr._2.map { case (k, v) => k -> v / 2.0 }
      triplet.sendToDst(processedInfo)
    },
    sumByKey,
    TripletFields.Src
  )

  // Combine both phases. Both maps use the same key names, so a plain `++` would let
  // neighbor values replace the local ones; namespace the neighbor keys instead.
  enrichedGraph.outerJoinVertices(globalInfo) { (_, attrAndLocal, globalOpt) =>
    val (attr, local) = attrAndLocal
    val neighbor = globalOpt
      .getOrElse(Map.empty[String, Double])
      .map { case (k, v) => s"neighbor.$k" -> v }
    (attr, local ++ neighbor)
  }
}
```

### Performance Optimizations

Passing a `TripletFields` value to `aggregateMessages` declares which triplet fields `sendMsg` actually reads. GraphX can then avoid shipping vertex attributes that are never used.

```scala { .api }
/**
 * Declares which parts of an edge triplet a `sendMsg` function reads, so that
 * `aggregateMessages` can skip moving data the function never uses.
 *
 * @param useSrc  whether the source vertex attribute is read
 * @param useDst  whether the destination vertex attribute is read
 * @param useEdge whether the edge attribute is read
 */
class TripletFields(val useSrc: Boolean, val useDst: Boolean, val useEdge: Boolean)

/** Predefined [[TripletFields]] combinations. */
object TripletFields {

  /** No attributes are read (vertex IDs remain available); the cheapest choice. */
  val None: TripletFields =
    new TripletFields(useSrc = false, useDst = false, useEdge = false)

  /** Only the edge attribute is read. */
  val EdgeOnly: TripletFields =
    new TripletFields(useSrc = false, useDst = false, useEdge = true)

  /** The source vertex attribute and the edge attribute are read. */
  val Src: TripletFields =
    new TripletFields(useSrc = true, useDst = false, useEdge = true)

  /** The destination vertex attribute and the edge attribute are read. */
  val Dst: TripletFields =
    new TripletFields(useSrc = false, useDst = true, useEdge = true)

  /** Everything is read; the default, and the most expensive choice. */
  val All: TripletFields =
    new TripletFields(useSrc = true, useDst = true, useEdge = true)
}
```

**Usage Examples:**

```scala
// Declare exactly the fields sendMsg reads, so GraphX can skip shipping the rest.

// Degree count: no attributes are read at all, only the endpoints are addressed.
val optimizedCount = graph.aggregateMessages[Int](
  ctx => {
    ctx.sendToSrc(1)
    ctx.sendToDst(1)
  },
  _ + _,
  TripletFields.None
)

// Total weight of each vertex's out-edges: only the edge attribute is read.
val edgeWeightSum = graph.aggregateMessages[Double](
  ctx => ctx.sendToSrc(ctx.attr.toString.toDouble),
  _ + _,
  TripletFields.EdgeOnly
)

// "<srcAttr>-<edgeAttr>" labels gathered at each destination: the source attribute
// and the edge attribute are read, the destination attribute is not.
val sourceAttributeCollection = graph.aggregateMessages[List[String]](
  ctx => ctx.sendToDst(List(s"${ctx.srcAttr}-${ctx.attr}")),
  _ ++ _,
  TripletFields.Src
)
```

### Message Passing Best Practices

Patterns and practices for writing efficient message-passing algorithms.

```scala
// Best Practice 1: Minimize message size
// Send small primitives (IDs or values derived from them) rather than whole attribute
// objects. Only vertex IDs are read here, so TripletFields.None also avoids shipping
// any vertex or edge attributes to the edges.
val efficientMessages = graph.aggregateMessages[Int](
  triplet => {
    // Reduce while still a Long, then narrow. `dstId.toInt % 1000` would first wrap
    // IDs above Int.MaxValue into unrelated (possibly negative) Ints.
    triplet.sendToSrc((triplet.dstId % 1000).toInt)
  },
  (a, b) => a + b,
  TripletFields.None
)

// Best Practice 2: Use appropriate data structures for messages
// A Set deduplicates neighbors that are reached through parallel edges.
// Only vertex IDs are read, so no attributes need to be shipped.
val setMessages = graph.aggregateMessages[Set[VertexId]](
  triplet => triplet.sendToSrc(Set(triplet.dstId)),
  (a, b) => a ++ b, // set union
  TripletFields.None
)

// Best Practice 3: Conditional message sending
// Send only when the message can matter. Fewer messages means less data to combine,
// and vertices that receive nothing are simply absent from the result.
val conditionalMessages = graph.aggregateMessages[Double](
  triplet => {
    val srcLength = triplet.srcAttr.toString.length
    if (srcLength > triplet.dstAttr.toString.length) {
      triplet.sendToDst(srcLength.toDouble)
    }
  },
  (a, b) => math.max(a, b)
)

// Best Practice 4: Batch processing pattern
/**
 * Template for a hand-written iterative message-passing loop.
 *
 * Each round aggregates messages and stops if none were sent. Otherwise it replaces
 * the attribute of every vertex that received a message. The send and merge functions
 * below are placeholders for algorithm-specific logic. For the convergence check to
 * ever fire, the real `sendMsg` must send conditionally (see Best Practice 3). For most
 * algorithms the built-in `Pregel` operator, which manages caching itself, is simpler.
 *
 * Every intermediate graph is cached and materialized, so each round builds on the
 * previous one instead of recomputing the whole lineage.
 *
 * @param initialGraph graph to start from; this function never unpersists it
 * @param maxIter      maximum number of update rounds
 * @return the graph after convergence or after `maxIter` rounds; if any update round
 *         ran, the result is cached and the caller may unpersist it when done
 */
def iterativeMessagePassing[VD: ClassTag](
    initialGraph: Graph[VD, _],
    maxIter: Int
): Graph[VD, _] = {
  var graph: Graph[VD, _] = initialGraph
  var iteration = 0
  var converged = false

  while (!converged && iteration < maxIter) {
    // Cached because it is used twice: by count() and by the join below.
    val messages = graph.aggregateMessages[VD](
      triplet => {
        // Algorithm-specific message sending
        triplet.sendToSrc(triplet.dstAttr)
      },
      (a, b) => a // Algorithm-specific merging
    ).cache()

    // Check for convergence: no messages means no vertex would change.
    if (messages.count() == 0) {
      println(s"Converged after $iteration iterations")
      converged = true
    } else {
      val previous = graph

      // Update graph with new values, then cache and materialize it so that later
      // rounds read it from memory.
      graph = graph.outerJoinVertices(messages) { (_, oldAttr, msgOpt) =>
        msgOpt.getOrElse(oldAttr)
      }.cache()
      graph.vertices.count()
      graph.edges.count()

      // Release the previous round's cached vertices, but never the caller's graph.
      if (previous ne initialGraph) previous.unpersistVertices(blocking = false)
      iteration += 1
    }

    messages.unpersist(blocking = false)
  }

  graph
}
```

## Integration with GraphOps

Both operations can be called directly on a `Graph`. `aggregateMessages` is a method of `Graph` itself, and `pregel` is defined in `GraphOps`, which every `Graph` gains through an implicit conversion.

```scala
// aggregateMessages is a method of Graph itself.
// The message type is String, so this assumes String vertex attributes.
val messages = graph.aggregateMessages[String](
  triplet => triplet.sendToSrc(triplet.dstAttr),
  (a, b) => s"$a,$b"
)

// pregel is defined in GraphOps and is available on any Graph through an implicit
// conversion. This sendMsg fires on every edge unconditionally, so every destination
// receives a message in every superstep and the message supply never runs out.
// maxIterations therefore bounds the run; without it the loop would not terminate
// and the vertex strings would grow without limit.
val pregelResult = graph.pregel("initial", maxIterations = 5)(
  (id, attr, msg) => s"$attr-$msg",
  triplet => Iterator((triplet.dstId, triplet.srcAttr)),
  (a, b) => s"$a;$b"
)
```