0
# Core Graph API
1
2
Fundamental graph construction, transformation, and analysis operations for building and manipulating distributed graph structures in GraphX.
3
4
## Capabilities
5
6
### Graph Construction
7
8
Create graphs from vertex and edge RDDs with full type safety and optimized partitioning.
9
10
```scala { .api }
11
/**
12
* Construct a graph from vertex and edge RDDs
13
* @param vertices RDD of (VertexId, VertexAttribute) pairs
14
* @param edges RDD of Edge objects with attributes
15
* @param defaultVertexAttr Default attribute for vertices that are referenced by edges but missing from the vertices RDD
16
* @param edgeStorageLevel Storage level for edges
17
* @param vertexStorageLevel Storage level for vertices
18
* @returns New graph instance
19
*/
20
def Graph.apply[VD: ClassTag, ED: ClassTag](
21
vertices: RDD[(VertexId, VD)],
22
edges: RDD[Edge[ED]],
23
defaultVertexAttr: VD = null.asInstanceOf[VD],
24
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
25
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
26
): Graph[VD, ED]
27
28
/**
29
* Construct a graph from just edges, creating vertices with default attributes
30
* @param edges RDD of edges
31
* @param defaultValue Default vertex attribute
33
* @param edgeStorageLevel Storage level for edges
34
* @param vertexStorageLevel Storage level for vertices
35
* @returns New graph with inferred vertices
36
*/
37
def Graph.fromEdges[VD: ClassTag, ED: ClassTag](
38
edges: RDD[Edge[ED]],
39
defaultValue: VD,
41
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
42
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
43
): Graph[VD, ED]
44
45
/**
46
* Construct a graph from edge tuples with integer edge attributes
47
* @param rawEdges RDD of (srcId, dstId) tuples
48
* @param defaultValue Default vertex attribute
49
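* @param uniqueEdges If Some(strategy), repartition edges with that strategy and merge duplicate edges (summing their attributes); if None, keep duplicates as separate edges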
50
* @param edgeStorageLevel Storage level for edges
51
* @param vertexStorageLevel Storage level for vertices
52
* @returns New graph with integer edge weights
53
*/
54
def Graph.fromEdgeTuples[VD: ClassTag](
55
rawEdges: RDD[(VertexId, VertexId)],
56
defaultValue: VD,
57
uniqueEdges: Option[PartitionStrategy] = None,
58
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
59
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
60
): Graph[VD, Int]
61
```
62
63
**Usage Examples:**
64
65
```scala
66
import org.apache.spark.graphx._
67
68
// Create from vertices and edges
69
val vertices = sc.parallelize(Array((1L, "Alice"), (2L, "Bob")))
70
val edges = sc.parallelize(Array(Edge(1L, 2L, "friend")))
71
val graph = Graph(vertices, edges)
72
73
// Create from edges only
74
val weightedEdges = sc.parallelize(Array(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0)))
75
val edgeGraph = Graph.fromEdges(weightedEdges, defaultValue = "Unknown")
76
77
// Create from edge tuples
78
val edgeTuples = sc.parallelize(Array((1L, 2L), (2L, 3L), (3L, 1L)))
79
val tupleGraph = Graph.fromEdgeTuples(edgeTuples, defaultValue = 0)
80
```
81
82
### Graph Properties
83
84
Access basic graph metrics and structure information.
85
86
```scala { .api }
87
abstract class Graph[VD: ClassTag, ED: ClassTag] {
88
/** RDD containing vertices and their attributes */
89
val vertices: VertexRDD[VD]
90
91
/** RDD containing edges and their attributes */
92
val edges: EdgeRDD[ED]
93
94
/** RDD of edge triplets with adjacent vertex attributes */
95
val triplets: RDD[EdgeTriplet[VD, ED]]
96
}
97
98
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
99
/** Total number of vertices in the graph */
100
def numVertices: Long
101
102
/** Total number of edges in the graph */
103
def numEdges: Long
104
105
/** In-degree of each vertex */
106
def inDegrees: VertexRDD[Int]
107
108
/** Out-degree of each vertex */
109
def outDegrees: VertexRDD[Int]
110
111
/** Total degree (in + out) of each vertex */
112
def degrees: VertexRDD[Int]
113
}
114
```
115
116
### Graph Transformations
117
118
Transform vertex and edge attributes while preserving graph structure.
119
120
```scala { .api }
121
/**
122
* Transform vertex attributes using a mapping function
123
* @param map Function transforming (VertexId, VertexAttribute) to new attribute
124
* @returns New graph with transformed vertex attributes
125
*/
126
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
127
128
/**
129
* Transform edge attributes using edge objects
130
* @param map Function transforming Edge to new edge attribute
131
* @returns New graph with transformed edge attributes
132
*/
133
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
134
135
/**
136
* Transform edge attributes using triplets (includes adjacent vertex data)
137
* @param map Function transforming EdgeTriplet to new edge attribute
138
* @returns New graph with transformed edge attributes
139
*/
140
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
141
142
/**
143
* Transform edge attributes using triplets with optimization hints
144
* @param map Function transforming EdgeTriplet to new edge attribute
145
* @param tripletFields Fields accessed by map function for optimization
146
* @returns New graph with transformed edge attributes
147
*/
148
def mapTriplets[ED2: ClassTag](
149
map: EdgeTriplet[VD, ED] => ED2,
150
tripletFields: TripletFields
151
): Graph[VD, ED2]
152
153
/**
154
* Reverse the direction of all edges
155
* @returns New graph with reversed edges
156
*/
157
def reverse: Graph[VD, ED]
158
```
159
160
**Usage Examples:**
161
162
```scala
163
// Transform vertex attributes
164
val upperGraph = graph.mapVertices((id, name) => name.toUpperCase)
165
166
// Transform edge attributes
167
val weightedGraph = graph.mapEdges(edge => edge.attr.length)
168
169
// Transform edges using adjacent vertex data
170
val labeledGraph = graph.mapTriplets(triplet =>
171
s"${triplet.srcAttr}->${triplet.dstAttr}")
172
173
// Reverse all edges
174
val reversedGraph = graph.reverse
175
```
176
177
### Graph Filtering and Subgraphs
178
179
Filter graphs by vertex and edge predicates to create subgraphs.
180
181
```scala { .api }
182
/**
183
* Filter graph by edge and vertex predicates
184
* @param epred Edge predicate function (EdgeTriplet => Boolean)
185
* @param vpred Vertex predicate function ((VertexId, VD) => Boolean)
186
* @returns Subgraph containing only vertices/edges satisfying predicates
187
*/
188
def subgraph(
189
epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
190
vpred: (VertexId, VD) => Boolean = ((v, d) => true)
191
): Graph[VD, ED]
192
193
/**
194
* Restrict graph to vertices and edges also present in another graph
195
* @param other Graph defining the mask
196
* @returns Intersection of current graph with other graph
197
*/
198
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
199
200
/**
201
* Filter the graph using predicates evaluated on a preprocessed graph; the result keeps the original vertex and edge attributes
202
* @param preprocess Function computing the derived graph on which the predicates are evaluated
203
* @param epred Edge predicate
204
* @param vpred Vertex predicate
205
* @returns Filtered graph
206
*/
207
def filter[VD2: ClassTag, ED2: ClassTag](
208
preprocess: Graph[VD, ED] => Graph[VD2, ED2],
209
epred: EdgeTriplet[VD2, ED2] => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
210
vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true
211
): Graph[VD, ED]
212
```
213
214
### Graph Joins and Aggregation
215
216
Join graphs with RDDs and perform message-passing aggregation operations.
217
218
```scala { .api }
219
/**
220
* Join vertices with an RDD, transforming vertex attributes
221
* @param table RDD of (VertexId, U) pairs to join
222
* @param mapFunc Function to combine vertex attribute and table value
223
* @returns New graph with joined vertex attributes
224
*/
225
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(
226
mapFunc: (VertexId, VD, U) => VD
227
): Graph[VD, ED]
228
229
/**
230
* Left outer join vertices with an RDD
231
* @param other RDD to join with
232
* @param mapFunc Function handling (VertexId, VD, Option[U])
233
* @returns New graph with joined attributes
234
*/
235
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
236
mapFunc: (VertexId, VD, Option[U]) => VD2
237
): Graph[VD2, ED]
238
239
/**
240
* Core message-passing aggregation API
241
* @param sendMsg Function defining messages sent along edges
242
* @param mergeMsg Function combining messages at vertices
243
* @param tripletFields Fields accessed for optimization
244
* @returns VertexRDD with aggregated messages
245
*/
246
def aggregateMessages[A: ClassTag](
247
sendMsg: EdgeContext[VD, ED, A] => Unit,
248
mergeMsg: (A, A) => A,
249
tripletFields: TripletFields = TripletFields.All
250
): VertexRDD[A]
251
```
252
253
**Usage Examples:**
254
255
```scala
256
// Join with user ages
257
val ages = sc.parallelize(Array((1L, 25), (2L, 30)))
258
val graphWithAges = graph.outerJoinVertices(ages)((id, name, ageOpt) => (name, ageOpt))
259
260
// Subgraph filtering (assumes a graph whose vertex attribute type exposes an `active` flag)
261
val activeUsers = graph.subgraph(
262
vpred = (id, user) => user.active,
263
epred = triplet => triplet.attr == "friend"
264
)
265
266
// Message aggregation - compute in-degrees
267
val inDegrees = graph.aggregateMessages[Int](
268
sendMsg = ctx => ctx.sendToDst(1),
269
mergeMsg = (a, b) => a + b
270
)
271
```
272
273
### Graph Persistence and Checkpointing
274
275
Control caching, persistence, and fault tolerance for iterative graph algorithms.
276
277
```scala { .api }
278
/**
279
* Persist graph at specified storage level
280
* @param newLevel Storage level for vertices and edges
281
* @returns Graph with specified persistence level
282
*/
283
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
284
285
/**
286
* Cache graph at default storage level (MEMORY_ONLY)
287
* @returns Cached graph
288
*/
289
def cache(): Graph[VD, ED]
290
291
/**
292
* Mark graph for checkpointing to enable fault tolerance
293
*/
294
def checkpoint(): Unit
295
296
/**
297
* Remove graph from cache/persistence
298
* @param blocking Whether to block until unpersist is complete
299
* @returns Unpersisted graph
300
*/
301
def unpersist(blocking: Boolean = true): Graph[VD, ED]
302
303
/**
304
* Repartition edges using specified partitioning strategy
305
* @param partitionStrategy Strategy for distributing edges
306
* @returns Repartitioned graph
307
*/
308
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
309
```
310
311
## Core Data Types
312
313
```scala { .api }
314
/** 64-bit vertex identifier */
315
type VertexId = Long
316
317
/** Integer partition identifier (must be < 2^30) */
318
type PartitionID = Int
319
320
/**
321
* Directed edge with source, destination, and attribute
322
*/
323
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED) {
324
/** Get the other vertex ID in this edge */
325
def otherVertexId(vid: VertexId): VertexId
326
327
/** Get edge direction relative to a vertex */
328
def relativeDirection(vid: VertexId): EdgeDirection
329
}
330
331
/**
332
* Edge with adjacent vertex attributes for message passing
333
*/
334
class EdgeTriplet[VD, ED] extends Edge[ED] {
335
/** Source vertex attribute */
336
val srcAttr: VD
337
338
/** Destination vertex attribute */
339
val dstAttr: VD
340
341
/** Get other vertex attribute */
342
def otherVertexAttr(vid: VertexId): VD
343
344
/** Get vertex attribute for specified vertex */
345
def vertexAttr(vid: VertexId): VD
346
347
/** Convert to tuple representation */
348
def toTuple: ((VertexId, VD), (VertexId, VD), ED)
349
}
350
351
/**
352
* Context for sending messages in aggregateMessages
353
*/
354
abstract class EdgeContext[VD, ED, A] {
355
val srcId: VertexId
356
val dstId: VertexId
357
val srcAttr: VD
358
val dstAttr: VD
359
val attr: ED
360
361
/** Send message to source vertex */
362
def sendToSrc(msg: A): Unit
363
364
/** Send message to destination vertex */
365
def sendToDst(msg: A): Unit
366
367
/** Convert to EdgeTriplet */
368
def toEdgeTriplet: EdgeTriplet[VD, ED]
369
}
370
```
371
372
### Specialized RDDs
373
374
```scala { .api }
375
/**
376
* Specialized RDD for vertices with efficient joins and indexing
377
*/
378
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
379
/** Reindex to contain only visible vertices */
380
def reindex(): VertexRDD[VD]
381
382
/** Transform vertex attributes */
383
def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
384
def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
385
386
/** Filter vertices by predicate */
387
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
388
389
/** Set difference with another RDD */
390
def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
391
392
/** Vertices present in both RDDs whose values differ, keeping the values from `other` */
393
def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
394
395
/** Left join with another RDD */
396
def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(
397
f: (VertexId, VD, Option[VD2]) => VD3
398
): VertexRDD[VD3]
399
400
/** Inner join with another RDD */
401
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
402
f: (VertexId, VD, U) => VD2
403
): VertexRDD[VD2]
404
}
405
406
/**
407
* Specialized RDD for edges with columnar storage
408
*/
409
abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {
410
/** Transform edge attributes preserving structure */
411
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]
412
413
/** Reverse all edges */
414
def reverse: EdgeRDD[ED]
415
416
/** Inner join with another EdgeRDD */
417
def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])(
418
f: (VertexId, VertexId, ED, ED2) => ED3
419
): EdgeRDD[ED3]
420
}
421
```