0
# Graph Processing (GraphX)
1
2
GraphX is Apache Spark's API for graph-parallel computation, combining the benefits of both data-parallel and graph-parallel systems. It provides a unified framework for ETL, exploratory analysis, and iterative graph computation within a single system.
3
4
```scala { .api }
5
import org.apache.spark.graphx._
6
import org.apache.spark.rdd.RDD
7
8
// Core graph types
9
type VertexId = Long
10
type EdgeTriplet[VD, ED] = org.apache.spark.graphx.EdgeTriplet[VD, ED]
11
12
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
13
14
class Graph[VD: ClassTag, ED: ClassTag] extends Serializable {
15
// Graph properties
16
def vertices: VertexRDD[VD]
17
def edges: EdgeRDD[ED]
18
def triplets: RDD[EdgeTriplet[VD, ED]]
19
def numVertices: Long
20
def numEdges: Long
// Per-vertex degree counts (used by the usage examples below).
// Vertices whose degree is zero are omitted from the returned VertexRDD.
def inDegrees: VertexRDD[Int]
def outDegrees: VertexRDD[Int]
def degrees: VertexRDD[Int]
21
22
// Graph transformations
23
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
24
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
25
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
26
def reverse: Graph[VD, ED]
27
def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = _ => true, vpred: (VertexId, VD) => Boolean = (_, _) => true): Graph[VD, ED]
28
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
29
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
30
31
// Structural operations
32
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
33
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
34
def aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, tripletFields: TripletFields = TripletFields.All): VertexRDD[A]
35
36
// Caching and persistence
37
def cache(): Graph[VD, ED]
38
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
39
def unpersist(blocking: Boolean = false): Graph[VD, ED]
40
def checkpoint(): Unit
41
def isCheckpointed: Boolean
42
43
// Graph algorithms
44
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
45
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
46
def connectedComponents(): Graph[VertexId, ED]
47
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
48
def triangleCount(): Graph[Int, ED]
49
}
50
51
// Graph construction
52
object Graph {
53
def apply[VD: ClassTag, ED: ClassTag](vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD = null.asInstanceOf[VD], edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
54
55
def fromEdges[VD: ClassTag, ED: ClassTag](edges: RDD[Edge[ED]], defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None, edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
56
57
def fromEdgeTuples[VD: ClassTag](rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None, edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]
58
}
59
60
// Built-in graph algorithms
61
// NOTE: the standalone algorithm entry points live in the
// org.apache.spark.graphx.lib package (import org.apache.spark.graphx.lib._).
// GraphXUtils does not provide them; it only offers Kryo registration helpers.
object ConnectedComponents {
  // Labels every vertex with the lowest vertex id in its connected component.
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
}

object StronglyConnectedComponents {
  // Labels every vertex with the lowest vertex id in its strongly connected component.
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED]
}

object TriangleCount {
  // Counts the triangles passing through each vertex.
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
}

object PageRank {
  // Iterates until the ranks change by less than `tol` (backs Graph.pageRank).
  def runUntilConvergence[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  // Runs exactly `numIter` iterations (backs Graph.staticPageRank).
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
}

object ShortestPaths {
  // Map from landmark vertex id to the hop distance to that landmark.
  type SPMap = Map[VertexId, Int]
  // Computes, for every vertex, the shortest-path distance to each landmark.
  def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED]
}
68
```
69
70
**Usage Examples:**
71
72
```scala
73
import org.apache.spark.graphx._
74
import org.apache.spark.rdd.RDD
75
76
val sc = spark.sparkContext
77
78
// Create vertices RDD
79
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
80
(1L, "Alice"), (2L, "Bob"), (3L, "Charlie"), (4L, "David"), (5L, "Ed")
81
))
82
83
// Create edges RDD
84
val edges: RDD[Edge[String]] = sc.parallelize(Array(
85
Edge(1L, 2L, "friend"), Edge(2L, 3L, "follow"), Edge(3L, 4L, "friend"),
86
Edge(4L, 5L, "follow"), Edge(1L, 3L, "follow"), Edge(2L, 5L, "friend")
87
))
88
89
// Create graph
90
val graph = Graph(vertices, edges)
91
92
// Basic graph statistics
93
println(s"Number of vertices: ${graph.numVertices}")
94
println(s"Number of edges: ${graph.numEdges}")
95
96
// Graph algorithms
97
val pageRankGraph = graph.pageRank(0.0001).cache()
98
val connectedComponentsGraph = graph.connectedComponents().cache()
99
val triangleCountGraph = graph.triangleCount().cache()
100
101
// Analyze results
102
pageRankGraph.vertices.collect().foreach {
103
case (id, rank) => println(s"Vertex $id has PageRank $rank")
104
}
105
106
// Complex graph analysis
107
val inDegrees = graph.inDegrees
108
val outDegrees = graph.outDegrees
109
val degrees = graph.degrees
110
111
// Join with vertex data
112
// inDegrees/outDegrees omit vertices whose degree is zero, so an inner join
// would silently drop them (here: Alice has no in-edges, Ed has no out-edges).
// Left outer joins keep every vertex; a missing degree defaults to 0.
val vertexInfo = vertices.leftOuterJoin(inDegrees).leftOuterJoin(outDegrees).map {
  case (id, ((name, inDeg), outDeg)) => (id, (name, inDeg.getOrElse(0), outDeg.getOrElse(0)))
}
115
116
vertexInfo.collect().foreach {
117
case (id, (name, inDeg, outDeg)) =>
118
println(s"$name (id: $id) has $inDeg in-degree and $outDeg out-degree")
119
}
120
```
121
122
## Capabilities
123
124
### Graph Construction and Basic Operations
125
126
Create graphs from vertices and edges, perform basic transformations:
127
128
```scala
129
// From edge list
130
val socialGraph = Graph.fromEdgeTuples(
131
sc.parallelize(Array((1L, 2L), (2L, 3L), (3L, 1L))),
132
defaultValue = "user"
133
)
134
135
// Transform vertices and edges
136
val transformedGraph = graph
137
.mapVertices((id, attr) => attr.toUpperCase)
138
.mapEdges(edge => edge.attr + "_relationship")
139
140
// Subgraph operations
141
val friendsOnly = graph.subgraph(
142
epred = triplet => triplet.attr == "friend",
143
vpred = (id, attr) => attr != "Ed"
144
)
145
```
146
147
### Graph Algorithms
148
149
Built-in implementations of common graph algorithms:
150
151
```scala
152
// PageRank algorithm
153
val pageRanks = graph.pageRank(0.0001)
154
val topUsers = pageRanks.vertices.top(5)(Ordering.by(_._2))
155
156
// Connected components
157
val cc = graph.connectedComponents()
158
val sameComponent = cc.vertices.filter { case (id, component) =>
159
component == 1L
160
}.collect()
161
162
// Triangle counting
163
val triangles = graph.triangleCount()
164
val vertexTriangles = triangles.vertices.collect()
165
166
// Shortest paths
167
val sourceId: VertexId = 1L
168
// ShortestPaths is a library algorithm, not a method on Graph. Each vertex of the
// result holds a Map from landmark id to its hop distance to that landmark.
val shortestPaths = org.apache.spark.graphx.lib.ShortestPaths.run(graph, Seq(sourceId))
169
```
170
171
### Message Passing and Aggregation
172
173
The Pregel-like aggregateMessages API for custom graph computations:
174
175
```scala
176
// Count neighbors
177
val neighborCounts = graph.aggregateMessages[Int](
178
triplet => {
179
triplet.sendToSrc(1)
180
triplet.sendToDst(1)
181
},
182
(a, b) => a + b
183
)
184
185
// Collect neighbor information
186
val neighborAttrs = graph.aggregateMessages[Array[String]](
187
triplet => {
188
triplet.sendToSrc(Array(triplet.dstAttr))
189
triplet.sendToDst(Array(triplet.srcAttr))
190
},
191
(a, b) => a ++ b
192
)
193
194
// Custom algorithm: for each vertex, count the neighbors whose attribute contains "important"
195
val highDegreeNeighbors = graph.aggregateMessages[Int](
196
triplet => {
197
if (triplet.srcAttr.contains("important")) {
198
triplet.sendToDst(1)
199
}
200
if (triplet.dstAttr.contains("important")) {
201
triplet.sendToSrc(1)
202
}
203
},
204
(a, b) => a + b
205
)
206
```
207
208
GraphX provides efficient graph processing capabilities that integrate seamlessly with Spark's data processing pipeline, enabling complex graph analytics at scale.