# Apache Spark GraphX

Apache Spark GraphX is a distributed graph processing library built on top of Apache Spark. It models data as property graphs, in which vertices and edges carry user-defined attributes, and provides graph operators and algorithms for large-scale graph analytics, optimized for distributed execution.
## Package Information

- **Package Name**: spark-graphx_2.12
- **Package Type**: maven
- **Language**: Scala
- **Installation**: Add to `pom.xml` or include in Spark application classpath

**Maven Dependency:**
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-graphx_2.12</artifactId>
  <version>2.4.8</version>
</dependency>
```
## Core Imports

```scala
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.rdd.RDD
```
For specific components:
```scala
import org.apache.spark.graphx.{Graph, VertexId, Edge, EdgeDirection}
import org.apache.spark.graphx.lib.{PageRank, ConnectedComponents, TriangleCount}
```
## Basic Usage

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumes an existing SparkContext named `sc` (as provided by spark-shell)

// Create vertices RDD: (VertexId, VertexAttribute)
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"), (2L, "Bob"), (3L, "Charlie")
))

// Create edges RDD: Edge(srcId, dstId, attribute)
val edges: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "colleague"),
  Edge(3L, 1L, "neighbor")
))

// Create graph
val graph = Graph(vertices, edges)

// Basic graph operations
println(s"Graph has ${graph.numVertices} vertices and ${graph.numEdges} edges")

// Run PageRank until ranks change by less than the given tolerance
val pageRankGraph = graph.pageRank(0.0001)

// Get vertex degrees (vertices with degree 0 are omitted from the result)
val inDegrees = graph.inDegrees
val outDegrees = graph.outDegrees
```
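To read results back on the driver, the ranks can be joined with the vertex names, and the triplets view can be rendered as text. This is a minimal sketch rather than a canonical recipe: the `local[2]` master, the app name, and the value names `ranksByName` and `relations` are assumptions for illustration. The statements are written script-style (for example for `spark-shell`, where `getOrCreate` returns the already running context).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumption: local mode with two threads; getOrCreate reuses a running context
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-basic-usage"))

val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
  (1L, "Alice"), (2L, "Bob"), (3L, "Charlie")))
val edges: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "friend"), Edge(2L, 3L, "colleague"), Edge(3L, 1L, "neighbor")))
val graph = Graph(vertices, edges)

// pageRank returns a Graph[Double, Double]; its vertices hold the ranks
val pageRankGraph = graph.pageRank(0.0001)

// Join ranks with names by vertex id, then bring the small result to the driver
val ranksByName: Array[(String, Double)] = graph.vertices
  .join(pageRankGraph.vertices)
  .map { case (_, (name, rank)) => (name, rank) }
  .collect()
  .sortBy(-_._2)
ranksByName.foreach { case (name, rank) => println(f"$name%-8s $rank%.4f") }

// Each triplet exposes the edge attribute together with both endpoint attributes
val relations: Array[String] = graph.triplets
  .map(t => s"${t.srcAttr} -[${t.attr}]-> ${t.dstAttr}")
  .collect()
relations.foreach(println)
```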
## Architecture

GraphX is built around several key components:

- **Graph Abstraction**: Core `Graph[VD, ED]` abstract class representing immutable property graphs with vertex data type `VD` and edge data type `ED`
- **Specialized RDDs**: `VertexRDD` and `EdgeRDD` with optimized storage and fast join operations
- **Message Passing**: Neighborhood aggregation via `aggregateMessages` and bulk synchronous message passing via the Pregel API
- **Algorithm Library**: Pre-implemented graph algorithms (PageRank, Connected Components, etc.)
- **Partitioning Strategies**: Multiple edge partitioning strategies for different workloads
- **Functional Design**: Immutable data structures where operations return new graphs
## Capabilities

### Core Graph Operations

Fundamental graph abstractions and operations for creating, transforming, and querying property graphs with type-safe vertex and edge attributes.

```scala { .api }
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]

  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
  def subgraph(epred: EdgeTriplet[VD, ED] => Boolean, vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A): VertexRDD[A]
}

object Graph {
  def apply[VD: ClassTag, ED: ClassTag](vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]]): Graph[VD, ED]
  def fromEdges[VD: ClassTag, ED: ClassTag](edges: RDD[Edge[ED]], defaultValue: VD): Graph[VD, ED]
  def fromEdgeTuples[VD: ClassTag](rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD): Graph[VD, Int]
}
```
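As an illustration of how these operators compose, the sketch below builds a small graph, projects the vertex attribute with `mapVertices`, and restricts the graph with `subgraph`. The user data, the age threshold, the `local[2]` master, and the value names are hypothetical; in the real API `subgraph` provides default predicates, which is why only `vpred` is passed here. Each operator returns a new graph and leaves the original untouched.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumption: local mode; getOrCreate reuses a running context if there is one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-core-ops"))

// Hypothetical data: vertex attribute is (name, age)
val users: RDD[(VertexId, (String, Int))] = sc.parallelize(Seq(
  (1L, ("Alice", 34)), (2L, ("Bob", 17)), (3L, ("Charlie", 45))))
val follows: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows"), Edge(3L, 1L, "follows")))
val graph: Graph[(String, Int), String] = Graph(users, follows)

// mapVertices changes the vertex attribute type but keeps the graph structure
val names: Graph[String, String] = graph.mapVertices((_, attr) => attr._1)

// subgraph keeps vertices passing vpred and only edges whose endpoints both pass
val adults: Graph[(String, Int), String] =
  graph.subgraph(vpred = (_, attr) => attr._2 >= 18)

println(s"adults: ${adults.numVertices} vertices, ${adults.numEdges} edges")
```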
[Core Graph Operations](./core-graph-operations.md)

### Graph Analytics and Metrics

Graph analytics operations for computing structural properties, degrees, neighborhoods, and graph metrics.

```scala { .api }
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  lazy val numEdges: Long
  lazy val numVertices: Long
  lazy val inDegrees: VertexRDD[Int]
  lazy val outDegrees: VertexRDD[Int]
  lazy val degrees: VertexRDD[Int]

  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
}
```
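A short sketch of these metrics on a three-vertex graph follows. The edge list, the `local[2]` master, and the value names are assumptions for illustration. Note that the degree RDDs contain only vertices with a non-zero degree, whereas `collectNeighborIds` returns an entry (possibly an empty array) for every vertex.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: local mode; getOrCreate reuses a running context if there is one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-metrics"))

// Hypothetical edges 1->2, 1->3, 2->3; every vertex gets the default attribute 0
val graph: Graph[Int, Int] = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (1L, 3L), (2L, 3L))), defaultValue = 0)

// Vertex 1 has no incoming edge, so it does not appear in inDegrees
val inDeg: Map[VertexId, Int] = graph.inDegrees.collect().toMap
val allDeg: Map[VertexId, Int] = graph.degrees.collect().toMap

// Outgoing neighbor ids per vertex; sorted because array order is not guaranteed
val outNbrs: Map[VertexId, List[VertexId]] = graph
  .collectNeighborIds(EdgeDirection.Out)
  .mapValues(_.sorted.toList)
  .collect()
  .toMap

println(s"in-degrees: $inDeg, out-neighbors: $outNbrs")
```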
[Graph Analytics and Metrics](./graph-analytics.md)

### RDD Abstractions

Specialized RDD implementations optimized for graph operations with fast joins and efficient storage.

```scala { .api }
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
}

abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]
  def reverse: EdgeRDD[ED]
}
```
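The difference between the two joins can be sketched as follows: `leftJoin` keeps every vertex and passes an `Option` for the other side, while `innerJoin` keeps only vertices present in both inputs. The `scores` RDD, the fallback value `0.0`, the `local[2]` master, and the value names are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumption: local mode; getOrCreate reuses a running context if there is one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-vertex-rdd"))

// Hypothetical graph 1->2->3 whose vertices all carry the label "user"
val graph: Graph[String, Int] = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (2L, 3L))), defaultValue = "user")

// External per-vertex data that covers only some of the vertices
val scores: RDD[(VertexId, Double)] = sc.parallelize(Seq((1L, 0.9), (3L, 0.4)))

// leftJoin: all three vertices survive; a missing score falls back to 0.0
val withScores: VertexRDD[(String, Double)] =
  graph.vertices.leftJoin(scores) { (_, label, scoreOpt) =>
    (label, scoreOpt.getOrElse(0.0))
  }

// innerJoin: only vertices 1 and 3, which have a score, survive
val scoredOnly: VertexRDD[Double] =
  graph.vertices.innerJoin(scores) { (_, _, score) => score }

withScores.collect().foreach(println)
```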
[RDD Abstractions](./rdd-abstractions.md)

### Graph Algorithms Library

Pre-implemented graph algorithms for common analytics tasks including centrality, community detection, and path finding.

```scala { .api }
// PageRank Algorithm
object PageRank {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
  def runUntilConvergence[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
}

// Connected Components
object ConnectedComponents {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
}

// Triangle Counting
object TriangleCount {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
}
```
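The following sketch runs the three algorithms on a graph made of one triangle plus one separate edge. The edge list, the iteration count of 10, the `local[2]` master, and the value names are assumptions for illustration; it targets the Spark 2.x behavior in which `TriangleCount.run` canonicalizes edge orientation itself. Connected components labels each vertex with the smallest vertex id in its component.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.{ConnectedComponents, PageRank, TriangleCount}

// Assumption: local mode; getOrCreate reuses a running context if there is one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-algorithms"))

// Hypothetical graph: a triangle 1-2-3 and a separate edge 4-5
val graph: Graph[Int, Int] = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L), (4L, 5L))), defaultValue = 1)

// Vertex attribute becomes the lowest vertex id in the vertex's component
val components: Map[VertexId, VertexId] =
  ConnectedComponents.run(graph).vertices.collect().toMap

// Vertex attribute becomes the number of triangles through that vertex
val triangles: Map[VertexId, Int] =
  TriangleCount.run(graph).vertices.collect().toMap

// Fixed number of PageRank iterations; vertex attribute becomes the rank
val ranks: Map[VertexId, Double] =
  PageRank.run(graph, numIter = 10).vertices.collect().toMap

println(s"components=$components triangles=$triangles ranks=$ranks")
```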
[Graph Algorithms Library](./graph-algorithms.md)

### Message Passing and Pregel

Bulk synchronous message passing framework for implementing custom graph algorithms using a vertex-centric programming model.

```scala { .api }
// Core message passing (method on Graph[VD, ED])
def aggregateMessages[A: ClassTag](
  sendMsg: EdgeContext[VD, ED, A] => Unit,
  mergeMsg: (A, A) => A
): VertexRDD[A]

// Pregel API
object Pregel {
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](
    graph: Graph[VD, ED],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either
  )(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A
  ): Graph[VD, ED]
}
```
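To make the two styles concrete, the sketch below first uses `aggregateMessages` for a single round (summing incoming edge weights per vertex) and then uses `Pregel` for an iterative computation (single-source shortest paths). The weighted edge list, the source vertex `1L`, the `local[2]` master, and the value names are hypothetical; the shortest-path formulation is one common way to use Pregel, not the only one.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumption: local mode; getOrCreate reuses a running context if there is one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-messages"))

// Hypothetical weighted edges; the edge attribute is the distance
val edges: RDD[Edge[Double]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
val graph: Graph[Double, Double] = Graph.fromEdges(edges, defaultValue = 0.0)

// One round: each edge sends its weight to its destination; messages are summed
val inWeight: VertexRDD[Double] = graph.aggregateMessages[Double](
  ctx => ctx.sendToDst(ctx.attr),
  _ + _
)

// Iterative: distance 0 at the source, infinity everywhere else
val sourceId: VertexId = 1L
val init: Graph[Double, Double] = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp: Graph[Double, Double] = Pregel(init, Double.PositiveInfinity)(
  // vprog: keep the smaller of the current and the received distance
  (_, dist, newDist) => math.min(dist, newDist),
  // sendMsg: propose a shorter distance to the destination, if there is one
  triplet =>
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else Iterator.empty,
  // mergeMsg: of several proposals, keep the shortest
  (a, b) => math.min(a, b)
)

sssp.vertices.collect().foreach(println)
```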
[Message Passing and Pregel](./message-passing.md)

### Graph Loading and Utilities

Utilities for loading graphs from files, partitioning strategies, and configuration options.

```scala { .api }
object GraphLoader {
  def edgeListFile(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    numEdgePartitions: Int = -1,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  ): Graph[Int, Int]
}

trait PartitionStrategy {
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}
```
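A minimal sketch of loading and repartitioning follows. It writes a small edge list to a temporary local file, loads it with `GraphLoader.edgeListFile`, and repartitions the edges with the built-in `PartitionStrategy.EdgePartition2D`. The file contents, the temporary path, the `local[2]` master, and the value names are assumptions for illustration; the loader expects one whitespace-separated `srcId dstId` pair per line and skips lines starting with `#`.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, GraphLoader, PartitionStrategy}

// Assumption: local mode; getOrCreate reuses a running context if there is one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-loading"))

// Hypothetical input file on the local file system
val tmp = Files.createTempFile("edges", ".txt")
Files.write(tmp, "# src dst\n1 2\n2 3\n3 1\n".getBytes(StandardCharsets.UTF_8))

// Vertex and edge attributes are Int because the file carries no attributes
val loaded: Graph[Int, Int] = GraphLoader.edgeListFile(sc, tmp.toUri.toString)

// Repartition edges; the graph content stays the same, only placement changes
val partitioned: Graph[Int, Int] =
  loaded.partitionBy(PartitionStrategy.EdgePartition2D)

println(s"loaded ${loaded.numVertices} vertices and ${loaded.numEdges} edges")
```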
[Graph Loading and Utilities](./loading-utilities.md)

## Types

Core type definitions used throughout GraphX:

```scala { .api }
// Package-level type aliases
type VertexId = Long
type PartitionID = Int

// Edge types
case class Edge[ED](var srcId: VertexId, var dstId: VertexId, var attr: ED) {
  def otherVertexId(vid: VertexId): VertexId
  def relativeDirection(vid: VertexId): EdgeDirection
}

class EdgeTriplet[VD, ED] extends Edge[ED] {
  var srcAttr: VD
  var dstAttr: VD
  def otherVertexAttr(vid: VertexId): VD
}

// Edge direction enumeration
class EdgeDirection private (private val name: String) extends Serializable {
  def reverse: EdgeDirection
  override def toString: String
  override def equals(o: Any): Boolean
  override def hashCode: Int
}

object EdgeDirection {
  final val In: EdgeDirection
  final val Out: EdgeDirection
  final val Either: EdgeDirection
  final val Both: EdgeDirection
}
```
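The helper methods on `Edge` and `EdgeDirection` can be tried without a SparkContext, since they are plain objects; only the GraphX classes need to be on the classpath. The edge values and value names below are hypothetical, and the statements are written script-style.

```scala
import org.apache.spark.graphx.{Edge, EdgeDirection, VertexId}

// Hypothetical edge from vertex 1 to vertex 2
val e: Edge[String] = Edge(1L, 2L, "friend")

// Given one endpoint, otherVertexId returns the opposite endpoint
val other: VertexId = e.otherVertexId(1L)

// Direction of the edge as seen from the given endpoint
val fromSrc: EdgeDirection = e.relativeDirection(1L) // seen from the source
val fromDst: EdgeDirection = e.relativeDirection(2L) // seen from the destination

// reverse swaps In and Out and leaves Either and Both unchanged
val flipped: EdgeDirection = EdgeDirection.Out.reverse

println(s"other=$other fromSrc=$fromSrc fromDst=$fromDst flipped=$flipped")
```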