0
# Graph Algorithms
1
2
Comprehensive collection of pre-implemented graph algorithms including PageRank, Connected Components, Triangle Counting, and community detection algorithms optimized for distributed execution.
3
4
## Capabilities
5
6
### PageRank Algorithm
7
8
Compute PageRank scores using both static (fixed iterations) and dynamic (convergence-based) implementations.
9
10
```scala { .api }
/**
 * Runs dynamic PageRank until the ranks converge.
 *
 * @param tol       convergence tolerance; iteration stops once rank changes fall below it
 * @param resetProb probability of a random jump (the damping factor is `1 - resetProb`)
 * @return a graph whose vertex attribute is that vertex's PageRank score
 */
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

/**
 * Runs static PageRank for a fixed number of iterations.
 *
 * @param numIter   number of iterations to run
 * @param resetProb probability of a random jump
 * @return a graph whose vertex attribute is that vertex's PageRank score
 */
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

/**
 * Runs personalized PageRank from a single source vertex until convergence.
 *
 * @param src       source vertex that random jumps return to
 * @param tol       convergence tolerance
 * @param resetProb probability of jumping back to `src`
 * @return a graph whose vertex attribute is that vertex's personalized PageRank score
 */
def personalizedPageRank(
  src: VertexId,
  tol: Double,
  resetProb: Double = 0.15
): Graph[Double, Double]

object PageRank {
  /**
   * Static PageRank: runs exactly `numIter` iterations.
   *
   * @param graph     input graph
   * @param numIter   number of iterations
   * @param resetProb random jump probability
   * @return a graph with PageRank scores as vertex attributes
   */
  def run[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    numIter: Int,
    resetProb: Double = 0.15
  ): Graph[Double, Double]

  /**
   * Dynamic PageRank: iterates until the ranks converge.
   *
   * @param graph     input graph
   * @param tol       convergence tolerance
   * @param resetProb random jump probability
   * @return a graph with PageRank scores as vertex attributes
   */
  def runUntilConvergence[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    tol: Double,
    resetProb: Double = 0.15
  ): Graph[Double, Double]

  /**
   * Static PageRank with additional options.
   *
   * `resetProb` defaults to 0.15, the same default every other PageRank entry point uses.
   *
   * @param graph     input graph
   * @param numIter   number of iterations
   * @param resetProb random jump probability
   * @param srcId     optional source vertex; when set, personalized PageRank is computed
   * @return a graph with PageRank scores as vertex attributes
   */
  def runWithOptions[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    numIter: Int,
    resetProb: Double = 0.15,
    srcId: Option[VertexId] = None
  ): Graph[Double, Double]

  /**
   * Personalized PageRank for several source vertices, computed in parallel.
   *
   * `sources` follows a parameter that has a default value. To rely on the
   * `resetProb` default, pass `sources` by name (`sources = Array(...)`).
   *
   * @param graph     input graph
   * @param numIter   number of iterations
   * @param resetProb random jump probability
   * @param sources   source vertices
   * @return a graph whose vertex attribute is an `org.apache.spark.ml.linalg.Vector` of
   *         scores, one entry per source, indexed by that source's position in `sources`
   */
  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    numIter: Int,
    resetProb: Double = 0.15,
    sources: Array[VertexId]
  ): Graph[Vector, Double]
}
```
98
99
**Usage Examples:**
100
101
```scala
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

// Dynamic PageRank until convergence
val ranks = graph.pageRank(0.0001).vertices
// collect() is declared with parentheses (it triggers a Spark job), so call it as
// collect(); it brings every vertex to the driver, so only use it on small graphs.
ranks.collect().foreach { case (id, rank) =>
  println(s"Vertex $id has rank $rank")
}

// Static PageRank for 10 iterations
val staticRanks = graph.staticPageRank(10).vertices

// Personalized PageRank from vertex 1
val personalizedRanks = graph.personalizedPageRank(1L, 0.001).vertices

// Using PageRank object directly
val pageRankGraph = PageRank.run(graph, numIter = 20, resetProb = 0.1)
```
120
121
### Connected Components
122
123
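Find connected components using label propagation: each vertex repeatedly adopts the smallest vertex ID among itself and its neighbors until nothing changes. Edge direction is ignored, so any graph (including a directed one) is treated as undirected.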
124
125
```scala { .api }
/**
 * Finds the connected components of the graph.
 *
 * Edge direction is ignored: every edge links its endpoints in both directions,
 * so a directed graph yields its weakly connected components.
 *
 * @return a graph whose vertex attribute is the smallest vertex ID in that vertex's component
 */
def connectedComponents(): Graph[VertexId, ED]

/**
 * Finds connected components, stopping after at most `maxIterations` iterations.
 *
 * If the limit is reached before the labels stop changing, the result may be
 * incomplete: some vertices may not yet carry the smallest ID of their component.
 *
 * @param maxIterations maximum number of iterations to run
 * @return a graph with component IDs as vertex attributes
 */
def connectedComponents(maxIterations: Int): Graph[VertexId, ED]

object ConnectedComponents {
  /**
   * Finds connected components, running until the labels converge.
   *
   * @param graph input graph; edge direction is ignored
   * @return a graph with component IDs (smallest member vertex ID) as vertex attributes
   */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]

  /**
   * Finds connected components with an iteration limit.
   *
   * A capped run may stop before convergence, leaving some labels above their
   * component's true minimum ID.
   *
   * @param graph         input graph; edge direction is ignored
   * @param maxIterations maximum number of iterations
   * @return a graph with component IDs as vertex attributes
   */
  def run[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    maxIterations: Int
  ): Graph[VertexId, ED]
}
```
152
153
### Strongly Connected Components
154
155
Find the strongly connected components of a directed graph. The computation is iterative, and `numIter` bounds how many iterations it runs.
156
157
```scala { .api }
/**
 * Computes the strongly connected components (SCCs) of this graph.
 *
 * @param numIter number of iterations to run
 * @return a graph whose vertex attribute is the smallest vertex ID within that vertex's SCC
 */
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]

object StronglyConnectedComponents {
  /**
   * Computes the strongly connected components of a directed graph.
   *
   * @param graph   the directed input graph
   * @param numIter number of iterations to run
   * @return a graph whose vertex attributes are SCC identifiers
   */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED]
}
```
178
179
### Triangle Counting
180
181
Count triangles in graphs for clustering coefficient computation and social network analysis.
182
183
```scala { .api }
/**
 * Counts the triangles that pass through each vertex.
 *
 * @return a graph whose vertex attribute is that vertex's triangle count
 */
def triangleCount(): Graph[Int, ED]

object TriangleCount {
  /**
   * Counts triangles on an arbitrary graph.
   *
   * The input does NOT need to be canonical. Self-loops are removed and edges are
   * converted to canonical orientation (srcId < dstId, duplicates merged) before
   * counting. That preparation costs an extra shuffle.
   *
   * @param graph input graph in any edge orientation
   * @return a graph with the input's edges and each vertex's triangle count as its attribute
   */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]

  /**
   * Counts triangles on a graph that is already canonical, skipping the preparation done by `run`.
   *
   * The caller must guarantee all of the following:
   *  - no self-loops
   *  - every edge oriented srcId < dstId
   *  - no duplicate edges
   *
   * @param graph pre-canonicalized graph
   * @return a graph with triangle counts as vertex attributes
   */
  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED]
  ): Graph[Int, ED]
}
```
208
209
**Usage Examples:**
210
211
```scala
// Find connected components
val components = graph.connectedComponents().vertices
// collect() runs a Spark job and ships every vertex to the driver; keep the
// parentheses (auto-application is deprecated) and use it only on small graphs.
components.collect().foreach { case (id, component) =>
  println(s"Vertex $id belongs to component $component")
}

// Count triangles
val triangles = graph.triangleCount().vertices
triangles.collect().foreach { case (id, count) =>
  println(s"Vertex $id participates in $count triangles")
}

// Strongly connected components
val scc = graph.stronglyConnectedComponents(10).vertices
```
227
228
### Label Propagation
229
230
Community detection algorithm using iterative label propagation for clustering and community structure discovery.
231
232
```scala { .api }
object LabelPropagation {
  /**
   * Detects communities with the label propagation algorithm (LPA).
   *
   * @param graph    the input graph
   * @param maxSteps upper bound on the number of propagation steps
   * @return a graph whose vertex attribute is the community label assigned to that vertex
   */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, ED]
}
```
246
247
**Usage Examples:**
248
249
```scala
import org.apache.spark.graphx.lib.LabelPropagation

// Community detection using label propagation
val communities = LabelPropagation.run(graph, maxSteps = 5).vertices
// collect() is a side-effecting Spark action: call it with parentheses, and only
// on results small enough to fit on the driver.
communities.collect().foreach { case (id, community) =>
  println(s"Vertex $id belongs to community $community")
}
```
258
259
### Shortest Paths
260
261
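Compute unweighted (hop-count) shortest-path distances from every vertex to a set of landmark vertices, using iterative Pregel message passing (equivalent to a breadth-first search). Distances follow edge direction, so a vertex's map contains only the landmarks it can reach.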
262
263
```scala { .api }
object ShortestPaths {
  /** Per-vertex distance table: landmark vertex ID -> shortest distance to that landmark. */
  type SPMap = Map[VertexId, Int]

  /**
   * Computes shortest-path distances to a set of landmark vertices.
   *
   * Edge attributes play no role: every edge counts as a distance of 1.
   *
   * @param graph     the input graph
   * @param landmarks IDs of the landmark vertices
   * @return a graph whose vertex attribute is that vertex's [[SPMap]]
   */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED]
}
```
280
281
**Usage Examples:**
282
283
```scala
import org.apache.spark.graphx.lib.ShortestPaths

// Compute shortest paths to landmarks 1, 2, 3
val landmarks = Seq(1L, 2L, 3L)
val distances = ShortestPaths.run(graph, landmarks).vertices

// collect() is a Spark action declared with (); call it explicitly.
distances.collect().foreach { case (id, distanceMap) =>
  println(s"Vertex $id distances: $distanceMap")
}
```
294
295
### SVD++ Collaborative Filtering
296
297
Matrix factorization algorithm for recommendation systems and collaborative filtering.
298
299
```scala { .api }
object SVDPlusPlus {
  /**
   * Configuration for the SVD++ algorithm.
   *
   * This is a plain (non-case) class with mutable `var` fields and no default
   * values. Create it with `new` and supply every argument. Named arguments work.
   *
   * @param rank     number of latent factors
   * @param maxIters number of training iterations
   * @param minVal   smallest rating; predictions are clamped to at least this value
   * @param maxVal   largest rating; predictions are clamped to at most this value
   * @param gamma1   learning rate for the user and item biases
   * @param gamma2   learning rate for the latent factor vectors
   * @param gamma6   regularization weight for the biases
   * @param gamma7   regularization weight for the latent factor vectors
   */
  class Conf(
    var rank: Int,
    var maxIters: Int,
    var minVal: Double,
    var maxVal: Double,
    var gamma1: Double,
    var gamma2: Double,
    var gamma6: Double,
    var gamma7: Double
  ) extends Serializable

  /**
   * Trains an SVD++ collaborative-filtering model.
   *
   * @param edges rating edges: srcId is the user, dstId is the item, attr is the rating.
   *              Users and items share one vertex-ID space, so their IDs must not overlap.
   * @param conf  algorithm configuration
   * @return a pair (model graph, global mean rating):
   *          - each vertex attribute of the model graph holds two latent-factor arrays,
   *            the vertex's bias and a normalization term;
   *          - the second element is the mean of all input ratings, NOT a training error.
   */
  def run(
    edges: RDD[Edge[Double]],
    conf: Conf
  ): (Graph[(Array[Double], Array[Double], Double, Double), Double], Double)
}
```
335
336
**Usage Examples:**
337
338
```scala
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.lib.SVDPlusPlus

// Prepare rating data as edges (srcId = user, dstId = item, attr = rating).
// Users and items live in one vertex-ID space, so their IDs must not overlap.
val ratings = sc.parallelize(Array(
  Edge(1L, 101L, 4.0), // User 1 rates item 101 as 4.0
  Edge(1L, 102L, 2.0),
  Edge(2L, 101L, 5.0)
))

// Configure SVD++. Conf is a plain class without defaults: use `new` and pass
// every parameter (named arguments keep it readable).
val conf = new SVDPlusPlus.Conf(
  rank = 10,      // 10 latent factors
  maxIters = 20,  // 20 iterations
  minVal = 1.0,   // Min rating 1.0
  maxVal = 5.0,   // Max rating 5.0
  gamma1 = 0.007, // learning rate for biases
  gamma2 = 0.007, // learning rate for latent factors
  gamma6 = 0.005, // regularization for biases
  gamma7 = 0.015  // regularization for latent factors
)

// Train model. The second result is the global mean rating, not an error metric.
val (model, meanRating) = SVDPlusPlus.run(ratings, conf)
println(f"Global mean rating: $meanRating%.3f")

// Extract learned factors
val userFactors = model.vertices.filter(_._1 < 100).collect() // Users have ID < 100
val itemFactors = model.vertices.filter(_._1 >= 100).collect() // Items have ID >= 100
```
364
365
### Graph Utilities for Algorithms
366
367
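Helper operations commonly used with graph algorithms. They are defined on `GraphOps` and can be called directly on any `Graph` through the implicit conversion in the `Graph` companion object.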
368
369
```scala { .api }
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /**
   * Collects the IDs of each vertex's neighbors.
   *
   * @param edgeDirection direction of edges to consider
   * @return a VertexRDD holding an array of neighbor IDs per vertex
   */
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

  /**
   * Collects each vertex's neighbors together with their attributes.
   *
   * @param edgeDirection direction of edges to consider
   * @return a VertexRDD holding an array of (VertexId, vertex attribute) pairs per vertex
   */
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

  /**
   * Collects the edges incident to each vertex.
   *
   * @param edgeDirection direction of edges to collect
   * @return a VertexRDD holding an array of incident edges per vertex
   */
  def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]]

  /**
   * Removes self-loops (edges whose source and destination are the same vertex).
   *
   * @return the graph without self-loops
   */
  def removeSelfEdges(): Graph[VD, ED]

  /**
   * Converts every edge to canonical direction (srcId < dstId) and merges the
   * edges that become duplicates.
   *
   * Self-loops are kept. Call `removeSelfEdges()` first if they must go, e.g.
   * before `TriangleCount.runPreCanonicalized`.
   *
   * @param mergeFunc merges the attributes of duplicate edges; the default keeps one of them
   * @return the canonicalized graph
   */
  def convertToCanonicalEdges(mergeFunc: (ED, ED) => ED = (e1, e2) => e1): Graph[VD, ED]

  /**
   * Picks a random vertex from the graph.
   *
   * @return the ID of a randomly chosen vertex
   */
  def pickRandomVertex(): VertexId
}
```
412
413
## Algorithm Performance Tips
414
415
### PageRank Optimization
416
417
```scala
// For large graphs, use static PageRank with appropriate iterations:
// partition edges in 2D first, then cache so the iterations reuse the layout.
val partitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)
val ranks = partitioned.cache().staticPageRank(20)

// For personalized PageRank on multiple sources
val sources = Array(1L, 2L, 3L)
val parallelRanks = PageRank.runParallelPersonalizedPageRank(
  graph,
  numIter = 10,
  resetProb = 0.15,
  sources = sources
)
```
429
430
### Connected Components for Large Graphs
431
432
```scala
// An iteration cap bounds the work on very large graphs. If the cap is reached
// before the labels stop changing, some vertices may not yet hold the smallest
// ID of their component, so pick a limit comfortably above the graph's diameter.
val components = ConnectedComponents.run(graph, maxIterations = 50)

// Pre-partitioning can reduce communication across the iterations
val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)
val partitionedComponents = partitionedGraph.connectedComponents()
```
440
441
### Triangle Counting Preparation
442
443
```scala
// graph.triangleCount() (TriangleCount.run) already removes self-loops and
// canonicalizes edges (srcId < dstId, duplicates merged) before counting,
// so no manual preparation is needed for correct results.
val triangleCounts = graph.triangleCount()

// When counting repeatedly on the same graph, canonicalize once, cache the result,
// and call runPreCanonicalized to skip that shuffle. runPreCanonicalized requires
// a graph without self-loops, and convertToCanonicalEdges keeps them, so remove
// self-loops first.
val canonicalGraph = graph
  .removeSelfEdges()
  .convertToCanonicalEdges((a, b) => a)
  .cache()
val triangles = TriangleCount.runPreCanonicalized(canonicalGraph)
```
453
454
## Common Algorithm Patterns
455
456
### Iterative Convergence
457
458
```scala
import org.apache.spark.graphx._
import scala.reflect.ClassTag

/**
 * Pattern for implementing a custom convergence-based algorithm.
 *
 * Applies `step` repeatedly until no vertex attribute changes by `tolerance`
 * or more (as measured by `delta`), or until `maxIterations` steps have run.
 * Cache `graph` before calling if it is expensive to recompute.
 *
 * @param graph         starting graph; never unpersisted here, because the caller owns it
 * @param tolerance     convergence threshold on the largest per-vertex change
 * @param maxIterations upper bound on the number of steps
 * @param step          one iteration, e.g. a bounded Pregel call or aggregateMessages + joinVertices
 * @param delta         size of the change between an old and a new vertex attribute
 * @return the graph after the last step, cached
 */
def iterativeAlgorithm[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED],
  tolerance: Double,
  maxIterations: Int = 100
)(
  step: Graph[VD, ED] => Graph[VD, ED],
  delta: (VD, VD) => Double
): Graph[VD, ED] = {
  var g = graph
  var converged = false
  var iteration = 0

  while (!converged && iteration < maxIterations) {
    // Cache AND materialize the new graph before releasing the old one;
    // otherwise later iterations recompute it from the full lineage.
    val newG = step(g).cache()
    newG.vertices.count()
    newG.edges.count()

    // Largest per-vertex change. fold(0.0) keeps an empty graph from throwing,
    // which RDD.max() would do on empty input.
    val maxChange = g.vertices
      .innerJoin(newG.vertices) { (_, oldAttr, newAttr) => delta(oldAttr, newAttr) }
      .map(_._2)
      .fold(0.0)(_ max _)

    converged = maxChange < tolerance
    // Only release intermediate graphs created by this loop, never the caller's input.
    if (g ne graph) g.unpersist(blocking = false)
    g = newG
    iteration += 1
  }

  g
}
```
482
483
### Community Detection Pipeline
484
485
```scala
// Complete community detection and analysis pipeline
val graph = loadGraph()

// 1. Find communities using label propagation
val communities = LabelPropagation.run(graph, maxSteps = 10)

// 2. Analyze community structure: number of vertices carrying each label
val communitySizes = communities.vertices
  .map { case (_, community) => (community, 1) }
  .reduceByKey(_ + _)
  .collect()

// 3. Count triangles inside each community (a cohesion measure, not modularity).
//    Keeping only intra-community edges leaves exactly the triangles whose three
//    vertices share a label. Each such triangle is counted once at each of its
//    three corners, hence the division by 3.
val trianglesPerCommunity = communities
  .subgraph(epred = triplet => triplet.srcAttr == triplet.dstAttr)
  .triangleCount()
  .vertices
  .innerJoin(communities.vertices) { (_, count, community) => (community, count.toLong) }
  .map { case (_, communityAndCount) => communityAndCount }
  .reduceByKey(_ + _)
  .mapValues(_ / 3)
  .collectAsMap()

println(s"Found ${communitySizes.length} communities")
communitySizes.foreach { case (community, size) =>
  val internalTriangles = trianglesPerCommunity.getOrElse(community, 0L)
  println(s"Community $community has $size vertices and $internalTriangles internal triangles")
}
```