# RDD Abstractions

Specialized RDD implementations for graph data: `VertexRDD` pre-indexes vertices for fast joins, and `EdgeRDD` stores edges in a compact columnar layout.

## Capabilities
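
### VertexRDD

Specialized RDD for vertex data that extends `RDD[(VertexId, VD)]`. It holds at most one entry per vertex ID. Each partition keeps a hash index from vertex ID to position, so joins with other VertexRDDs that share that index avoid rehashing and shuffling.
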
```scala { .api }
/**
 * RDD of vertices with pre-indexing for fast joins.
 * Ensures at most one entry per vertex ID.
 */
abstract class VertexRDD[VD](sc: SparkContext, deps: Seq[Dependency[_]])
  extends RDD[(VertexId, VD)](sc, deps) {

  /**
   * Rebuild the index so it contains only the currently visible vertices.
   * The result no longer shares an index with this RDD, so it cannot be
   * zip-joined with it cheaply.
   */
  def reindex(): VertexRDD[VD]

  /** Keep vertices satisfying `pred`; preserves the index (only a visibility mask changes) */
  override def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]

  /** Transform vertex values while preserving the index */
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

  /** Keep only vertices whose IDs do not appear in `other` (set difference) */
  def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]

  /**
   * For vertices present in both RDDs, keep only those whose values differ,
   * taking the value from `other`.
   */
  def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
}
```

**Usage Examples:**

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create a VertexRDD from a regular RDD (assumes an existing SparkContext `sc`)
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"), (2L, "Bob"), (3L, "Charlie")
))
val vertexRDD = VertexRDD(vertices)

// Transform values; the existing index is reused, not rebuilt
val upperCaseVertices = vertexRDD.mapValues(_.toUpperCase)
val prefixedVertices = vertexRDD.mapValues((id, name) => s"User_$id: $name")

// Set operations
val otherVertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (2L, "Robert"), (4L, "David")
))

val uniqueVertices = vertexRDD.minus(otherVertices)  // IDs absent from otherVertices: (1, Alice), (3, Charlie)
val changedVertices = vertexRDD.diff(otherVertices)  // shared IDs whose values differ, other's value: (2, Robert)
```
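
The `minus` and `diff` semantics are easiest to see on a single partition. The sketch below is a minimal pure-Scala model, not Spark's implementation. `ToyVertexPartition` and its methods are hypothetical names. The model assumes each partition holds three things: an index from vertex ID to slot, one value per slot, and a visibility bitmask. That is the general shape GraphX uses.

```scala
import scala.collection.immutable.BitSet

// Hypothetical model of one indexed vertex partition (not Spark's code):
// `index` maps vertex ID -> slot, `values` holds one value per slot, and
// `mask` marks which slots are currently visible.
final case class ToyVertexPartition[V](index: Map[Long, Int], values: Vector[V], mask: BitSet) {

  // Visible (id, value) pairs in slot order.
  def visible: Seq[(Long, V)] =
    index.toSeq.sortBy(_._2).collect { case (id, i) if mask(i) => id -> values(i) }

  // Place arbitrary (id, value) pairs onto this partition's index.
  // IDs the index does not contain are dropped.
  private def align(other: Seq[(Long, V)]): ToyVertexPartition[V] = {
    val slots = other.flatMap { case (id, v) => index.get(id).map(i => i -> v) }
    val vals = slots.foldLeft(values) { case (acc, (i, v)) => acc.updated(i, v) }
    ToyVertexPartition(index, vals, BitSet(slots.map(_._1): _*))
  }

  // minus: visible slots that `other` does not cover.
  def minus(other: Seq[(Long, V)]): ToyVertexPartition[V] =
    copy(mask = mask &~ align(other).mask)

  // diff: slots present on both sides whose values differ,
  // keeping the value from `other`.
  def diff(other: Seq[(Long, V)]): ToyVertexPartition[V] = {
    val o = align(other)
    val changed = (mask & o.mask).filter(i => values(i) != o.values(i))
    ToyVertexPartition(index, o.values, changed)
  }
}

object ToyVertexPartition {
  // Build a partition with one entry per vertex ID (the last duplicate wins here).
  def fromPairs[V](pairs: Seq[(Long, V)]): ToyVertexPartition[V] = {
    val distinct = pairs.toMap.toSeq.sortBy(_._1)
    ToyVertexPartition(
      distinct.map(_._1).zipWithIndex.toMap,
      distinct.map(_._2).toVector,
      BitSet(distinct.indices: _*))
  }
}
```

In this model, `align` does the job that `aggregateUsingIndex` does in GraphX when `other` is a plain RDD. It maps the other side onto this partition's index and silently drops unknown IDs such as vertex 4.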

### VertexRDD Factory Methods

Factory methods on the `VertexRDD` companion object for building a VertexRDD from vertex pairs, from edges, or from both.

```scala { .api }
object VertexRDD {
  /**
   * Create a VertexRDD from an RDD of (vertex ID, value) pairs.
   * Duplicate IDs are removed arbitrarily.
   */
  def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD]

  /**
   * Create a VertexRDD that also contains every vertex referenced by `edges`
   * @param vertices Explicit vertex data (duplicate IDs are removed arbitrarily)
   * @param edges EdgeRDD whose endpoints imply additional vertices
   * @param defaultVal Value for vertices that appear in `edges` but not in `vertices`
   */
  def apply[VD: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: EdgeRDD[_],
      defaultVal: VD
  ): VertexRDD[VD]

  /**
   * Create a VertexRDD from edges only, assigning the same value to every vertex
   * @param edges EdgeRDD to extract vertex IDs from
   * @param numPartitions Number of partitions for the resulting VertexRDD
   * @param defaultVal Value assigned to every vertex
   */
  def fromEdges[VD: ClassTag](
      edges: EdgeRDD[_],
      numPartitions: Int,
      defaultVal: VD
  ): VertexRDD[VD]
}
```
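
The sketch below is a hedged model of which vertices the two edge-aware factories produce, with `Seq` and `Map` standing in for RDDs. `ToyVertexFactory` is a hypothetical name. The sketch does not model partitioning or routing tables. It also resolves duplicate explicit IDs by keeping the first one seen, whereas the three-argument `apply` only promises that duplicates are removed arbitrarily.

```scala
// Hypothetical model of the edge-aware VertexRDD factories, with Seq/Map
// standing in for RDDs. Not Spark's implementation.
object ToyVertexFactory {

  // Like VertexRDD(vertices, edges, defaultVal): explicit values are kept
  // (including isolated vertices); every edge endpoint without an explicit
  // value gets defaultVal. Duplicate explicit IDs: the first one seen wins here.
  def withDefaults[V](vertices: Seq[(Long, V)], edges: Seq[(Long, Long)], defaultVal: V): Map[Long, V] = {
    val explicit = vertices.foldLeft(Map.empty[Long, V]) { case (acc, (id, v)) =>
      if (acc.contains(id)) acc else acc + (id -> v)
    }
    val endpoints = edges.flatMap { case (src, dst) => Seq(src, dst) }
    endpoints.foldLeft(explicit) { (acc, id) =>
      if (acc.contains(id)) acc else acc + (id -> defaultVal)
    }
  }

  // Like VertexRDD.fromEdges(edges, numPartitions, defaultVal):
  // every endpoint, all with the same value (partitioning is not modeled).
  def fromEdges[V](edges: Seq[(Long, Long)], defaultVal: V): Map[Long, V] =
    withDefaults(Seq.empty[(Long, V)], edges, defaultVal)
}
```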
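
### VertexRDD Join Operations

Join operations that return a VertexRDD reusing this RDD's index. The zip variants require `other` to be partitioned identically. They are fastest when both sides share the same index, for example when one was derived from the other with `mapValues` or `filter`. `leftJoin` and `innerJoin` accept any `RDD[(VertexId, _)]` and shuffle it to this RDD's partitioning. When `other` is a VertexRDD with the same partitioner, they switch to the zip join automatically.
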
```scala { .api }
/**
 * Left zip join with another VertexRDD. Both RDDs must be partitioned
 * identically; the join is fastest when they also share the same index.
 * @param other Other VertexRDD to join with
 * @param f Combines the vertex ID, this value, and the other value if present
 */
def leftZipJoin[VD2: ClassTag, VD3: ClassTag](other: VertexRDD[VD2])(
    f: (VertexId, VD, Option[VD2]) => VD3
): VertexRDD[VD3]

/**
 * Left join with a regular RDD, which is shuffled to this RDD's partitioning
 * @param other Regular RDD to join with
 * @param f Combines the vertex ID, this value, and the other value if present
 */
def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(
    f: (VertexId, VD, Option[VD2]) => VD3
): VertexRDD[VD3]

/**
 * Inner zip join with another VertexRDD (only vertices present in both).
 * Same partitioning requirement as leftZipJoin.
 * @param other Other VertexRDD to join with
 * @param f Combines the vertex ID and both values
 */
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])(
    f: (VertexId, VD, U) => VD2
): VertexRDD[VD2]

/**
 * Inner join with a regular RDD, which is shuffled to this RDD's partitioning
 * @param other Regular RDD to join with
 * @param f Combines the vertex ID and both values
 */
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
    f: (VertexId, VD, U) => VD2
): VertexRDD[VD2]
```

**Usage Examples:**

```scala
// Create two VertexRDDs for joining. Both are built with VertexRDD(...) from
// RDDs with the same number of partitions, so they are hash-partitioned identically.
val names = VertexRDD(sc.parallelize(Array(
  (1L, "Alice"), (2L, "Bob"), (3L, "Charlie")
)))

val ages = VertexRDD(sc.parallelize(Array(
  (1L, 25), (2L, 30)  // Note: Charlie is missing
)))

// Left zip join: every vertex of `names` is kept
val profiles = names.leftZipJoin(ages) { (id, name, ageOpt) =>
  ageOpt match {
    case Some(age) => s"$name, age $age"
    case None => s"$name, age unknown"
  }
}

// Inner zip join: only vertices present in both (1 and 2)
val knownAges = names.innerZipJoin(ages) { (id, name, age) =>
  s"$name is $age years old"
}

// Join with a regular RDD
val locations: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "NYC"), (3L, "LA")
))

val fullProfiles = names.leftJoin(locations) { (id, name, locationOpt) =>
  val location = locationOpt.getOrElse("Unknown location")
  s"$name lives in $location"
}
```
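
Zip joins are cheap when two VertexRDDs share an index, because partition *i* of each stores the same vertex IDs in the same slots. The join then becomes a lockstep walk over two arrays, with no hashing and no shuffle. The sketch below models that fast path on one partition in plain Scala. `ToyZipJoin` is a hypothetical name. Representing a hidden slot as `None` simplifies the bitmask that GraphX actually uses.

```scala
// Hypothetical single-partition model of zip joins over a shared index.
object ToyZipJoin {
  // Values aligned to a shared slot order; None marks a hidden slot.
  type Slots[V] = Vector[Option[V]]

  // Left zip join: slots visible on the left survive; the right side
  // contributes an Option taken from the same slot.
  def leftZipJoin[A, B, C](ids: Vector[Long], left: Slots[A], right: Slots[B])(
      f: (Long, A, Option[B]) => C): Slots[C] = {
    require(left.length == ids.length && right.length == ids.length, "partitions must share an index")
    ids.indices.toVector.map(i => left(i).map(a => f(ids(i), a, right(i))))
  }

  // Inner zip join: a slot survives only if it is visible on both sides.
  def innerZipJoin[A, B, C](ids: Vector[Long], left: Slots[A], right: Slots[B])(
      f: (Long, A, B) => C): Slots[C] = {
    require(left.length == ids.length && right.length == ids.length, "partitions must share an index")
    ids.indices.toVector.map(i => for (a <- left(i); b <- right(i)) yield f(ids(i), a, b))
  }
}
```

No per-key lookup happens on either side, which is the work a general `join` would have to do after hashing both inputs.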
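
### VertexRDD Message Aggregation

Merges messages addressed to this RDD's vertices, using its index to place each message directly in its vertex's slot. Messages sent to IDs that are not in this VertexRDD are dropped. Vertices that receive no message are absent from the result.
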
```scala { .api }
/**
 * Aggregate messages by vertex ID, using this RDD's index to place each message.
 * Messages to IDs not in this VertexRDD are dropped; vertices with no messages
 * are absent from the result.
 * @param messages RDD of (destination vertex ID, message) pairs
 * @param reduceFunc Function that merges two messages sent to the same vertex
 */
def aggregateUsingIndex[VD2: ClassTag](
    messages: RDD[(VertexId, VD2)],
    reduceFunc: (VD2, VD2) => VD2
): VertexRDD[VD2]
```

**Usage Examples:**

```scala
// Simulate message passing: several messages to the same vertex
val messages: RDD[(VertexId, Int)] = sc.parallelize(Array(
  (1L, 5), (1L, 3), (2L, 7), (1L, 2)  // three messages to vertex 1
))

// Aggregate messages by summing (vertexRDD from the first example holds vertices 1, 2, 3)
val aggregatedMessages = vertexRDD.aggregateUsingIndex(messages, (a: Int, b: Int) => a + b)

// Result: vertex 1 gets 10 (5 + 3 + 2), vertex 2 gets 7; vertex 3 received nothing and is absent
aggregatedMessages.collect().foreach { case (id, sum) =>
  println(s"Vertex $id received total: $sum")
}
```
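
Within each partition, the index turns aggregation into a single pass. Each message is merged into its vertex's slot, and a message whose ID is not in the index has no slot and is discarded. The sketch below models that step in plain Scala. `ToyAggregate` is hypothetical, and the index is assumed to map IDs onto slots `0 until index.size`. The shuffle that routes each message to the right partition is omitted.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical per-partition model of aggregateUsingIndex (not Spark's code).
object ToyAggregate {
  // index: vertex ID -> slot in 0 until index.size.
  // Returns one merged message per vertex that received at least one message.
  def aggregateUsingIndex[M](
      index: Map[Long, Int],
      messages: Seq[(Long, M)],
      reduceFunc: (M, M) => M): Map[Long, M] = {
    val slots = ArrayBuffer.fill[Option[M]](index.size)(None)
    for ((id, msg) <- messages; slot <- index.get(id)) {  // unknown IDs have no slot
      slots(slot) = slots(slot) match {
        case Some(acc) => Some(reduceFunc(acc, msg))  // merge in place
        case None      => Some(msg)                   // first message for this vertex
      }
    }
    val idOf = index.map(_.swap)  // slot -> vertex ID
    slots.zipWithIndex.collect { case (Some(m), slot) => idOf(slot) -> m }.toMap
  }
}
```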

### EdgeRDD

Specialized RDD for edge data. Each partition is stored in a columnar layout (parallel arrays of source IDs, destination IDs, and attributes) rather than as individual `Edge` objects.

```scala { .api }
/**
 * RDD of edges stored in a columnar format within each partition.
 * Extends RDD[Edge[ED]] with graph-specific operations.
 */
abstract class EdgeRDD[ED](sc: SparkContext, deps: Seq[Dependency[_]])
  extends RDD[Edge[ED]](sc, deps) {

  /** Transform edge attributes while preserving the edge structure */
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]

  /** Reverse the direction of all edges */
  def reverse: EdgeRDD[ED]

  /**
   * Inner join with another EdgeRDD on (srcId, dstId).
   * Assumes both EdgeRDDs are partitioned the same way: partition i of this
   * RDD is joined only with partition i of `other`.
   */
  def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])(
      f: (VertexId, VertexId, ED, ED2) => ED3
  ): EdgeRDD[ED3]
}
```

**Usage Examples:**

```scala
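import org.apache.spark.HashPartitioner
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// EdgeRDD.innerJoin pairs partition i with partition i, so matching edges must
// land in the same partition on both sides. This helper hash-partitions on
// (srcId, dstId) to guarantee that.
def coPartition[ED](edges: RDD[Edge[ED]], numPartitions: Int): RDD[Edge[ED]] =
  edges
    .map(e => ((e.srcId, e.dstId), e))
    .partitionBy(new HashPartitioner(numPartitions))
    .values

// Create an EdgeRDD from a regular RDD (assumes an existing SparkContext `sc`)
val edges: RDD[Edge[String]] = coPartition(sc.parallelize(Array(
  Edge(1L, 2L, "follows"),
  Edge(2L, 3L, "friend"),
  Edge(3L, 1L, "colleague")
)), numPartitions = 4)
val edgeRDD = EdgeRDD.fromEdges[String, String](edges)  // second type parameter: vertex attribute type

// Transform edge attributes
val weightedEdges = edgeRDD.mapValues(edge => edge.attr.length)

// Reverse all edges
val reversedEdges = edgeRDD.reverse

// Join with another EdgeRDD that is partitioned the same way
val edgeWeights: RDD[Edge[Double]] = coPartition(sc.parallelize(Array(
  Edge(1L, 2L, 0.8), Edge(2L, 3L, 0.9)
)), numPartitions = 4)
val weightRDD = EdgeRDD.fromEdges[Double, String](edgeWeights)

// Keeps only edges present in both: (1, 2) and (2, 3)
val joinedEdges = edgeRDD.innerJoin(weightRDD) { (src, dst, attr, weight) =>
  s"$attr (weight: $weight)"
}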
```
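
Within one partition, `innerJoin` can be modeled as a merge join over edges sorted by `(srcId, dstId)`. The sketch below is a hedged pure-Scala model of that per-partition step, not Spark's code. `ToyEdgeJoin` and `E` are hypothetical names, and both inputs are assumed to be sorted already. The model also shows why co-partitioning matters: the function only ever sees one partition from each side, so an edge stored in a different partition on the other side never finds its match.

```scala
// Hypothetical per-partition model of EdgeRDD.innerJoin as a merge join.
object ToyEdgeJoin {
  final case class E[A](src: Long, dst: Long, attr: A)

  // Both partitions must be sorted by (src, dst).
  def innerJoin[A, B, C](left: Vector[E[A]], right: Vector[E[B]])(
      f: (Long, Long, A, B) => C): Vector[E[C]] = {
    val out = Vector.newBuilder[E[C]]
    var j = 0
    for (e <- left) {
      // Advance the right cursor past every key smaller than the left key.
      while (j < right.length &&
             (right(j).src < e.src || (right(j).src == e.src && right(j).dst < e.dst))) {
        j += 1
      }
      if (j < right.length && right(j).src == e.src && right(j).dst == e.dst) {
        out += E(e.src, e.dst, f(e.src, e.dst, e.attr, right(j).attr))
      }
    }
    out.result()
  }

  // Sort a partition the way the join expects.
  def sorted[A](edges: Seq[E[A]]): Vector[E[A]] =
    edges.sortBy(e => (e.src, e.dst)).toVector
}
```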

### EdgeRDD Factory Methods

Factory method on the `EdgeRDD` companion object for building an EdgeRDD from an RDD of `Edge` objects.

```scala { .api }
object EdgeRDD {
  /**
   * Create an EdgeRDD from a regular RDD of edges
   * @param edges RDD of Edge objects
   * @tparam ED Edge attribute type
   * @tparam VD Type of the vertex attributes that may later be joined with the returned EdgeRDD
   */
  def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD]
}
```
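
`fromEdges` packs each partition into parallel arrays instead of keeping `Edge` objects. The sketch below models that columnar layout in plain Scala under simplifying assumptions. `ToyEdgePartition` is a hypothetical name. IDs are stored as global `Long`s, whereas GraphX remaps them to local `Int` indices. After sorting by `(srcId, dstId)`, a clustered index records where each source vertex's run of edges begins.

```scala
import scala.reflect.ClassTag

// Hypothetical columnar edge partition (simplified: global IDs are stored
// directly instead of being remapped to local indices).
final class ToyEdgePartition[A](
    val srcIds: Array[Long],
    val dstIds: Array[Long],
    val attrs: Array[A],
    val firstEdgeOf: Map[Long, Int]) { // clustered index: srcId -> first position

  def size: Int = srcIds.length

  // Outgoing (dstId, attr) pairs of one source: jump via the index, then
  // scan while the source ID stays the same.
  def outEdges(src: Long): Seq[(Long, A)] =
    firstEdgeOf.get(src) match {
      case None => Seq.empty
      case Some(start) =>
        (start until size).takeWhile(i => srcIds(i) == src).map(i => dstIds(i) -> attrs(i))
    }

  // Only the attribute column is rebuilt; ID columns and index are shared.
  def mapValues[B: ClassTag](f: A => B): ToyEdgePartition[B] =
    new ToyEdgePartition(srcIds, dstIds, attrs.map(f), firstEdgeOf)
}

object ToyEdgePartition {
  // Sort by (srcId, dstId), split into columns, and index each source's first edge.
  def build[A: ClassTag](edges: Seq[(Long, Long, A)]): ToyEdgePartition[A] = {
    val sorted = edges.sortBy { case (s, d, _) => (s, d) }
    val src = sorted.map(_._1).toArray
    val firstIdx = src.zipWithIndex.foldLeft(Map.empty[Long, Int]) { case (m, (s, i)) =>
      if (m.contains(s)) m else m + (s -> i)
    }
    new ToyEdgePartition(src, sorted.map(_._2).toArray, sorted.map(_._3).toArray, firstIdx)
  }
}
```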

## Types

### Edge

Case class representing a directed edge with a source vertex, a destination vertex, and an attribute.

```scala { .api }
/**
 * Directed edge with source vertex, destination vertex, and edge attribute
 * @param srcId Source vertex ID
 * @param dstId Destination vertex ID
 * @param attr Edge attribute of type ED
 */
case class Edge[ED](var srcId: VertexId, var dstId: VertexId, var attr: ED) {
  /**
   * Get the ID of the vertex at the other end of this edge
   * @param vid ID of one endpoint of this edge
   * @return dstId if vid == srcId, srcId if vid == dstId
   */
  def otherVertexId(vid: VertexId): VertexId

  /**
   * Get the direction of this edge relative to one of its endpoints
   * @param vid ID of one endpoint of this edge
   * @return EdgeDirection.Out if vid is the source, EdgeDirection.In if vid is the destination
   */
  def relativeDirection(vid: VertexId): EdgeDirection
}
```
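
Both helpers assume that the ID passed in is one of the edge's two endpoints. The hypothetical `ToyEdge` below mirrors their documented behavior in plain Scala so it can run without Spark. Two parts are simplifications: the string directions and the `require` check. GraphX returns `EdgeDirection` values and checks with `assert`.

```scala
// Hypothetical stand-in for GraphX's Edge that mirrors the two helpers'
// documented behavior. Directions are modeled as the strings "In"/"Out",
// and a non-endpoint ID raises IllegalArgumentException via require.
final case class ToyEdge[A](srcId: Long, dstId: Long, attr: A) {

  def otherVertexId(vid: Long): Long =
    if (vid == srcId) dstId
    else { require(vid == dstId, s"$vid is not an endpoint of this edge"); srcId }

  def relativeDirection(vid: Long): String =
    if (vid == srcId) "Out"
    else { require(vid == dstId, s"$vid is not an endpoint of this edge"); "In" }
}
```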

### EdgeTriplet

Edge representation extended with the attributes of its source and destination vertices. `Graph.triplets` returns an `RDD[EdgeTriplet[VD, ED]]`.

```scala { .api }
/**
 * Edge augmented with the attributes of its two vertices.
 * Extends Edge[ED] with source and destination vertex data.
 */
class EdgeTriplet[VD, ED] extends Edge[ED] {
  /** Source vertex attribute */
  var srcAttr: VD = _

  /** Destination vertex attribute */
  var dstAttr: VD = _

  /**
   * Get the attribute of the vertex at the other end of this edge
   * @param vid ID of one endpoint of this edge
   * @return dstAttr if vid == srcId, srcAttr if vid == dstId
   */
  def otherVertexAttr(vid: VertexId): VD

  /**
   * Get the attribute of the given endpoint
   * @param vid ID of one endpoint of this edge
   * @return srcAttr if vid == srcId, dstAttr if vid == dstId
   */
  def vertexAttr(vid: VertexId): VD

  /**
   * Convert to a tuple
   * @return ((srcId, srcAttr), (dstId, dstAttr), attr)
   */
  def toTuple: ((VertexId, VD), (VertexId, VD), ED)
}
```

**Usage Examples:**

```scala
// Working with EdgeTriplet in practice
// (vertices and edges as defined in the earlier VertexRDD and EdgeRDD examples)
val graph = Graph(vertices, edges)

// Access triplets: edges together with their vertex attributes
graph.triplets.collect().foreach { triplet =>
  println(s"${triplet.srcAttr} -[${triplet.attr}]-> ${triplet.dstAttr}")

  // Utility methods
  val otherVertex = triplet.otherVertexId(triplet.srcId)  // returns dstId
  val otherAttr = triplet.otherVertexAttr(triplet.srcId)  // returns dstAttr

  // Convert to a tuple if needed
  val ((srcId, srcAttr), (dstId, dstAttr), edgeAttr) = triplet.toTuple
}
```
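
Conceptually, `graph.triplets` joins each edge with the attributes of both of its endpoints. The plain-Scala sketch below models that result, with a `Map` holding the vertices. `ToyTriplet` and `ToyTriplets.triplets` are hypothetical names. GraphX itself ships each vertex attribute only to the edge partitions that reference it and resolves attributes there; this sketch does not model that. Endpoints missing from the vertex map receive `defaultAttr`, mirroring the `defaultVertexAttr` argument of `Graph(...)`.

```scala
// Hypothetical model of a triplet: an edge plus both endpoint attributes.
final case class ToyTriplet[V, E](srcId: Long, srcAttr: V, dstId: Long, dstAttr: V, attr: E) {
  def toTuple: ((Long, V), (Long, V), E) = ((srcId, srcAttr), (dstId, dstAttr), attr)
}

object ToyTriplets {
  // Attach vertex attributes to both ends of every edge; endpoints missing
  // from `vertices` get defaultAttr.
  def triplets[V, E](vertices: Map[Long, V], edges: Seq[(Long, Long, E)], defaultAttr: V): Seq[ToyTriplet[V, E]] =
    edges.map { case (src, dst, attr) =>
      ToyTriplet(src, vertices.getOrElse(src, defaultAttr), dst, vertices.getOrElse(dst, defaultAttr), attr)
    }
}
```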

## Performance Considerations

### VertexRDD Optimizations

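- **Pre-indexing**: Each VertexRDD partition keeps a hash index from vertex ID to array position, so lookups during joins take expected constant time.
- **Reindexing**: `filter` only hides vertices: it clears bits in a visibility mask and keeps the old index. After a filter that removes many vertices, call `reindex()` to build a compact index over the survivors. The reindexed RDD can no longer be zip-joined cheaply with its parent.
- **Zip joins**: `leftZipJoin` and `innerZipJoin` are fastest when both VertexRDDs share an index, for example when one was derived from the other with `mapValues` or `filter`. `leftJoin` and `innerJoin` already take the zip path when `other` is a VertexRDD with the same partitioner.
- **Partitioning**: A VertexRDD is hash-partitioned by vertex ID. When it is built with an EdgeRDD, it also keeps routing tables that record which edge partitions reference each vertex, so vertex attributes are shipped only to those partitions.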
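
The filter/reindex trade-off can be modeled on a single partition. In this hedged plain-Scala sketch, `ToyIndexed` is a hypothetical name. `filter` only clears mask bits, so the result still lines up slot for slot with its parent. `reindex` compacts storage down to the surviving vertices and, in doing so, gives up that shared index.

```scala
import scala.collection.immutable.BitSet

// Hypothetical single-partition model: ids(slot) is the vertex ID stored in
// that slot, values(slot) its value, and `visible` the mask of live slots.
final case class ToyIndexed[V](ids: Vector[Long], values: Vector[V], visible: BitSet) {

  // filter clears mask bits only; ids and values are untouched, so the result
  // still lines up slot for slot with this partition.
  def filter(p: (Long, V) => Boolean): ToyIndexed[V] =
    copy(visible = visible.filter(i => p(ids(i), values(i))))

  // reindex copies only the visible slots into compact storage; slot
  // positions change, so the shared index with the parent is lost.
  def reindex: ToyIndexed[V] = {
    val keep = visible.toVector
    ToyIndexed(keep.map(i => ids(i)), keep.map(i => values(i)), BitSet(keep.indices: _*))
  }

  // A cheap zip join is only possible when both sides use the same index.
  def sharesIndexWith(other: ToyIndexed[_]): Boolean = ids == other.ids
}

object ToyIndexed {
  def of[V](pairs: (Long, V)*): ToyIndexed[V] =
    ToyIndexed(pairs.map(_._1).toVector, pairs.map(_._2).toVector, BitSet(pairs.indices: _*))
}
```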
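
### EdgeRDD Optimizations

- **Columnar storage**: Each EdgeRDD partition stores source IDs, destination IDs (remapped to compact local indices), and attributes in parallel arrays. The arrays are sorted by source ID, with an index to the start of each source's edges, rather than holding individual `Edge` objects.
- **Partition co-location**: Edges are assigned to partitions by a `PartitionStrategy` (applied with `partitionBy` on a `Graph`). The strategy determines how many edge partitions each vertex appears in, and therefore how much vertex data is replicated over the network.
- **Bulk operations**: Prefer whole-RDD transformations such as `mapValues`. It rebuilds only the attribute array of each partition and reuses the ID arrays and index.

**Usage Examples:**
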
```scala
// Filter out vertices, then compact the index over the survivors
val optimizedVertices = vertexRDD
  .filter { case (id, _) => id != 2L }  // hides vertex 2; the index is unchanged
  .reindex()                            // rebuild the index over the remaining vertices

// Efficient join pattern: derive the other side from the same RDD so the
// two share an index and can be zipped partition by partition
val otherVertexRDD = optimizedVertices.mapValues(_.length)
val result = optimizedVertices.leftZipJoin(otherVertexRDD) { (id, a, bOpt) =>
  s"$a ${bOpt.getOrElse("N/A")}"
}
```