0
# Graph Processing
1
2
GraphX provides APIs for graphs and graph-parallel computation. It exposes fundamental operators such as subgraph, joinVertices, and aggregateMessages. It also offers an optimized variant of the Pregel API and a library of graph algorithms, including PageRank, connected components, and triangle counting.
3
4
## Capabilities
5
6
### Graph[VD, ED]
7
8
Main graph abstraction representing a directed multigraph with user-defined vertex and edge attributes.
9
10
```scala { .api }
11
/**
 * Main graph abstraction: a directed multigraph whose vertices and edges carry
 * user-defined attributes.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 */
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  /** Graph vertices with attributes */
  val vertices: VertexRDD[VD]
  /** Graph edges with attributes */
  val edges: EdgeRDD[ED]
  /** Edge triplets: each edge together with its source and destination vertex attributes */
  val triplets: RDD[EdgeTriplet[VD, ED]]

  /** Transform vertex attributes; `map` receives each vertex ID and its current attribute */
  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  /** Transform edge attributes, one edge at a time */
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
  /** Transform edge attributes using the edge plus both endpoint attributes */
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

  /** Reverse edge directions */
  def reverse: Graph[VD, ED]
  /**
   * Extract a subgraph. A vertex is kept if it satisfies `vpred`. An edge is kept
   * only if it satisfies `epred` and both of its endpoints satisfy `vpred`.
   * Both predicates default to accepting everything.
   */
  def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true)
  ): Graph[VD, ED]

  /**
   * Join vertices with an external `(VertexId, U)` table. For vertices that have an
   * entry in `table`, `mapFunc` computes the new attribute. Vertices without an entry
   * keep their current attribute. The vertex attribute type stays `VD`.
   */
  def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  /**
   * Aggregate messages sent along edges. The graph itself is NOT modified.
   *
   * `sendMsg` runs once per edge and may send messages to either endpoint through
   * the EdgeContext. `mergeMsg` combines messages addressed to the same vertex.
   * The result is a VertexRDD of merged messages containing only vertices that
   * received at least one message. `tripletFields` declares which fields `sendMsg`
   * reads, so unused vertex/edge attributes need not be shipped.
   */
  def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All
  ): VertexRDD[A]

  /** Number of vertices */
  def numVertices: Long
  /** Number of edges */
  def numEdges: Long
  /** In-degrees of vertices (vertices with no incoming edges are absent) */
  def inDegrees: VertexRDD[Int]
  /** Out-degrees of vertices (vertices with no outgoing edges are absent) */
  def outDegrees: VertexRDD[Int]
  /** Total (in + out) degrees of vertices (vertices with no edges are absent) */
  def degrees: VertexRDD[Int]

  /** Persist graph vertices and edges at the given storage level */
  def persist(newLevel: StorageLevel): Graph[VD, ED]
  /** Cache graph in memory */
  def cache(): Graph[VD, ED]
  /** Remove graph vertices and edges from the cache; `blocking` waits until done */
  def unpersist(blocking: Boolean = false): Graph[VD, ED]
}
66
```
67
68
**Usage Examples:**
69
70
```scala
71
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// `sc` is an existing SparkContext (e.g. the one provided by spark-shell)

// Create graph from vertex and edge RDDs
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"), (2L, "Bob"), (3L, "Charlie"), (4L, "David")
))

val edges: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "follow"),
  Edge(3L, 4L, "friend")
))

val graph = Graph(vertices, edges)

// Transform vertices
val upperGraph = graph.mapVertices((id, name) => name.toUpperCase)

// Filter subgraph: keep users whose name is longer than 3 characters.
// Edges survive only if both endpoints pass the vertex predicate.
val longNameUsers = graph.subgraph(vpred = (id, name) => name.length > 3)

// Aggregate messages (compute degrees manually): every edge sends 1 to both endpoints
val degrees = graph.aggregateMessages[Int](
  ctx => {
    ctx.sendToSrc(1)
    ctx.sendToDst(1)
  },
  (a, b) => a + b
)
100
```
101
102
### Graph Construction
103
104
Factory methods and utilities for creating graphs.
105
106
```scala { .api }
107
/**
 * Graph companion object with factory methods.
 *
 * Every factory also accepts `edgeStorageLevel` and `vertexStorageLevel`.
 * Both default to StorageLevel.MEMORY_ONLY.
 */
object Graph {
  /**
   * Create a graph from vertex and edge RDDs.
   *
   * @param defaultVertexAttr attribute for vertices that are referenced by an edge
   *                          but missing from `vertices` (null when omitted)
   */
  def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null.asInstanceOf[VD],
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  ): Graph[VD, ED]

  /** Create a graph from edges only; every referenced vertex gets `defaultValue` */
  def fromEdges[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultValue: VD,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  ): Graph[VD, ED]

  /**
   * Create a graph from `(srcId, dstId)` tuples. Every vertex gets `defaultValue`,
   * and every edge gets attribute 1.
   *
   * @param uniqueEdges when a PartitionStrategy is supplied, identical edges are
   *                    merged and their attribute becomes the number of duplicates;
   *                    with None, duplicates are kept as separate edges
   */
  def fromEdgeTuples[VD: ClassTag](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  ): Graph[VD, Int]

  /** Implicit conversion that makes GraphOps methods (pageRank, ...) callable on any Graph */
  implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]): GraphOps[VD, ED]
}
131
132
/**
 * GraphLoader provides utilities for loading graphs from files.
 */
object GraphLoader {
  /**
   * Load a graph from an edge list file. Each line holds one edge as two
   * whitespace-separated vertex IDs (`srcId dstId`), and lines starting with `#`
   * are skipped. All vertex and edge attributes are set to 1.
   *
   * @param canonicalOrientation if true, swap endpoints so that srcId is never
   *                             greater than dstId
   * @param numEdgePartitions number of edge partitions; -1 keeps the default
   *                          partitioning of the input file
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      numEdgePartitions: Int = -1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  ): Graph[Int, Int]
}
146
```
147
148
**Usage Examples:**
149
150
```scala
151
// Load from an edge list file (one "srcId dstId" pair per line)
val graph = GraphLoader.edgeListFile(sc, "hdfs://path/to/edges.txt")

// Build from (srcId, dstId) tuples; every vertex gets the attribute "defaultVertex"
val edgeTuples = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))
val tupleGraph = Graph.fromEdgeTuples(edgeTuples, "defaultVertex")

// Build from Edge objects alone; every vertex gets the attribute "missing"
val edgeList = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0)))
val edgeGraph = Graph.fromEdges(edgeList, "missing")
164
```
165
166
### Core Types
167
168
Fundamental types used in graph processing.
169
170
```scala { .api }
171
/** Vertex identifier type */
type VertexId = Long

/**
 * Edge with source ID, destination ID, and attribute.
 *
 * The fields are `var`s with default values, as in Spark. The defaults are what let
 * `class EdgeTriplet[VD, ED] extends Edge[ED]` invoke this constructor with no arguments.
 */
case class Edge[ED](
    var srcId: VertexId = 0,
    var dstId: VertexId = 0,
    var attr: ED = null.asInstanceOf[ED]
)
178
179
/**
 * An edge together with the attributes of its source and destination vertices.
 */
class EdgeTriplet[VD, ED] extends Edge[ED] {
  /** Source vertex attribute */
  var srcAttr: VD = _
  /** Destination vertex attribute */
  var dstAttr: VD = _

  /**
   * Copy srcId, dstId and attr from `other` into this triplet and return `this`.
   * Internal to Spark (protected[spark]); srcAttr and dstAttr are assigned separately.
   */
  protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED]

  /** Attribute of the endpoint opposite to `vid` (`vid` must be srcId or dstId) */
  def otherVertexAttr(vid: VertexId): VD
  /** Attribute of endpoint `vid` (`vid` must be srcId or dstId) */
  def vertexAttr(vid: VertexId): VD
  /** This triplet as ((srcId, srcAttr), (dstId, dstAttr), attr) */
  def toTuple: ((VertexId, VD), (VertexId, VD), ED)
}
191
192
/**
 * Context passed to the `sendMsg` function of aggregateMessages. It exposes one
 * edge (endpoint IDs and attributes) and lets it send messages of type `A` to
 * either endpoint.
 *
 * Vertex and edge attributes should only be read if the `tripletFields` argument
 * of aggregateMessages requests them. Fields not requested there may not be populated.
 */
abstract class EdgeContext[VD, ED, A] {
  /** Source vertex ID */
  def srcId: VertexId
  /** Destination vertex ID */
  def dstId: VertexId
  /** Source vertex attribute */
  def srcAttr: VD
  /** Destination vertex attribute */
  def dstAttr: VD
  /** Edge attribute */
  def attr: ED

  /** Send message to source vertex */
  def sendToSrc(msg: A): Unit
  /** Send message to destination vertex */
  def sendToDst(msg: A): Unit

  /** Convert this context into an EdgeTriplet with the same IDs and attributes */
  def toEdgeTriplet: EdgeTriplet[VD, ED]
}
212
```
213
214
### VertexRDD[VD]
215
216
Specialized RDD for vertices with efficient joins and graph operations.
217
218
```scala { .api }
219
/**
 * RDD of (VertexId, attribute) pairs in which each vertex ID appears at most once.
 * Entries are pre-indexed so that VertexRDDs sharing an index can be joined efficiently.
 */
abstract class VertexRDD[VD: ClassTag] extends RDD[(VertexId, VD)] {
  /** Keep only vertices satisfying `pred`; the result keeps this RDD's index */
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  // NOTE(review): Spark's VertexRDD only declares the Tuple2 overload above; this
  // two-argument form does not appear in its API docs. Verify before relying on it.
  def filter(pred: (VertexId, VD) => Boolean): VertexRDD[VD]

  /** Transform vertex values, keeping vertex IDs and the index */
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
  /** Transform vertex values with access to the vertex ID */
  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

  /**
   * For vertices present in both this RDD and `other`, return only those whose values
   * differ, taking the value from `other`. This is NOT a set difference on vertex IDs.
   * It is only guaranteed to work when both RDDs share a common ancestor (same index).
   */
  def diff(other: VertexRDD[VD]): VertexRDD[VD]

  /**
   * Left join. Every vertex of this RDD appears in the result. `f` receives
   * `Some(value)` from `other`, or `None` when `other` has no entry for that ID.
   */
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(
      f: (VertexId, VD, Option[VD2]) => VD3
  ): VertexRDD[VD3]

  /** Inner join: only vertices present in both this RDD and `other` appear in the result */
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
      f: (VertexId, VD, U) => VD2
  ): VertexRDD[VD2]

  /**
   * Reduce `messages` by vertex ID with `reduceFunc`, producing a VertexRDD that
   * reuses this RDD's index. Messages for IDs absent from this RDD are dropped.
   */
  def aggregateUsingIndex[VD2: ClassTag](
      messages: RDD[(VertexId, VD2)],
      reduceFunc: (VD2, VD2) => VD2
  ): VertexRDD[VD2]
}
250
```
251
252
### EdgeRDD[ED]
253
254
Specialized RDD for edges with graph-specific optimizations.
255
256
```scala { .api }
257
/**
 * RDD of edges stored in graph-optimized edge partitions.
 */
abstract class EdgeRDD[ED: ClassTag] extends RDD[Edge[ED]] {
  /** Transform edge attributes, preserving the existing partitioning */
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]

  /** Reverse edge directions */
  def reverse: EdgeRDD[ED]

  // NOTE(review): Spark's public EdgeRDD API does not declare a triplet-based filter;
  // filtering by triplet is normally done with Graph.subgraph. Verify before relying on it.
  /** Filter edges using triplet information */
  def filter(pred: EdgeTriplet[_, ED] => Boolean): EdgeRDD[ED]

  /**
   * Inner join with another EdgeRDD. Both RDDs must be partitioned with the same
   * PartitionStrategy. `f` receives (srcId, dstId, this edge's attr, other edge's attr).
   */
  def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])(
      f: (VertexId, VertexId, ED, ED2) => ED3
  ): EdgeRDD[ED3]
}
275
```
276
277
### Graph Algorithms
278
279
Pre-implemented graph algorithms can be called directly on any `Graph` through an implicit conversion to `GraphOps`. No extra import is needed.
280
281
```scala { .api }
282
/**
 * Additional operations available on every Graph.
 *
 * Spark defines GraphOps as a regular class and exposes it through the implicit
 * conversion `Graph.graphToGraphOps`, so these methods can be called directly on a Graph.
 */
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
  /**
   * Dynamic PageRank: iterate until every rank changes by less than `tol`.
   * Vertex attributes hold the rank; edge attributes hold the normalized edge weight.
   *
   * @param tol convergence tolerance
   * @param resetProb random reset probability
   */
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

  /** Static PageRank: run exactly `numIter` iterations (same output layout as pageRank) */
  def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

  /** Label each vertex with the lowest vertex ID in its connected component */
  def connectedComponents(): Graph[VertexId, ED]

  /** Count triangles passing through each vertex */
  def triangleCount(): Graph[Int, ED]

  /**
   * Label each vertex with the lowest vertex ID in its strongly connected component.
   *
   * @param numIter maximum number of iterations
   */
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]

  /**
   * Collect neighbor IDs for each vertex; vertices without neighbors get an empty array.
   * EdgeDirection.Both is not supported (Spark throws SparkException); use Either instead.
   */
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

  /**
   * Collect (ID, attribute) of each vertex's neighbors; vertices without neighbors get
   * an empty array. EdgeDirection.Both is not supported; use Either instead.
   */
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
}
307
308
/**
 * Direction of edges relative to a vertex. Used for neighbor collection and for
 * Pregel's `activeDirection`.
 *
 * In Spark this is a class with four predefined instances, not a scala.Enumeration.
 * That is why `EdgeDirection` itself is the parameter type in the signatures above.
 */
class EdgeDirection private (private val name: String) extends Serializable {
  /** Reverse direction: In <-> Out, while Either and Both map to themselves */
  def reverse: EdgeDirection
}

object EdgeDirection {
  /** Edges arriving at a vertex */
  final val In = new EdgeDirection("In")
  /** Edges originating from a vertex */
  final val Out = new EdgeDirection("Out")
  /** Edges originating from *or* arriving at a vertex */
  final val Either = new EdgeDirection("Either")
  /** Edges originating from *and* arriving at a vertex */
  final val Both = new EdgeDirection("Both")
}
314
```
315
316
**Usage Examples:**
317
318
```scala
319
// Graph, EdgeDirection, ...; GraphOps methods such as pageRank come via implicit conversion
import org.apache.spark.graphx._
// Only needed to call the algorithm objects directly, e.g. PageRank.run(graph, numIter = 10)
import org.apache.spark.graphx.lib._

// PageRank until ranks change by less than 0.0001, with reset probability 0.15
val pageRankGraph = graph.pageRank(0.0001, 0.15)
// collect() brings every vertex to the driver; use take(n) on large graphs
val pageRanks = pageRankGraph.vertices.collect()

// Connected Components: each vertex is labeled with the lowest vertex ID in its component
val ccGraph = graph.connectedComponents()
val components = ccGraph.vertices.collect()

// Triangle Count
val triangleCountGraph = graph.triangleCount()
val triangleCounts = triangleCountGraph.vertices.collect()

// Collect neighbors (EdgeDirection.Both is not supported here; use Either for all neighbors)
val neighbors = graph.collectNeighborIds(EdgeDirection.Out)
335
```
336
337
### Pregel API
338
339
Pregel-style bulk-synchronous message-passing abstraction.
340
341
```scala { .api }
342
/**
 * Bulk-synchronous, Pregel-style message passing over a graph.
 */
object Pregel {
  /**
   * Run a Pregel computation. Supersteps repeat until no messages remain
   * or `maxIterations` is reached.
   *
   * @param graph           input graph
   * @param initialMsg      message every vertex receives in the first superstep
   * @param maxIterations   upper bound on the number of supersteps
   * @param activeDirection which edges of vertices that received a message in the
   *                        previous superstep run `sendMsg` next
   * @param vprog           computes a vertex's new value from its old value and merged message
   * @param sendMsg         emits (target vertex ID, message) pairs for an edge triplet
   * @param mergeMsg        combines two messages addressed to the same vertex
   */
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
      (graph: Graph[VD, ED],
       initialMsg: A,
       maxIterations: Int = Int.MaxValue,
       activeDirection: EdgeDirection = EdgeDirection.Either)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
      : Graph[VD, ED]
}
358
```
359
360
**Usage Examples:**
361
362
```scala
363
// Single Source Shortest Path using Pregel.
// Edge attributes are edge weights. Input vertex attributes are ignored, so any
// vertex type is accepted. The result holds each vertex's distance from `sourceId`
// (PositiveInfinity if unreachable). With Pregel's default maxIterations
// (Int.MaxValue), the computation only terminates if no negative-weight cycle
// is reachable from `sourceId`.
def shortestPaths[VD](graph: Graph[VD, Double], sourceId: VertexId): Graph[Double, Double] = {
  val initialGraph: Graph[Double, Double] = graph.mapVertices { (id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity
  }

  Pregel(initialGraph, Double.PositiveInfinity)(
    // Vertex program: keep the smaller of the current and the proposed distance
    (id, dist, newDist) => math.min(dist, newDist),
    // Send message: offer dst a shorter path through src, if one exists
    triplet => {
      val candidate = triplet.srcAttr + triplet.attr
      if (candidate < triplet.dstAttr) Iterator((triplet.dstId, candidate))
      else Iterator.empty
    },
    // Merge messages: take minimum
    (a, b) => math.min(a, b)
  )
}
384
```
385
386
### Graph Partitioning
387
388
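Strategies for distributing a graph's edges across partitions. GraphX uses vertex-cut partitioning: edges are assigned to partitions, and vertices may be replicated across the partitions that hold their edges.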
389
390
```scala { .api }
391
/**
 * Strategy for assigning edges to partitions.
 *
 * In Spark this is a trait implemented by case objects, not a scala.Enumeration.
 * That is why `PartitionStrategy` itself is the type accepted by partitionBy and
 * Graph.fromEdgeTuples.
 */
trait PartitionStrategy extends Serializable {
  /** Partition ID in [0, numParts) for the edge (src, dst) */
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}

object PartitionStrategy {
  /**
   * Hash source and destination IDs: a random vertex cut that colocates all
   * same-direction edges between two vertices
   */
  case object RandomVertexCut extends PartitionStrategy
  /** Assign edges by the source vertex ID only, colocating edges with the same source */
  case object EdgePartition1D extends PartitionStrategy
  /**
   * 2D partitioning of the edge adjacency matrix; bounds vertex replication
   * by 2 * sqrt(numParts)
   */
  case object EdgePartition2D extends PartitionStrategy
  /**
   * Like RandomVertexCut but direction-independent: colocates all edges between
   * two vertices regardless of direction
   */
  case object CanonicalRandomVertexCut extends PartitionStrategy

  /**
   * Look up a strategy by name, e.g. "EdgePartition2D".
   * Unknown names throw IllegalArgumentException.
   */
  def fromString(s: String): PartitionStrategy
}
404
405
/**
 * Repartitioning operators.
 *
 * In Spark these overloads are members of Graph[VD, ED] itself, so no wrapper class
 * or import is needed. They are listed separately here for reference. Only the edges
 * are repartitioned.
 */
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  /** Repartition edges with `partitionStrategy`, keeping the current number of edge partitions */
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  /** Repartition edges with `partitionStrategy` into `numPartitions` edge partitions */
  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
}
413
```
414
415
### Graph I/O
416
417
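Saving graph data. GraphX has no dedicated graph writer; loading is covered by `GraphLoader` above. Because `vertices` and `edges` are RDDs, they can be written with standard RDD methods such as `saveAsTextFile`.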
418
419
```scala { .api }
420
/**
 * Convenience helpers for saving a graph as text.
 *
 * GraphX has no built-in graph writer. `vertices` and `edges` are ordinary RDDs, so
 * these user-defined helpers delegate to RDD.saveAsTextFile, which writes each
 * element's toString on its own line. Like any implicit class, this must be declared
 * inside an object, class, or package object.
 */
implicit class GraphWriter[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /** Save vertices to text file, one (id, attr) tuple per line */
  def saveVerticesAsTextFile(path: String): Unit = graph.vertices.saveAsTextFile(path)
  /** Save edges to text file, one Edge per line */
  def saveEdgesAsTextFile(path: String): Unit = graph.edges.saveAsTextFile(path)
}
429
```
430
431
**Usage Examples:**
432
433
```scala
434
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Partition graph edges with the 2D strategy into 4 partitions
val partitionedGraph = graph
  .partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 4)

// Save graph: vertices and edges are plain RDDs, so standard RDD writers apply
graph.vertices.saveAsTextFile("hdfs://path/to/vertices")
graph.edges.saveAsTextFile("hdfs://path/to/edges")

// Complex graph analysis pipeline
// `friendships` holds (srcId, dstId) pairs; a small inline sample stands in for real data
val friendships: RDD[(VertexId, VertexId)] = sc.parallelize(Seq(
  (1L, 2L), (2L, 3L), (3L, 1L), (4L, 5L)
))
val socialGraph = Graph.fromEdgeTuples(friendships, "User")

// Vertices ordered by PageRank, highest first (nothing runs until an action such as take(10))
val pageRanks = socialGraph
  .pageRank(0.0001)
  .vertices
  .sortBy(_._2, ascending = false)

// Group user IDs by connected component (component ID = lowest vertex ID in the component)
val communities = socialGraph
  .connectedComponents()
  .vertices
  .map { case (userId, componentId) => (componentId, userId) }
  .groupByKey()
  .collect()
456
```
457
458
## Error Handling
459
460
Common GraphX exceptions:
461
462
- `IllegalArgumentException` - Invalid graph construction parameters
463
- `SparkException` - General Spark execution errors during graph operations
464
- `ClassCastException` - Type mismatches in vertex/edge attributes