# Core Graph Operations

Fundamental graph abstractions and operations for creating, transforming, and querying property graphs with type-safe vertex and edge attributes.

## Capabilities

### Graph Trait

Main graph abstraction representing an immutable property graph with arbitrary objects associated with vertices and edges.

```scala { .api }
/**
 * Property graph with vertex data type VD and edge data type ED
 * Immutable - operations return new graphs rather than modifying existing ones
 */
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  /** RDD containing vertices and their attributes */
  val vertices: VertexRDD[VD]

  /** RDD containing edges and their attributes */
  val edges: EdgeRDD[ED]

  /** RDD of edge triplets with adjacent vertex data */
  val triplets: RDD[EdgeTriplet[VD, ED]]

  /** Cache/persist the graph at specified storage level */
  def persist(newLevel: StorageLevel): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersist(blocking: Boolean): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

  /** Checkpoint support for fault tolerance */
  def checkpoint(): Unit
  def isCheckpointed: Boolean
  def getCheckpointFiles: Seq[String]

  /** Repartition edges according to partition strategy */
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
}
```

**Usage Examples:**

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create vertices and edges
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"), (2L, "Bob"), (3L, "Charlie"), (4L, "David")
))

val edges: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "follows"),
  Edge(2L, 3L, "friend"),
  Edge(3L, 4L, "colleague")
))

// Create graph
val graph = Graph(vertices, edges)

// Cache for repeated operations
val cachedGraph = graph.cache()

// Access graph components
println(s"Vertices count: ${graph.vertices.count()}")
println(s"Edges count: ${graph.edges.count()}")
println(s"Triplets count: ${graph.triplets.count()}")
```
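
The `partitionBy` and caching methods from the API listing are not exercised above. The following is a minimal sketch of how they might be combined. The local `SparkContext`, the app name, and the choice of `PartitionStrategy.EdgePartition2D` with 4 partitions are illustrative assumptions rather than recommendations from this document.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext, for illustration only.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graph-partition-sketch"))

val edges = sc.parallelize(Seq(
  Edge(1L, 2L, "follows"), Edge(2L, 3L, "friend"), Edge(3L, 4L, "colleague")))
val graph = Graph.fromEdges(edges, defaultValue = "unknown")

// Repartition the edges, then cache the repartitioned graph for reuse.
val partitioned = graph
  .partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 4)
  .cache()

val edgePartitions = partitioned.edges.getNumPartitions
val edgeCount = partitioned.edges.count()

// Release the cached data once the graph is no longer needed.
partitioned.unpersist(blocking = false)
```

Because graphs are immutable, `partitionBy` returns a new graph; the original `graph` keeps its previous edge partitioning.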

### Graph Factory Methods

Factory methods on the `Graph` companion object for creating graphs from vertex and edge RDDs.

```scala { .api }
object Graph {
  /**
   * Create graph from vertex and edge RDDs
   */
  def apply[VD: ClassTag, ED: ClassTag](
    vertices: RDD[(VertexId, VD)],
    edges: RDD[Edge[ED]]
  ): Graph[VD, ED]

  /**
   * Create graph from edges RDD with default vertex value
   */
  def fromEdges[VD: ClassTag, ED: ClassTag](
    edges: RDD[Edge[ED]],
    defaultValue: VD
  ): Graph[VD, ED]

  /**
   * Create graph from simple edge tuples with default vertex and edge values
   */
  def fromEdgeTuples[VD: ClassTag](
    rawEdges: RDD[(VertexId, VertexId)],
    defaultValue: VD
  ): Graph[VD, Int]
}
```

**Usage Examples:**

```scala
// Create from vertices and edges
val graph1 = Graph(vertices, edges)

// Create from edges only (vertices are created automatically with the default attribute)
val edgeOnlyGraph = Graph.fromEdges(edges, "Unknown")

// Create from simple tuples (every edge gets the Int attribute 1)
val tuples: RDD[(VertexId, VertexId)] = sc.parallelize(Array(
  (1L, 2L), (2L, 3L), (3L, 1L)
))
val simpleGraph = Graph.fromEdgeTuples(tuples, "Person")
```
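
To make the default values concrete, the sketch below inspects what `fromEdges` and `fromEdgeTuples` produce. It assumes a local `SparkContext`; the vertex IDs and attribute strings are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext, for illustration only.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graph-factory-sketch"))

// fromEdges: both endpoints of every edge receive the default vertex attribute.
val fromEdges = Graph.fromEdges(
  sc.parallelize(Seq(Edge(10L, 20L, "follows"))), "Unknown")
val defaultVertices = fromEdges.vertices.collect().toMap

// fromEdgeTuples: edges carry an Int attribute, set to 1 for each tuple.
val fromTuples = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (2L, 3L))), "Person")
val edgeAttrs = fromTuples.edges.map(_.attr).collect().toSet
```

Here `defaultVertices` maps both `10L` and `20L` to `"Unknown"`, and `edgeAttrs` contains only `1`.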

### Graph Transformation Operations

Operations that transform graphs by modifying vertex or edge attributes.

```scala { .api }
/**
 * Transform vertex attributes using a mapping function
 */
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]

/**
 * Transform edge attributes using a mapping function
 */
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]

/**
 * Transform edge attributes using triplet information
 */
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

/**
 * Reverse direction of all edges
 */
def reverse: Graph[VD, ED]
```

**Usage Examples:**

```scala
// Transform vertex attributes - add prefix
val prefixedGraph = graph.mapVertices((id, name) => s"User_$name")

// Transform edge attributes - convert to weights
val weightedGraph = graph.mapEdges(edge => edge.attr.length)

// Transform edges using triplet data
val annotatedGraph = graph.mapTriplets(triplet =>
  s"${triplet.srcAttr} -> ${triplet.dstAttr}: ${triplet.attr}"
)

// Reverse all edges
val reversedGraph = graph.reverse
```
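
Transformations return new graphs, so they can be chained. The sketch below, which assumes a local `SparkContext` and a single made-up edge, maps an edge label to its length and then reverses the edge direction.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext, for illustration only.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graph-transform-sketch"))

val graph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, "follows"))), "user")

// Replace the String label with its length, then flip source and destination.
val transformed = graph.mapEdges(e => e.attr.length).reverse

val result = transformed.edges.collect().toList
```

The single edge `1 -> 2` labelled `"follows"` becomes `Edge(2L, 1L, 7)`; the vertex attributes are unchanged.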

### Graph Filtering and Subgraph Operations

Operations that create subgraphs by filtering vertices and edges.

```scala { .api }
/**
 * Create subgraph by filtering edges and vertices
 * An edge is kept only if it satisfies epred and both of its endpoints satisfy vpred
 * @param epred Edge predicate function - edges to keep
 * @param vpred Vertex predicate function - vertices to keep
 */
def subgraph(
  epred: EdgeTriplet[VD, ED] => Boolean = _ => true,
  vpred: (VertexId, VD) => Boolean = (_, _) => true
): Graph[VD, ED]

/**
 * Group edges with same source/destination and merge attributes
 * Only edges in the same partition are merged, so call partitionBy first
 */
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

/**
 * Restrict graph to vertices and edges that exist in another graph
 * Keeps vertex and edge attributes from this graph
 */
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
```

**Usage Examples:**

```scala
// Filter to only "friend" relationships
val friendsGraph = graph.subgraph(triplet => triplet.attr == "friend")

// Filter to specific users and their connections
val aliceFriendsGraph = graph.subgraph(
  epred = triplet => triplet.srcAttr == "Alice" || triplet.dstAttr == "Alice",
  vpred = (id, name) => name == "Alice" || name == "Bob"
)

// Group parallel edges by combining attributes.
// groupEdges only merges edges in the same partition, so repartition first.
val groupedGraph = graph
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .groupEdges((a, b) => s"$a,$b")

// Create masked graph using structure from another graph
val template: Graph[String, String] = ??? // placeholder: supply some other graph
val maskedGraph = graph.mask(template) // keeps original attributes
```
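
One common use of `mask` is to run a computation on the full graph and then restrict the result to a filtered subgraph. The sketch below illustrates this with `connectedComponents`; the local `SparkContext`, the sample data, and the `"Missing"` marker value are assumptions made for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext, for illustration only.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graph-mask-sketch"))

val vertices = sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Missing")))
val edges = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "friend")))
val graph = Graph(vertices, edges)

// Compute component IDs on the full graph (the vertex attribute becomes a VertexId).
val ccGraph = graph.connectedComponents()

// Drop the marker vertex; edges touching it are removed as well.
val validGraph = graph.subgraph(vpred = (_, name) => name != "Missing")

// mask keeps ccGraph's attributes but only validGraph's vertices and edges.
val validCC = ccGraph.mask(validGraph)

val components = validCC.vertices.collect().toMap
val remainingEdges = validCC.edges.count()
```

The masked graph retains vertices 1 and 2, both labelled with component ID `1`, and the single edge between them.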

### Vertex Join Operations

Operations that join vertex data with external RDDs.

```scala { .api }
/**
 * Join vertices with another RDD, keeping all original vertices
 * Vertices without a matching entry in other receive None
 */
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
  mapFunc: (VertexId, VD, Option[U]) => VD2
): Graph[VD2, ED]
```

**Usage Examples:**

```scala
// Add age information to vertices
val ages: RDD[(VertexId, Int)] = sc.parallelize(Array(
  (1L, 25), (2L, 30), (3L, 28)
))

val enrichedGraph = graph.outerJoinVertices(ages) { (id, name, ageOpt) =>
  ageOpt match {
    case Some(age) => s"$name ($age)"
    case None => s"$name (age unknown)"
  }
}
```
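
The joined RDD does not have to come from outside the graph. As a sketch, the example below replaces each vertex attribute with its out-degree. It assumes a local `SparkContext` and sample edges; `outDegrees` only contains vertices with at least one outgoing edge, so the `None` case supplies `0`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext, for illustration only.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graph-join-sketch"))

val graph = Graph.fromEdges(sc.parallelize(Seq(
  Edge(1L, 2L, "follows"), Edge(2L, 3L, "friend"), Edge(3L, 4L, "colleague")
)), "user")

// Vertex 4 has no outgoing edges, so it is absent from outDegrees and gets 0.
val withOutDegree: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees) { (_, _, degOpt) =>
    degOpt.getOrElse(0)
  }

val degrees = withOutDegree.vertices.collect().toMap
```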

### Message Aggregation

Core message passing operation for implementing graph algorithms.

```scala { .api }
/**
 * Aggregate messages sent along edges to compute new vertex values
 * Vertices that receive no messages are absent from the result
 * @param sendMsg Function to send messages along edges
 * @param mergeMsg Function to merge multiple messages at same vertex
 * @param tripletFields Triplet fields that sendMsg reads; restricting them can improve performance
 */
def aggregateMessages[A: ClassTag](
  sendMsg: EdgeContext[VD, ED, A] => Unit,
  mergeMsg: (A, A) => A,
  tripletFields: TripletFields = TripletFields.All
): VertexRDD[A]
```

**Usage Examples:**

```scala
// Count incident edges (degree) for each vertex
val neighborCounts = graph.aggregateMessages[Int](
  ctx => {
    ctx.sendToSrc(1)
    ctx.sendToDst(1)
  },
  (a, b) => a + b
)

// Collect neighbor names
val neighborNames = graph.aggregateMessages[Array[String]](
  ctx => {
    ctx.sendToSrc(Array(ctx.dstAttr))
    ctx.sendToDst(Array(ctx.srcAttr))
  },
  (a, b) => a ++ b
)
```
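
The `tripletFields` parameter is not used in the examples above. The sketch below passes `TripletFields.Src` because the send function reads only the source attribute. It assumes a local `SparkContext`, and the ages and edges are invented sample data.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local SparkContext, for illustration only.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graph-aggregate-sketch"))

// Vertex attribute is an age; an edge src -> dst means "src follows dst".
val vertices = sc.parallelize(Seq((1L, 25), (2L, 30), (3L, 28)))
val edges = sc.parallelize(Seq(
  Edge(1L, 3L, "follows"), Edge(2L, 3L, "follows"), Edge(3L, 1L, "follows")))
val graph = Graph(vertices, edges)

// For each vertex, find the age of its oldest follower.
val oldestFollower: VertexRDD[Int] = graph.aggregateMessages[Int](
  ctx => ctx.sendToDst(ctx.srcAttr),   // only the source attribute is read
  (a, b) => math.max(a, b),
  TripletFields.Src
)

val result = oldestFollower.collect().toMap
```

Vertex 3 is followed by vertices 1 and 2, so it receives `30`; vertex 1 receives `28`; vertex 2 has no followers and does not appear in the result.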

## Types

### EdgeContext

Context object used in message aggregation providing access to edge and vertex data.

```scala { .api }
abstract class EdgeContext[VD, ED, A] {
  /** Source vertex ID */
  def srcId: VertexId

  /** Destination vertex ID */
  def dstId: VertexId

  /** Source vertex attribute */
  def srcAttr: VD

  /** Destination vertex attribute */
  def dstAttr: VD

  /** Edge attribute */
  def attr: ED

  /** Send message to source vertex */
  def sendToSrc(msg: A): Unit

  /** Send message to destination vertex */
  def sendToDst(msg: A): Unit

  /** Convert to EdgeTriplet for compatibility */
  def toEdgeTriplet: EdgeTriplet[VD, ED]
}
```