0
# Graph Processing
1
2
GraphX is Apache Spark's API for graphs and graph-parallel computation. It extends the Spark RDD abstraction with a resilient distributed property graph: a directed multigraph in which every vertex and every edge carries user-defined properties.
3
4
## Package Information
5
6
Graph processing functionality is available through:
7
8
```scala
9
import org.apache.spark.graphx._
10
import org.apache.spark.graphx.lib._
11
```
12
13
## Basic Usage
14
15
```scala
16
import org.apache.spark.graphx._
17
import org.apache.spark.rdd.RDD
18
19
// Vertex set: (id, name) pairs
val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
  1L -> "Alice",
  2L -> "Bob",
  3L -> "Charlie",
  4L -> "David"
))

// Edge set: directed, labelled relationships between vertex ids
val edges: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "friend"),
  Edge(3L, 4L, "colleague"),
  Edge(4L, 1L, "friend")
))

// Assemble the property graph from the two RDDs
val graph = Graph(vertices, edges)

// Report the size of the graph
val vertexCount = graph.numVertices
val edgeCount = graph.numEdges
println(s"Number of vertices: $vertexCount")
println(s"Number of edges: $edgeCount")

// Built-in algorithms: PageRank (tolerance 0.0001) and connected components
val ranks = graph.pageRank(0.0001).vertices
val connectedComponents = graph.connectedComponents().vertices

// Pair each vertex name with its rank; the vertex id is dropped after the join
val ranksByUsername = vertices.join(ranks).map { case (_, nameAndRank) => nameAndRank }

ranksByUsername.collect().foreach(println)
52
```
53
54
## Capabilities
55
56
### Core Graph Types
57
58
#### Graph
59
60
The fundamental graph abstraction representing a property graph with typed vertex and edge properties.
61
62
```scala { .api }
63
abstract class Graph[VD: ClassTag, ED: ClassTag] extends Serializable {
64
// Graph structure
65
def vertices: VertexRDD[VD]
66
def edges: EdgeRDD[ED]
67
def triplets: RDD[EdgeTriplet[VD, ED]]
68
69
// Basic operations
70
def numEdges: Long
71
def numVertices: Long
72
def inDegrees: VertexRDD[Int]
73
def outDegrees: VertexRDD[Int]
74
def degrees: VertexRDD[Int]
75
76
// Transformations
77
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
78
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
79
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
80
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2, tripletFields: TripletFields): Graph[VD, ED2]
81
82
// Structural operations
83
def reverse: Graph[VD, ED]
84
def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
85
vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]
86
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
87
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
88
89
// Join operations
90
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
91
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
92
(updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]
93
94
// Graph algorithms
95
def connectedComponents(): Graph[VertexId, ED]
96
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
97
def triangleCount(): Graph[Int, ED]
98
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
99
def personalizedPageRank(src: VertexId, tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
100
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
101
102
// Aggregation
103
def aggregateMessages[A: ClassTag](
104
sendMsg: EdgeContext[VD, ED, A] => Unit,
105
mergeMsg: (A, A) => A,
106
tripletFields: TripletFields = TripletFields.All): VertexRDD[A]
107
108
// Pregel API
109
def pregel[A: ClassTag](initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either)
110
(vprog: (VertexId, VD, A) => VD,
111
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
112
mergeMsg: (A, A) => A): Graph[VD, ED]
113
114
// Persistence
115
def cache(): Graph[VD, ED]
116
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
117
def unpersist(blocking: Boolean = true): Graph[VD, ED]
118
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
119
120
// Checkpointing
121
def checkpoint(): Unit
122
def isCheckpointed: Boolean
123
def getCheckpointFiles: Seq[String]
124
}
125
126
object Graph {
127
def apply[VD: ClassTag, ED: ClassTag](vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]],
128
defaultVertexAttr: VD = null.asInstanceOf[VD],
129
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
130
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
131
132
def fromEdges[VD: ClassTag, ED: ClassTag](edges: RDD[Edge[ED]], defaultValue: VD,
133
uniqueEdges: Option[PartitionStrategy] = None,
134
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
135
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
136
137
def fromEdgeTuples[VD: ClassTag](rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD,
138
uniqueEdges: Option[PartitionStrategy] = None,
139
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
140
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]
141
}
142
```
143
144
#### VertexRDD
145
146
A specialized RDD for representing vertices in a graph.
147
148
```scala { .api }
149
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
150
// RDD operations optimized for vertices
151
def mapVertexPartitions[VD2: ClassTag](f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2]): VertexRDD[VD2]
152
153
// Join operations
154
def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])
155
(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
156
def leftZipJoin[VD2: ClassTag, VD3: ClassTag](other: VertexRDD[VD2])
157
(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
158
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
159
(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
160
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
161
(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
162
163
// Aggregation
164
def aggregateUsingIndex[VD2: ClassTag](messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
165
166
// Set operations
167
def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
168
def minus(other: VertexRDD[VD]): VertexRDD[VD]
169
170
// Conversion
171
def toRDD: RDD[(VertexId, VD)]
172
}
173
```
174
175
#### EdgeRDD
176
177
A specialized RDD for representing edges in a graph.
178
179
```scala { .api }
180
abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {
181
// RDD operations optimized for edges
182
def mapEdgePartitions[ED2: ClassTag, VD: ClassTag](f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD]): EdgeRDD[ED2]
183
184
// Join operations
185
def innerJoin[ED2: ClassTag, ED3: ClassTag](other: RDD[Edge[ED2]])
186
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
187
188
// Conversion
189
def toRDD: RDD[Edge[ED]]
190
}
191
```
192
193
#### Edge and EdgeTriplet
194
195
Basic edge data structures.
196
197
```scala { .api }
198
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED) extends Serializable
199
200
class EdgeTriplet[VD, ED] extends Edge[ED] {
201
def srcAttr: VD
202
def dstAttr: VD
203
def otherVertexAttr(vid: VertexId): VD
204
def otherVertexId(vid: VertexId): VertexId
205
def relativizeDirection(vid: VertexId): EdgeDirection
206
def toTuple: ((VertexId, VD), (VertexId, VD), ED)
207
}
208
209
type VertexId = Long
210
211
/**
 * Direction of an edge relative to a vertex.
 *
 * Modelled as a class with a private constructor plus a companion object holding
 * the four instances, so that `EdgeDirection` is usable as a type in signatures
 * such as `activeDirection: EdgeDirection = EdgeDirection.Either`.
 */
class EdgeDirection private (private val name: String) extends Serializable {
  /** The opposite direction: `In` and `Out` swap; `Either` and `Both` are returned unchanged. */
  def reverse: EdgeDirection = this match {
    case EdgeDirection.In  => EdgeDirection.Out
    case EdgeDirection.Out => EdgeDirection.In
    case other             => other
  }

  override def toString: String = "EdgeDirection." + name
}

object EdgeDirection {
  /** Edges arriving at a vertex. */
  final val In: EdgeDirection = new EdgeDirection("In")

  /** Edges originating from a vertex. */
  final val Out: EdgeDirection = new EdgeDirection("Out")

  /** Edges originating from *or* arriving at a vertex of interest. */
  final val Either: EdgeDirection = new EdgeDirection("Either")

  /** Edges originating from *and* arriving at a vertex of interest. */
  final val Both: EdgeDirection = new EdgeDirection("Both")
}
215
```
216
217
### Graph Algorithms
218
219
GraphX provides implementations of common graph algorithms.
220
221
#### PageRank
222
223
```scala { .api }
224
object PageRank {
225
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
226
227
def runUntilConvergence[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
228
229
def runWithOptions[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
230
srcId: Option[VertexId] = None): Graph[Double, Double]
231
}
232
```
233
234
Usage example:
235
236
```scala
237
// Placeholder: substitute your own graph construction here
val graph: Graph[String, String] = ???

// Run PageRank with a convergence tolerance of 0.0001 and keep the per-vertex ranks
val ranks = graph.pageRank(0.0001).vertices

// The ten (vertexId, rank) pairs with the highest rank
val topRanks = ranks.top(10)(Ordering.by(_._2))
240
```
241
242
#### Connected Components
243
244
```scala { .api }
245
object ConnectedComponents {
246
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
247
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], maxIterations: Int): Graph[VertexId, ED]
248
}
249
```
250
251
#### Strongly Connected Components
252
253
```scala { .api }
254
object StronglyConnectedComponents {
255
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED]
256
}
257
```
258
259
#### Triangle Count
260
261
```scala { .api }
262
object TriangleCount {
263
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
264
def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
265
}
266
```
267
268
#### Label Propagation
269
270
```scala { .api }
271
object LabelPropagation {
  // The input vertex attribute type VD is unconstrained; the result replaces it with a VertexId label.
  def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, ED]
}
274
```
275
276
#### Shortest Paths
277
278
```scala { .api }
279
object ShortestPaths {
280
def run[ED: ClassTag](graph: Graph[_, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED]
281
282
type SPMap = Map[VertexId, Int]
283
}
284
```
285
286
### Graph Construction and Loading
287
288
#### GraphLoader
289
290
Utilities for loading graphs from various formats.
291
292
```scala { .api }
293
object GraphLoader {
294
def edgeListFile(sc: SparkContext, path: String, canonicalOrientation: Boolean = false,
295
numEdgePartitions: Int = -1, edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
296
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[Int, Int]
297
}
298
```
299
300
Usage example:
301
302
```scala
303
// Load graph from edge list file
304
val graph = GraphLoader.edgeListFile(sc, "path/to/edges.txt")
305
306
// Edge list file format: srcId dstId (one edge per line)
307
// Example content:
308
// 1 2
309
// 2 3
310
// 3 1
311
```
312
313
### Graph Partitioning
314
315
Control how graph data is distributed across the cluster.
316
317
```scala { .api }
318
abstract class PartitionStrategy extends Serializable {
319
def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
320
}
321
322
object PartitionStrategy {
323
case object RandomVertexCut extends PartitionStrategy
324
case object EdgePartition1D extends PartitionStrategy
325
case object EdgePartition2D extends PartitionStrategy
326
case object CanonicalRandomVertexCut extends PartitionStrategy
327
328
def fromString(s: String): PartitionStrategy
329
}
330
```
331
332
### Message Passing
333
334
GraphX provides the aggregateMessages API for efficient message passing.
335
336
```scala { .api }
337
abstract class EdgeContext[VD, ED, A] {
338
def srcId: VertexId
339
def dstId: VertexId
340
def srcAttr: VD
341
def dstAttr: VD
342
def attr: ED
343
def sendToSrc(msg: A): Unit
344
def sendToDst(msg: A): Unit
345
def toEdgeTriplet: EdgeTriplet[VD, ED]
346
}
347
348
case class TripletFields(useSrc: Boolean = true, useDst: Boolean = true, useEdge: Boolean = true)
349
350
object TripletFields {
351
val None = TripletFields(false, false, false)
352
val EdgeOnly = TripletFields(false, false, true)
353
val Src = TripletFields(true, false, false)
354
val Dst = TripletFields(false, true, false)
355
val All = TripletFields(true, true, true)
356
}
357
```
358
359
Usage example:
360
361
```scala
362
// Placeholder: substitute your own graph construction here
val graph: Graph[Double, Double] = ???

// For every vertex, sum the attributes of its neighbors across both edge directions
val neighborSum: VertexRDD[Double] = graph.aggregateMessages[Double](
  // sendMsg: each edge delivers the source attribute to the destination, and vice versa
  ctx => {
    ctx.sendToDst(ctx.srcAttr)
    ctx.sendToSrc(ctx.dstAttr)
  },
  // mergeMsg: combine messages that arrive at the same vertex
  (a, b) => a + b
)

// Replace each vertex attribute with its neighbor sum.
// outerJoinVertices hands the update function an Option, so a vertex that received
// no message falls back to 0.0. (joinVertices passes a plain Double, on which
// getOrElse is not defined.)
val newGraph: Graph[Double, Double] = graph.outerJoinVertices(neighborSum) {
  (vid, oldAttr, msgSum) => msgSum.getOrElse(0.0)
}
380
```
381
382
### Pregel API
383
384
The Pregel API is a vertex-centric approach to graph computation.
385
386
```scala { .api }
387
object Pregel {
388
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](graph: Graph[VD, ED], initialMsg: A,
389
maxIterations: Int = Int.MaxValue,
390
activeDirection: EdgeDirection = EdgeDirection.Either)
391
(vprog: (VertexId, VD, A) => VD,
392
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
393
mergeMsg: (A, A) => A): Graph[VD, ED]
394
}
395
```
396
397
Usage example:
398
399
```scala
400
import scala.reflect.ClassTag

// Single-source shortest path using Pregel

/**
 * Computes the distance from `sourceId` to every vertex of `graph`.
 *
 * The incoming vertex attributes are ignored: the source is initialised to 0.0 and
 * every other vertex to `Double.PositiveInfinity`, which a vertex keeps if no
 * shorter distance ever reaches it.
 *
 * @tparam ED      edge attribute type, interpreted as the edge weight; it must have a
 *                 `Numeric` instance so it can be added to a `Double` distance
 * @param graph    input graph; only its structure and edge attributes are used
 * @param sourceId vertex from which distances are measured
 * @return a graph with the same edges whose vertex attribute is the computed distance
 */
def shortestPaths[ED: ClassTag: Numeric](graph: Graph[Double, ED], sourceId: VertexId): Graph[Double, ED] = {
  val weight = implicitly[Numeric[ED]]

  val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

  // NOTE(review): assumes non-negative edge weights; a negative cycle would keep
  // producing messages until maxIterations is reached — confirm for your data.
  initialGraph.pregel(Double.PositiveInfinity)(
    // Vertex program: keep the smaller of the current and the received distance
    (id, dist, newDist) => math.min(dist, newDist),

    // Send message: offer the destination a path through the source, if it is shorter
    triplet => {
      val candidate = triplet.srcAttr + weight.toDouble(triplet.attr)
      if (candidate < triplet.dstAttr) Iterator((triplet.dstId, candidate))
      else Iterator.empty
    },

    // Merge messages: take minimum distance
    (a, b) => math.min(a, b)
  )
}
423
```
424
425
### GraphX Utilities
426
427
Additional utilities for graph processing.
428
429
```scala { .api }
430
object GraphGenerators {
  // Out-degrees are drawn from a log-normal distribution parameterised by mu and sigma.
  def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int = 0, mu: Double = 4.0, sigma: Double = 1.3,
      seed: Long = -1): Graph[Long, Int]

  // The R-MAT quadrant probabilities are constants of GraphGenerators (RMATa, RMATb, RMATc, RMATd),
  // not parameters of this method.
  def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): Graph[Int, Int]

  def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int]

  // Vertex attributes are the (row, column) coordinates of each grid cell.
  def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int, Int), Double]
}
441
```
442
443
Usage example:
444
445
```scala
446
// GraphGenerators lives in the util sub-package, which `org.apache.spark.graphx._` does not cover
import org.apache.spark.graphx.util.GraphGenerators

// Generate a synthetic graph
val syntheticGraph = GraphGenerators.logNormalGraph(sc, numVertices = 1000)

// Run PageRank on synthetic graph
val ranks = syntheticGraph.pageRank(0.001).vertices

// The ten (vertexId, rank) pairs with the highest rank
val topVertices = ranks.top(10)(Ordering.by(_._2))
452
```