maven-apache-spark

Description
Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
Author
tessl
How to use

npx @tessl/cli registry install tessl/maven-apache-spark@1.0.0

graphx.md docs/

# GraphX - Graph Processing

GraphX is Apache Spark's API for graphs and graph-parallel computation. It extends the Spark RDD abstraction with a Resilient Distributed Property Graph: a directed multigraph with properties attached to each vertex and edge.

## Core Graph Abstractions

### Graph Class

The central abstraction in GraphX:

```scala { .api }
abstract class Graph[VD: ClassTag, ED: ClassTag] extends Serializable {
  // Core properties
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]

  // Structural operations
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = x => true, vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  // Transformation operations
  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
```

### VertexId Type

```scala { .api }
type VertexId = Long
```

### Edge Class

Represents a directed edge in the graph:

```scala { .api }
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED) extends Serializable
```

### EdgeTriplet Class

Joins vertex and edge data:

```scala { .api }
class EdgeTriplet[VD, ED] extends Edge[ED] {
  def srcId: VertexId   // Source vertex ID
  def dstId: VertexId   // Destination vertex ID
  def attr: ED          // Edge attribute
  def srcAttr: VD       // Source vertex attribute
  def dstAttr: VD       // Destination vertex attribute
}
```
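
As a rough illustration of how the triplet fields are read in practice, the sketch below builds a tiny graph and formats each triplet as a string. The graph contents, the names `tripletUsers`, `tripletEdges`, `tripletDemo`, and `described`, and the local `SparkContext` setup are assumptions made for this example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumption: reuse a running SparkContext (e.g. in spark-shell) or start a local one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-triplet-sketch"))

// Hypothetical sample data
val tripletUsers: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Charlie")))
val tripletEdges: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(1L, 2L, "friend"), Edge(2L, 3L, "colleague")))
val tripletDemo = Graph(tripletUsers, tripletEdges)

// Each triplet exposes the edge fields plus both endpoint attributes.
// Map to plain strings before collecting so no triplet objects leave the executors.
val described: Array[String] = tripletDemo.triplets
  .map(t => s"${t.srcAttr} (${t.srcId}) -[${t.attr}]-> ${t.dstAttr} (${t.dstId})")
  .collect()
  .sorted

described.foreach(println)
```

Converting each triplet to a simple value inside `map` keeps the example independent of how triplet objects are materialized on the executors.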
57
58
## Creating Graphs
59
60
### Graph Construction
61
62
**Graph.apply**: Create graph from vertices and edges
63
```scala { .api }
64
object Graph {
65
def apply[VD: ClassTag, ED: ClassTag](vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD = null): Graph[VD, ED]
66
}
67
```
68
69
```scala
70
import org.apache.spark.graphx._
71
import org.apache.spark.rdd.RDD
72
73
// Create vertices RDD
74
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
75
(1L, "Alice"),
76
(2L, "Bob"),
77
(3L, "Charlie"),
78
(4L, "David")
79
))
80
81
// Create edges RDD
82
val edges: RDD[Edge[String]] = sc.parallelize(Array(
83
Edge(1L, 2L, "friend"),
84
Edge(2L, 3L, "friend"),
85
Edge(3L, 4L, "colleague"),
86
Edge(1L, 4L, "colleague")
87
))
88
89
// Create graph
90
val graph = Graph(vertices, edges, defaultVertexAttr = "Unknown")
91
```

**Graph.fromEdges**: Create graph from edges only
```scala { .api }
def fromEdges[VD: ClassTag, ED: ClassTag](edges: RDD[Edge[ED]], defaultValue: VD): Graph[VD, ED]
```

```scala
// Create graph from edges (vertices inferred)
val relationships: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "follows"),
  Edge(2L, 3L, "follows"),
  Edge(3L, 1L, "follows")
))

val socialGraph = Graph.fromEdges(relationships, defaultValue = "user")
```

**Graph.fromEdgeTuples**: Create unweighted graph from tuples
```scala { .api }
def fromEdgeTuples[VD: ClassTag](rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
```

```scala
// Simple edge list as tuples
val edgeTuples: RDD[(VertexId, VertexId)] = sc.parallelize(Array(
  (1L, 2L), (2L, 3L), (3L, 1L), (1L, 3L)
))

val simpleGraph = Graph.fromEdgeTuples(edgeTuples, defaultValue = 1)
```
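
The `uniqueEdges` parameter has no example above, so here is a minimal sketch of its effect. It assumes the behaviour of current Spark releases, where passing a partition strategy repartitions the graph and merges duplicate edges by summing their attributes, so each edge attribute becomes a multiplicity count. The input data and the names `rawPairs`, `multiGraph`, `dedupedGraph`, and `dedupedEdges` are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumption: reuse a running SparkContext or start a local one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-edge-tuples-sketch"))

// Hypothetical input with one duplicated pair: (1, 2) appears twice
val rawPairs: RDD[(VertexId, VertexId)] =
  sc.parallelize(Seq((1L, 2L), (1L, 2L), (2L, 3L)))

// Default (uniqueEdges = None): duplicates stay as parallel edges, each with attribute 1
val multiGraph = Graph.fromEdgeTuples(rawPairs, defaultValue = 0)

// With a partition strategy: duplicates are merged and their attributes summed
val dedupedGraph = Graph.fromEdgeTuples(
  rawPairs,
  defaultValue = 0,
  uniqueEdges = Some(PartitionStrategy.RandomVertexCut))

val dedupedEdges = dedupedGraph.edges
  .map(e => (e.srcId, e.dstId, e.attr))
  .collect()
  .sorted

println(s"parallel edges kept: ${multiGraph.edges.count()}")
dedupedEdges.foreach(println)
```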

## Graph Properties and Operations

### Basic Properties

```scala
val numVertices = graph.vertices.count()
val numEdges = graph.edges.count()

println(s"Graph has $numVertices vertices and $numEdges edges")

// Access vertices and edges
graph.vertices.collect().foreach { case (id, attr) =>
  println(s"Vertex $id: $attr")
}

graph.edges.collect().foreach { edge =>
  println(s"Edge ${edge.srcId} -> ${edge.dstId}: ${edge.attr}")
}

// Access triplets (vertex-edge-vertex)
graph.triplets.collect().foreach { triplet =>
  println(s"${triplet.srcAttr} -${triplet.attr}-> ${triplet.dstAttr}")
}
```

### Graph Transformations

**mapVertices**: Transform vertex attributes
```scala { .api }
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
```

```scala
// Add vertex degrees to attributes.
// Calling graph.degrees.lookup inside mapVertices would nest RDD operations,
// which Spark does not support, so join the degrees in with outerJoinVertices.
val graphWithDegrees = graph.outerJoinVertices(graph.degrees) { (id, attr, degOpt) =>
  (attr, degOpt.getOrElse(0))
}

// Convert to upper case
val upperCaseGraph = graph.mapVertices { (id, name) =>
  name.toUpperCase
}
```

**mapEdges**: Transform edge attributes
```scala { .api }
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
```

```scala
// Add edge weights
val weightedGraph = graph.mapEdges { edge =>
  edge.attr match {
    case "friend" => 1.0
    case "colleague" => 0.5
    case _ => 0.1
  }
}

// Transform edge attributes using triplet info
val enhancedGraph = graph.mapTriplets { triplet =>
  s"${triplet.srcAttr}-${triplet.attr}-${triplet.dstAttr}"
}
```

### Structural Operations

**reverse**: Reverse edge directions
```scala { .api }
def reverse: Graph[VD, ED]
```

**subgraph**: Extract subgraph based on predicates
```scala { .api }
def subgraph(
  epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
  vpred: (VertexId, VD) => Boolean = ((v, d) => true)
): Graph[VD, ED]
```

```scala
// Reverse all edges
val reversedGraph = graph.reverse

// Extract subgraph with only "friend" edges
val friendGraph = graph.subgraph(epred = _.attr == "friend")

// Extract subgraph with specific vertices
val aliceBobGraph = graph.subgraph(
  vpred = (id, attr) => attr == "Alice" || attr == "Bob"
)

// Extract subgraph based on both vertices and edges
val specificSubgraph = graph.subgraph(
  epred = triplet => triplet.srcAttr != "Charlie",
  vpred = (id, attr) => attr.length > 3
)
```

**groupEdges**: Merge parallel edges
```scala { .api }
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
```

```scala
import org.apache.spark.graphx.PartitionStrategy

// Create graph with parallel edges
val parallelEdges: RDD[Edge[Int]] = sc.parallelize(Array(
  Edge(1L, 2L, 1),
  Edge(1L, 2L, 2), // Parallel edge
  Edge(2L, 3L, 3)
))

val parallelGraph = Graph.fromEdges(parallelEdges, "user")

// Merge parallel edges by summing weights.
// groupEdges only merges edges located in the same partition,
// so repartition with partitionBy first.
val mergedGraph = parallelGraph
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .groupEdges(_ + _)
```

## VertexRDD and EdgeRDD

### VertexRDD

Specialized RDD for vertices with efficient joins:

```scala { .api }
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  def aggregateUsingIndex[VD2: ClassTag](messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
```

```scala
val degrees = graph.degrees

// Transform vertex values
val transformedVertices = graph.vertices.mapValues(_.toUpperCase)

// Join with additional data
val ages: RDD[(VertexId, Int)] = sc.parallelize(Array(
  (1L, 25), (2L, 30), (3L, 35), (4L, 28)
))

val verticesWithAges = graph.vertices.leftJoin(ages) { (id, name, ageOpt) =>
  (name, ageOpt.getOrElse(0))
}
```
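
The `innerJoin` and `aggregateUsingIndex` methods listed in the signature block are not exercised above. The following sketch shows one way they might be used; the sample data and the names `joinGraph`, `scores`, `joined`, `messages`, and `sums` are assumptions for illustration. The explicit `[Int]` type argument on `aggregateUsingIndex` is there so the `_ + _` lambda has known parameter types.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumption: reuse a running SparkContext or start a local one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-vertexrdd-sketch"))

// Hypothetical three-vertex graph
val joinGraph = Graph(
  sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Charlie"))),
  sc.parallelize(Seq(Edge(1L, 2L, "friend"), Edge(2L, 3L, "friend"))))

// innerJoin keeps only vertices present in both datasets (vertex 2 has no score)
val scores: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 10), (3L, 30)))
val joined: VertexRDD[String] =
  joinGraph.vertices.innerJoin(scores) { (id, name, score) => s"$name:$score" }

// aggregateUsingIndex reduces messages per vertex ID, reusing the existing vertex index;
// vertices that receive no message (vertex 3 here) are absent from the result
val messages: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 1), (1L, 2), (2L, 5)))
val sums: VertexRDD[Int] =
  joinGraph.vertices.aggregateUsingIndex[Int](messages, _ + _)

joined.collect().sorted.foreach(println)
sums.collect().sorted.foreach(println)
```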

### EdgeRDD

Specialized RDD for edges:

```scala { .api }
abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]
  def reverse: EdgeRDD[ED]
  // Joins two EdgeRDDs partitioned with the same strategy; f receives (srcId, dstId, thisAttr, otherAttr)
  def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
}
```
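
A short usage sketch for `EdgeRDD` follows. It assumes the `innerJoin` form that takes another `EdgeRDD` and a four-argument function, as in current Spark releases. The sample edges and the names `edgeGraph`, `doubled`, `flipped`, `summed`, and the helper `asTuples` are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse a running SparkContext or start a local one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-edgerdd-sketch"))

// Hypothetical graph with integer edge weights
val edgeGraph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 3), Edge(2L, 3L, 4))),
  defaultValue = "user")

// Helper (illustrative only): turn edges into sorted plain tuples for printing
def asTuples(rdd: EdgeRDD[Int]): Seq[(VertexId, VertexId, Int)] =
  rdd.map(e => (e.srcId, e.dstId, e.attr)).collect().sorted.toSeq

// mapValues changes attributes while keeping the edge structure
val doubled: EdgeRDD[Int] = edgeGraph.edges.mapValues(e => e.attr * 2)

// reverse swaps source and destination of every edge
val flipped: EdgeRDD[Int] = edgeGraph.edges.reverse

// innerJoin pairs up edges present in both EdgeRDDs (same partitioning assumed)
val summed: EdgeRDD[Int] =
  edgeGraph.edges.innerJoin(doubled) { (src, dst, a, b) => a + b }

println(asTuples(doubled))
println(asTuples(flipped))
println(asTuples(summed))
```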

## GraphOps - Advanced Operations

GraphOps provides additional graph algorithms and utilities through implicit conversion:

```scala { .api }
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  // Degree operations
  def degrees: VertexRDD[Int]
  def inDegrees: VertexRDD[Int]
  def outDegrees: VertexRDD[Int]

  // Neighborhood operations
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

  // Messaging operations (aggregateMessages is declared on Graph itself; listed here for convenience)
  def aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, tripletFields: TripletFields = TripletFields.All): VertexRDD[A]

  // Pregel API
  def pregel[A: ClassTag](initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A): Graph[VD, ED]
}
```

### Degree Operations

```scala
import org.apache.spark.graphx.GraphOps

// Compute vertex degrees
val degrees = graph.degrees
val inDegrees = graph.inDegrees
val outDegrees = graph.outDegrees

// Find vertices with highest in-degree
val maxInDegree = inDegrees.reduce { (a, b) =>
  if (a._2 > b._2) a else b
}
println(s"Vertex ${maxInDegree._1} has highest in-degree: ${maxInDegree._2}")

// Join degrees with vertex attributes
val verticesWithDegrees = graph.vertices.leftJoin(degrees) { (id, attr, deg) =>
  (attr, deg.getOrElse(0))
}
```

### Neighborhood Operations

```scala
import org.apache.spark.graphx.EdgeDirection

// Collect neighbors
val neighbors = graph.collectNeighbors(EdgeDirection.Out)
neighbors.collect().foreach { case (id, neighborArray) =>
  println(s"Vertex $id has neighbors: ${neighborArray.mkString(", ")}")
}

// Collect neighbor IDs only
val neighborIds = graph.collectNeighborIds(EdgeDirection.In)
```

### Message Passing with aggregateMessages

```scala { .api }
def aggregateMessages[A: ClassTag](
  sendMsg: EdgeContext[VD, ED, A] => Unit,
  mergeMsg: (A, A) => A,
  tripletFields: TripletFields = TripletFields.All
): VertexRDD[A]
```

```scala
import org.apache.spark.graphx.{EdgeContext, Graph, TripletFields, VertexRDD}

// Count neighbors
val neighborCount = graph.aggregateMessages[Int](
  // Send a 1 to both endpoints of every edge
  sendMsg = { edgeContext =>
    edgeContext.sendToSrc(1)
    edgeContext.sendToDst(1)
  },
  // Merge messages
  mergeMsg = _ + _
)

// Compute average neighbor age (here every vertex simply gets the default age 25)
val ageGraph: Graph[Int, String] = Graph.fromEdges(edges, defaultValue = 25)

// Send (age, 1) pairs so the total and the neighbor count are reduced together.
// Looking up ageGraph.degrees inside mapValues would nest RDD operations,
// which Spark does not support.
val avgNeighborAge: VertexRDD[Double] = ageGraph.aggregateMessages[(Double, Int)](
  sendMsg = { ctx =>
    ctx.sendToSrc((ctx.dstAttr.toDouble, 1))
    ctx.sendToDst((ctx.srcAttr.toDouble, 1))
  },
  mergeMsg = (a, b) => (a._1 + b._1, a._2 + b._2),
  tripletFields = TripletFields.All
).mapValues((id, value) => value match {
  case (totalAge, count) => totalAge / count
})
```
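
The `tripletFields` parameter tells GraphX which parts of the triplet `sendMsg` actually reads. As a hedged sketch: when the message does not depend on any vertex or edge attribute, `TripletFields.None` can be passed, which lets GraphX skip shipping those attributes. The example recomputes in-degrees this way and compares them with the built-in `inDegrees`; the sample edges and the names `degreeGraph` and `inDeg` are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse a running SparkContext or start a local one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-tripletfields-sketch"))

// Hypothetical graph: vertex 2 has two incoming edges, vertex 3 has one
val degreeGraph = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (3L, 2L), (2L, 3L))),
  defaultValue = 0)

// The message is a constant, so no vertex or edge attribute is needed
val inDeg: VertexRDD[Int] = degreeGraph.aggregateMessages[Int](
  ctx => ctx.sendToDst(1),
  _ + _,
  TripletFields.None)

// Vertices with no incoming edge (vertex 1) receive no message and are absent
inDeg.collect().sorted.foreach { case (id, d) => println(s"Vertex $id in-degree: $d") }
```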

### Pregel API

The Pregel API enables iterative graph computations:

```scala { .api }
def pregel[A: ClassTag](
  initialMsg: A,
  maxIterations: Int = Int.MaxValue,
  activeDirection: EdgeDirection = EdgeDirection.Either
)(
  vprog: (VertexId, VD, A) => VD,
  sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  mergeMsg: (A, A) => A
): Graph[VD, ED]
```

```scala
import org.apache.spark.graphx.{EdgeDirection, Graph, VertexId}

// Single-source shortest paths using Pregel
def shortestPath(graph: Graph[Double, Double], sourceId: VertexId): Graph[Double, Double] = {
  // Initialize distances (source = 0.0, others = Double.PositiveInfinity)
  val initialGraph = graph.mapVertices { (id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity
  }

  initialGraph.pregel(
    initialMsg = Double.PositiveInfinity,
    maxIterations = Int.MaxValue,
    activeDirection = EdgeDirection.Out
  )(
    // Vertex program: keep the shorter of the current and the received distance
    vprog = { (id, dist, newDist) => math.min(dist, newDist) },

    // Send message: offer the destination a shorter path through this edge, if one exists
    sendMsg = { triplet =>
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      } else {
        Iterator.empty
      }
    },

    // Merge messages: take minimum distance
    mergeMsg = (a, b) => math.min(a, b)
  )
}

// Usage: weightedGraph has String vertex attributes, so replace them with
// placeholder distances to match the Graph[Double, Double] parameter
val sourceVertex = 1L
val distances = shortestPath(weightedGraph.mapVertices((_, _) => 0.0), sourceVertex)
```

## Built-in Graph Algorithms

GraphX includes implementations of common graph algorithms:

### PageRank

```scala { .api }
object PageRank {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
  def runUntilConvergence[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
}
```

```scala
import org.apache.spark.graphx.lib.PageRank

// Run PageRank for fixed iterations
val pageRanks = PageRank.run(graph, numIter = 10)

// Run PageRank until convergence
val convergedRanks = PageRank.runUntilConvergence(graph, tol = 0.0001)

// Get vertices with highest PageRank
val topVertices = pageRanks.vertices.top(3)(Ordering.by(_._2))
topVertices.foreach { case (id, rank) =>
  println(s"Vertex $id: PageRank = $rank")
}
```

### Connected Components

```scala { .api }
object ConnectedComponents {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
}
```

```scala
import org.apache.spark.graphx.lib.ConnectedComponents

val ccGraph = ConnectedComponents.run(graph)

// Group vertices by connected component
val componentSizes = ccGraph.vertices
  .map(_._2)       // Extract component ID
  .countByValue()  // Count vertices per component

componentSizes.foreach { case (componentId, size) =>
  println(s"Component $componentId has $size vertices")
}
```

### Triangle Counting

```scala { .api }
object TriangleCount {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
}
```

```scala
import org.apache.spark.graphx.PartitionStrategy
import org.apache.spark.graphx.lib.TriangleCount

// Put edges in canonical orientation (lower vertex ID as source) and
// partition the graph before counting triangles
val canonicalGraph = graph
  .convertToCanonicalEdges()
  .partitionBy(PartitionStrategy.RandomVertexCut)
val triangleCounts = TriangleCount.run(canonicalGraph)

// Find the vertex involved in the most triangles
val maxTriangles = triangleCounts.vertices.reduce { (a, b) =>
  if (a._2 > b._2) a else b
}
println(s"Vertex ${maxTriangles._1} is in ${maxTriangles._2} triangles")
```

### Strongly Connected Components

```scala { .api }
object StronglyConnectedComponents {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED]
}
```

```scala
import org.apache.spark.graphx.lib.StronglyConnectedComponents

val sccGraph = StronglyConnectedComponents.run(graph, numIter = 10)

// Find strongly connected components
val sccSizes = sccGraph.vertices
  .map(_._2)
  .countByValue()

println(s"Found ${sccSizes.size} strongly connected components")
```

## Graph Partitioning

Control how a graph's edges are partitioned across the cluster:

```scala { .api }
object PartitionStrategy {
  case object EdgePartition1D extends PartitionStrategy
  case object EdgePartition2D extends PartitionStrategy
  case object RandomVertexCut extends PartitionStrategy
  case object CanonicalRandomVertexCut extends PartitionStrategy
}
```

```scala
import org.apache.spark.graphx.{PartitionStrategy, Graph}

// Create graph with specific partitioning strategy
val partitionedGraph = Graph(vertices, edges)
  .partitionBy(PartitionStrategy.EdgePartition2D, 4)

// Repartition existing graph
val repartitionedGraph = graph.partitionBy(PartitionStrategy.RandomVertexCut, 8)
```
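
To check what `partitionBy` did, one option is to compare the number of edge partitions before and after. The sketch below is illustrative only: the input data, the choice of two initial slices, and the names `unpartitioned` and `repartitioned` are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse a running SparkContext or start a local one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-partition-sketch"))

// Hypothetical edge list loaded into two partitions
val unpartitioned = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L), (1L, 3L)), numSlices = 2),
  defaultValue = 0)

// Redistribute the edges over four partitions using the 2D strategy
val repartitioned = unpartitioned.partitionBy(PartitionStrategy.EdgePartition2D, 4)

println(s"before: ${unpartitioned.edges.getNumPartitions} edge partitions")
println(s"after:  ${repartitioned.edges.getNumPartitions} edge partitions")

// Repartitioning moves edges around but does not add or drop any
println(s"edges:  ${repartitioned.edges.count()}")
```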

## Performance Optimization

### Graph Caching

```scala
// Cache graph for iterative algorithms
val cachedGraph = graph.cache()

// Unpersist when done
cachedGraph.unpersist()
```

### Efficient Graph Construction

```scala
import org.apache.spark.graphx.PartitionStrategy

// For large graphs, build from edges, partition, and cache in one chain
// (the fromEdges parameter is named defaultValue)
val efficientGraph = Graph.fromEdges(edges, defaultValue = "default")
  .partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 4)
  .cache()

// Materialize the graph
efficientGraph.vertices.count()
efficientGraph.edges.count()
```

This guide covers the core GraphX API for building scalable graph processing applications in Apache Spark: graph construction, transformations, message passing, built-in algorithms, partitioning, and caching.