0
# Graph Processing
1
2
Spark GraphX is a graph computation framework built on top of Spark Core. It provides APIs for expressing graph computations over user-defined graphs, which are modeled with the property graph abstraction.
3
4
## Graph
5
6
The fundamental abstraction in GraphX is the property graph: a directed multigraph with properties attached to each vertex and edge.
7
8
```scala { .api }
9
/**
 * The property graph, parameterised by the vertex attribute type `VD` and the
 * edge attribute type `ED`. This is an API listing: member bodies are omitted.
 */
abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable {

  // ---- Basic graph properties ----
  def vertices: VertexRDD[VD]
  def edges: EdgeRDD[ED]
  def triplets: RDD[EdgeTriplet[VD, ED]]

  // ---- Persistence operations ----
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersist(blocking: Boolean = false): Graph[VD, ED]
  def checkpoint(): Unit
  def isCheckpointed: Boolean
  def getCheckpointFiles: Seq[String]

  // ---- Structural operations ----
  def numEdges: Long
  def numVertices: Long
  def inDegrees: VertexRDD[Int]
  def outDegrees: VertexRDD[Int]
  def degrees: VertexRDD[Int]

  // ---- Graph transformations (vertex/edge attribute types may change) ----
  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2: ClassTag](
      map: EdgeTriplet[VD, ED] => ED2,
      tripletFields: TripletFields): Graph[VD, ED2]

  // ---- Structural transformations (attribute types are preserved) ----
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]
  def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  // ---- Neighborhood operations ----
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All): VertexRDD[A]
  def aggregateMessagesWithActiveSet[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields,
      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)]): VertexRDD[A]

  // ---- Join operations ----
  def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(
      mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
      mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]

  // ---- Partitioning ----
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]

  // ---- Pregel API ----
  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Graph[VD, ED]

  // ---- Graph algorithms ----
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def personalizedPageRank(
      src: VertexId,
      tol: Double,
      resetProb: Double = 0.15): Graph[Double, Double]
  def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def connectedComponents(maxIterations: Int): Graph[VertexId, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
}
82
```
83
84
### Graph Companion Object
85
86
```scala { .api }
87
/** Factory methods for building a `Graph` (API listing: bodies are omitted). */
object Graph {

  /** Builds a graph from a vertex RDD and an edge RDD. */
  def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null.asInstanceOf[VD],
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

  /** Builds a graph from an edge RDD only; `defaultValue` supplies the vertex attribute. */
  def fromEdges[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultValue: VD,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

  /** Builds a graph from raw `(srcId, dstId)` pairs; the resulting edge attribute type is `Int`. */
  def fromEdgeTuples[VD: ClassTag](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]

  /** Implicit view from a `Graph` to its `GraphOps`. */
  implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](
      g: Graph[VD, ED]): GraphOps[VD, ED]
}
110
```
111
112
### Usage Examples
113
114
```scala
115
import org.apache.spark.graphx._
116
import org.apache.spark.rdd.RDD
117
118
// Create vertices RDD
119
// Vertex table: id -> (name, position)
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  3L -> ("rxin", "student"),
  7L -> ("jgonzal", "postdoc"),
  5L -> ("franklin", "prof"),
  2L -> ("istoica", "prof")
))

// Edge table: each edge carries a String label
val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(3L, 7L, "collab"),
  Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"),
  Edge(5L, 7L, "pi")
))

// Assemble the property graph from the two tables
val graph = Graph(users, relationships)

// Size of the graph
println(s"Number of vertices: ${graph.numVertices}")
println(s"Number of edges: ${graph.numEdges}")

// Per-vertex degree views
val degrees = graph.degrees
val inDegrees = graph.inDegrees
val outDegrees = graph.outDegrees

// Attribute transformations: append the id to each vertex, upper-case each edge label
val newGraph = graph.mapVertices { case (id, (name, pos)) => (name, pos, id) }
val edgeGraph = graph.mapEdges(_.attr.toUpperCase)

// Keep only vertices whose position is "prof"
val professorsGraph = graph.subgraph(vpred = (_, attr) => attr._2 == "prof")

// Built-in algorithms
val ranks = graph.pageRank(0.0001).vertices
val connectedComponents = graph.connectedComponents().vertices
156
```
157
158
## VertexRDD and EdgeRDD
159
160
### VertexRDD
161
162
```scala { .api }
163
/**
 * An `RDD[(VertexId, VD)]` specialised for vertex data.
 * API listing: member bodies are omitted.
 */
abstract class VertexRDD[VD: ClassTag](
    sc: SparkContext,
    deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) {

  // ---- Efficient join operations ----
  // The "Zip" variants take another VertexRDD; the plain variants take any keyed RDD.
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(
      f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def leftZipJoin[VD2: ClassTag, VD3: ClassTag](other: VertexRDD[VD2])(
      f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
      f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])(
      f: (VertexId, VD, U) => VD2): VertexRDD[VD2]

  // ---- Aggregation operations ----
  def aggregateUsingIndex[VD2: ClassTag](
      messages: RDD[(VertexId, VD2)],
      reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]

  // ---- Transformation operations ----
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
  def diff(other: VertexRDD[VD]): VertexRDD[VD]

  // ---- Caching and persistence ----
  def cache(): VertexRDD[VD]
  def persist(newLevel: StorageLevel): VertexRDD[VD]
  def unpersist(blocking: Boolean = false): VertexRDD[VD]

  // ---- Structural operations ----
  def reindex(): VertexRDD[VD]
  def withTargetStorageLevel(targetStorageLevel: StorageLevel): VertexRDD[VD]
}
195
196
/** Factory methods for `VertexRDD` (API listing: bodies are omitted). */
object VertexRDD {

  /** Wraps a keyed RDD of vertex attributes as a `VertexRDD`. */
  def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD]

  /** Derives a `VertexRDD` from an `EdgeRDD`, using `defaultVal` as the vertex attribute. */
  def fromEdges[VD: ClassTag](
      edges: EdgeRDD[_],
      numPartitions: Int,
      defaultVal: VD): VertexRDD[VD]
}
200
```
201
202
### EdgeRDD
203
204
```scala { .api }
205
/**
 * An `RDD[Edge[ED]]` specialised for edge data.
 * API listing: member bodies are omitted.
 */
abstract class EdgeRDD[ED: ClassTag](
    sc: SparkContext,
    deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {

  // ---- Core operations ----
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]
  def reverse: EdgeRDD[ED]
  // NOTE(review): in GraphX this two-predicate filter appears to live on the
  // implementation class rather than on the public EdgeRDD - confirm before relying on it.
  def filter(
      epred: EdgeTriplet[_, ED] => Boolean,
      vpred: (VertexId, _) => Boolean): EdgeRDD[ED]
  // Joins two edge sets: `other` is another EdgeRDD, and `f` receives both
  // endpoint ids (source, destination) followed by the two edge attributes.
  def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])(
      f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

  // ---- Caching and persistence ----
  def cache(): EdgeRDD[ED]
  def persist(newLevel: StorageLevel): EdgeRDD[ED]
  def unpersist(blocking: Boolean = false): EdgeRDD[ED]

  // ---- Structural operations ----
  def count(): Long
  def mapPartitionsWithIndex[ED2: ClassTag](
      f: (Int, Iterator[Edge[ED]]) => Iterator[ED2],
      preservesPartitioning: Boolean = false): RDD[ED2]
  def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED]
}
227
228
/** Factory methods for `EdgeRDD` (API listing: bodies are omitted). */
object EdgeRDD {

  /** Wraps an RDD of edges as an `EdgeRDD`. */
  def fromEdges[ED: ClassTag, VD: ClassTag](
      edges: RDD[Edge[ED]]): EdgeRDD[ED]
}
231
```
232
233
## Core Types
234
235
### Basic Types
236
237
```scala { .api }
238
// Vertex identifier type
239
type VertexId = Long

// Partition identifier type (the result type of PartitionStrategy.getPartition)
type PartitionID = Int
240
241
// Edge representation
242
/**
 * A directed edge from `srcId` to `dstId` carrying the attribute `attr`.
 *
 * Every field has a default and is mutable so that a subclass can be written
 * as `extends Edge[ED]` with no constructor arguments and filled in afterwards.
 *
 * @param srcId id of the source vertex
 * @param dstId id of the destination vertex
 * @param attr  attribute attached to the edge
 */
case class Edge[ED](
    var srcId: VertexId = 0,
    var dstId: VertexId = 0,
    var attr: ED = null.asInstanceOf[ED]) extends Serializable {

  /** Returns `dstId` when `vid` is the source; any other id yields `srcId`. */
  def otherVertexId(vid: VertexId): VertexId = if (srcId == vid) dstId else srcId

  /** Returns `EdgeDirection.Out` when `vid` is the source of this edge, `EdgeDirection.In` otherwise. */
  def relativeDirection(vid: VertexId): EdgeDirection =
    if (vid == srcId) EdgeDirection.Out else EdgeDirection.In
}
248
249
// Edge triplet with vertex attributes
250
/**
 * An `Edge[ED]` that additionally carries the attributes of its two endpoint vertices.
 */
class EdgeTriplet[VD, ED] extends Edge[ED] {

  /** Attribute of the source vertex. */
  var srcAttr: VD = _

  /** Attribute of the destination vertex. */
  var dstAttr: VD = _

  /** Attribute of the endpoint opposite `vid`: `dstAttr` when `vid` is the source, else `srcAttr`. */
  def otherVertexAttr(vid: VertexId): VD = {
    if (vid == srcId) dstAttr else srcAttr
  }

  /** Attribute of `vid` itself: `srcAttr` when `vid` is the source, else `dstAttr`. */
  def vertexAttr(vid: VertexId): VD = {
    if (vid == srcId) srcAttr else dstAttr
  }

  /** The triplet as `((srcId, srcAttr), (dstId, dstAttr), attr)`. */
  def toTuple: ((VertexId, VD), (VertexId, VD), ED) = {
    val source = (srcId, srcAttr)
    val destination = (dstId, dstAttr)
    (source, destination, attr)
  }

  override def toString: String = s"(($srcId,$srcAttr),($dstId,$dstAttr),$attr)"
}
260
261
// Edge direction enumeration
262
/**
 * Direction of an edge relative to a vertex.
 *
 * Declared as a class with a companion (not an `Enumeration`) so that
 * `EdgeDirection` is itself a type and can appear in signatures such as
 * `relativeDirection(vid: VertexId): EdgeDirection`.
 * The only instances are the four values in the companion object.
 */
class EdgeDirection private (private val name: String) extends Serializable {

  /** Swaps `In` and `Out`; `Either` and `Both` are their own reverse. */
  def reverse: EdgeDirection = this match {
    case EdgeDirection.In => EdgeDirection.Out
    case EdgeDirection.Out => EdgeDirection.In
    case EdgeDirection.Either => EdgeDirection.Either
    case EdgeDirection.Both => EdgeDirection.Both
  }

  override def toString: String = "EdgeDirection." + name

  // Compare by name so that a deserialized copy still equals the canonical value.
  override def equals(o: Any): Boolean = o match {
    case other: EdgeDirection => other.name == name
    case _ => false
  }

  override def hashCode: Int = name.hashCode
}

object EdgeDirection {
  /** Edges arriving at a vertex. */
  final val In: EdgeDirection = new EdgeDirection("In")

  /** Edges originating from a vertex. */
  final val Out: EdgeDirection = new EdgeDirection("Out")

  /** Edges originating from *or* arriving at a vertex of interest. */
  final val Either: EdgeDirection = new EdgeDirection("Either")

  /** Edges originating from *and* arriving at a vertex of interest. */
  final val Both: EdgeDirection = new EdgeDirection("Both")
}
266
267
// Edge context for message passing
268
/**
 * The view of a single edge handed to a `sendMsg` function: both endpoint ids,
 * both endpoint attributes, the edge attribute, and the two send operations.
 * API listing: member bodies are omitted.
 */
abstract class EdgeContext[VD, ED, A] {

  // ---- Edge endpoints and attributes ----
  def srcId: VertexId
  def dstId: VertexId
  def srcAttr: VD
  def dstAttr: VD
  def attr: ED

  // ---- Message sending ----
  def sendToSrc(msg: A): Unit
  def sendToDst(msg: A): Unit

  // ---- Conversion ----
  def toEdgeTriplet: EdgeTriplet[VD, ED]
}
278
279
// Triplet field specification for performance optimization
280
/**
 * Declares which parts of a triplet a function reads.
 *
 * @param useSrc  whether the source vertex attribute is included
 * @param useDst  whether the destination vertex attribute is included
 * @param useEdge whether the edge attribute is included
 */
class TripletFields(val useSrc: Boolean, val useDst: Boolean, val useEdge: Boolean)
    extends Serializable {

  /** Default: all fields are included. */
  def this() = this(true, true, true)
}

/**
 * The predefined field sets. GraphX defines exactly these five; note that
 * `Src` and `Dst` also include the edge attribute.
 */
object TripletFields {
  /** None of the triplet fields are exposed. */
  val None = new TripletFields(false, false, false)

  /** Only the edge attribute is exposed. */
  val EdgeOnly = new TripletFields(false, false, true)

  /** Source and edge attributes are exposed, but not the destination. */
  val Src = new TripletFields(true, false, true)

  /** Destination and edge attributes are exposed, but not the source. */
  val Dst = new TripletFields(false, true, true)

  /** Source, destination and edge attributes are all exposed. */
  val All = new TripletFields(true, true, true)
}
295
```
296
297
### Partitioning Strategies
298
299
```scala { .api }
300
/**
 * Maps an edge, identified by its two endpoint ids, to one of `numParts` partitions.
 * API listing: the method body is omitted.
 */
abstract class PartitionStrategy extends Serializable {
  def getPartition(
      src: VertexId,
      dst: VertexId,
      numParts: Int): PartitionID
}
303
304
/**
 * The built-in partition strategies, plus lookup of a strategy by name.
 * API listing: method bodies are omitted.
 */
object PartitionStrategy {

  case object RandomVertexCut extends PartitionStrategy {
    override def getPartition(
        src: VertexId,
        dst: VertexId,
        numParts: Int): PartitionID
  }

  case object EdgePartition1D extends PartitionStrategy {
    override def getPartition(
        src: VertexId,
        dst: VertexId,
        numParts: Int): PartitionID
  }

  case object EdgePartition2D extends PartitionStrategy {
    override def getPartition(
        src: VertexId,
        dst: VertexId,
        numParts: Int): PartitionID
  }

  case object CanonicalRandomVertexCut extends PartitionStrategy {
    override def getPartition(
        src: VertexId,
        dst: VertexId,
        numParts: Int): PartitionID
  }

  /** Resolves a strategy from its string form. */
  def fromString(s: String): PartitionStrategy
}
323
```
324
325
### Usage Examples
326
327
```scala
328
import org.apache.spark.graphx._
329
330
// Working with VertexRDD
331
// `users` and `relationships` are plain RDDs. The VertexRDD / EdgeRDD operations
// below are only available on the graph's own views, graph.vertices and graph.edges.

// Working with VertexRDD
// The parameter type is spelled out because VertexRDD.mapValues is overloaded.
val newUsers: VertexRDD[String] =
  graph.vertices.mapValues((attr: (String, String)) => attr._1)

// Attach each vertex's degree; vertices missing from `degrees` get 0.
val joinedVertices = graph.vertices.leftJoin(degrees) {
  case (_, (name, pos), degOpt) => (name, pos, degOpt.getOrElse(0))
}

// Working with EdgeRDD
// EdgeRDD.mapValues hands the function the whole Edge, so read the label through `.attr`.
val upperCaseEdges = graph.edges.mapValues(_.attr.toUpperCase)
val reversedEdges = graph.edges.reverse

// Custom partitioning
val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)

// Different triplet fields for optimization
val messages = graph.aggregateMessages[Int](
  ctx => {
    // One message to each endpoint of every edge
    ctx.sendToSrc(1)
    ctx.sendToDst(1)
  },
  (a: Int, b: Int) => a + b,
  TripletFields.None // Only need edge structure, not attributes
)
353
```
354
355
## Graph Algorithms
356
357
### PageRank
358
359
```scala { .api }
360
/**
 * PageRank entry points (API listing: bodies are omitted).
 * `run*` variants take a fixed iteration count; `runUntilConvergence*` variants take a tolerance.
 */
object PageRank {

  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      numIter: Int,
      resetProb: Double = 0.15): Graph[Double, Double]

  def runUntilConvergence[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      tol: Double,
      resetProb: Double = 0.15): Graph[Double, Double]

  def runWithOptions[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      numIter: Int,
      resetProb: Double = 0.15,
      srcId: Option[VertexId] = None): Graph[Double, Double]

  def runUntilConvergenceWithOptions[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      tol: Double,
      resetProb: Double = 0.15,
      srcId: Option[VertexId] = None): Graph[Double, Double]
}
375
```
376
377
### Connected Components
378
379
```scala { .api }
380
/**
 * Connected-components entry points (API listing: bodies are omitted).
 * Both overloads return a graph whose vertex attribute is a `VertexId`.
 */
object ConnectedComponents {

  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED]): Graph[VertexId, ED]

  /** As above, with an explicit `maxIterations` argument. */
  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      maxIterations: Int): Graph[VertexId, ED]
}
384
```
385
386
### Strongly Connected Components
387
388
```scala { .api }
389
/**
 * Strongly-connected-components entry point (API listing: body is omitted).
 * Returns a graph whose vertex attribute is a `VertexId`.
 */
object StronglyConnectedComponents {

  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      numIter: Int): Graph[VertexId, ED]
}
392
```
393
394
### Triangle Count
395
396
```scala { .api }
397
/**
 * Triangle-count entry points (API listing: bodies are omitted).
 * Both return a graph whose vertex attribute is an `Int`.
 */
object TriangleCount {

  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED]): Graph[Int, ED]

  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED]): Graph[Int, ED]
}
401
```
402
403
### Usage Examples
404
405
```scala
406
import org.apache.spark.graphx.lib._
407
408
// PageRank
409
// Fixed number of iterations vs. iterate until the tolerance is reached
val ranks = PageRank.run(graph, numIter = 10)
val convergedRanks = PageRank.runUntilConvergence(graph, tol = 0.0001)

// Connected Components, without and with an iteration cap
val cc = ConnectedComponents.run(graph)
val ccLimited = ConnectedComponents.run(graph, maxIterations = 10)

// Triangle Count
val triCounts = TriangleCount.run(graph)

// Pair each user name with its rank by joining on the vertex id;
// the id and the title are not needed in the result.
val ranksByUsername = users.join(ranks.vertices).map {
  case (_, ((username, _), rank)) => (username, rank)
}
423
```
424
425
## GraphLoader
426
427
Utility for loading graphs from files.
428
429
```scala { .api }
430
/** File-based graph loading (API listing: body is omitted). */
object GraphLoader {

  /** Loads the edge list at `path`; the result has `Int` vertex and edge attributes. */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      numEdgePartitions: Int = -1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[Int, Int]
}
437
```
438
439
### Usage Examples
440
441
```scala
442
import org.apache.spark.graphx.GraphLoader
// Needed for the StorageLevel constants used in the second example below.
import org.apache.spark.storage.StorageLevel

// Load graph from edge list file
// File format: srcId dstId
// Example: 1 2
//          2 3
//          3 1
val graph = GraphLoader.edgeListFile(sc, "path/to/edges.txt")

// With custom storage levels
val graphWithCustomStorage = GraphLoader.edgeListFile(
  sc,
  "path/to/edges.txt",
  canonicalOrientation = true,
  numEdgePartitions = 4,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_ONLY
)
460
```
461
462
## Pregel API
463
464
The Pregel API provides a vertex-centric approach to graph computation.
465
466
### Usage Examples
467
468
```scala
469
// Example: Single Source Shortest Path using Pregel
470
import scala.math.min
471
472
val sourceId: VertexId = 42L

// Distance 0.0 at the source, +infinity at every other vertex.
val initialGraph = graph.mapVertices { (id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity
}

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  // Vertex program: keep the smaller of the current distance and the incoming one
  (_, dist, newDist) => min(dist, newDist),

  // Send message: offer (source distance + edge weight) to the destination,
  // but only when that improves on the destination's current distance
  triplet => {
    val candidate = triplet.srcAttr + triplet.attr
    if (candidate < triplet.dstAttr) {
      Iterator((triplet.dstId, candidate))
    } else {
      Iterator.empty
    }
  },

  // Merge messages: take minimum distance
  (a, b) => min(a, b)
)
491
492
// Custom Pregel example: compute vertex degrees
493
val degreeGraph = graph
  // Start every counter at 0 so the result does not depend on the graph's
  // original vertex attributes (and the vertex type is Int, as the sum needs).
  .mapVertices((_, _) => 0)
  // sendMsg below emits on every edge unconditionally, so the computation never
  // goes quiet by itself. Cap it at one superstep: with the default
  // maxIterations (Int.MaxValue) it would keep re-adding the degree forever.
  .pregel(0, maxIterations = 1)(
    // Vertex program: sum incoming messages
    (_, oldSum, msgSum) => oldSum + msgSum,

    // Send message: send 1 to each endpoint of the edge
    triplet => Iterator((triplet.srcId, 1), (triplet.dstId, 1)),

    // Merge messages: sum them
    (a, b) => a + b
  )
505
```