# Graph Algorithms Library

Pre-implemented graph algorithms for common analytics tasks including centrality measures, community detection, path finding, and structural analysis.

## Capabilities

### PageRank Algorithm

PageRank algorithm implementation with multiple variants for computing vertex importance scores.

```scala { .api }
/**
 * PageRank algorithm implementations
 */
object PageRank {
  /**
   * Static PageRank with fixed number of iterations
   * @param graph Input graph
   * @param numIter Number of iterations to run
   * @param resetProb Random reset probability (default 0.15)
   * @return Graph with PageRank scores as vertex attributes
   */
  def run[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    numIter: Int,
    resetProb: Double = 0.15
  ): Graph[Double, Double]

  /**
   * PageRank until convergence based on tolerance
   * @param graph Input graph
   * @param tol Convergence tolerance
   * @param resetProb Random reset probability (default 0.15)
   * @return Graph with PageRank scores
   */
  def runUntilConvergence[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    tol: Double,
    resetProb: Double = 0.15
  ): Graph[Double, Double]

  /**
   * PageRank with additional options including personalization
   * @param graph Input graph
   * @param numIter Number of iterations
   * @param resetProb Random reset probability
   * @param srcId Optional source vertex for personalized PageRank
   * @return Graph with PageRank scores
   */
  def runWithOptions[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    numIter: Int,
    resetProb: Double = 0.15,
    srcId: Option[VertexId] = None
  ): Graph[Double, Double]

  /**
   * Parallel personalized PageRank from multiple sources
   * @param graph Input graph
   * @param numIter Number of iterations
   * @param resetProb Random reset probability
   * @param sources Array of source vertices
   * @return Graph whose vertex attributes are vectors of personalized scores, one entry per source
   */
  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    numIter: Int,
    resetProb: Double,
    sources: Array[VertexId]
  ): Graph[Vector, Double]
}

/**
 * Convenience wrappers defined on GraphOps rather than on the PageRank object.
 * They are called directly on a graph, e.g. graph.staticPersonalizedPageRank(src, numIter)
 */
class GraphOps[VD: ClassTag, ED: ClassTag] {
  /**
   * Static personalized PageRank with fixed iterations from single source
   * @param src Source vertex for personalization
   */
  def staticPersonalizedPageRank(
    src: VertexId,
    numIter: Int,
    resetProb: Double = 0.15
  ): Graph[Double, Double]

  /**
   * Static parallel personalized PageRank from multiple sources with fixed iterations
   * @param sources Array of source vertices
   */
  def staticParallelPersonalizedPageRank(
    sources: Array[VertexId],
    numIter: Int,
    resetProb: Double = 0.15
  ): Graph[Vector, Double]
}
```

**Usage Examples:**

```scala
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

// `graph` is an existing Graph[VD, ED]

// Basic PageRank with fixed iterations
val pageRankGraph = PageRank.run(graph, numIter = 10)
val topVertices = pageRankGraph.vertices.top(5)(Ordering.by(_._2))
println("Top 5 vertices by PageRank:")
topVertices.foreach { case (id, rank) => println(s"Vertex $id: $rank") }

// PageRank until convergence
val convergedPR = PageRank.runUntilConvergence(graph, tol = 0.0001)

// Personalized PageRank from specific vertex
val personalizedPR = PageRank.runWithOptions(graph, numIter = 10, srcId = Some(1L))

// Parallel personalized PageRank from multiple sources
val sources = Array(1L, 2L, 3L)
val parallelPPR = PageRank.runParallelPersonalizedPageRank(graph, 10, 0.15, sources)

// Static personalized PageRank with fixed iterations (GraphOps method, called on the graph)
val staticPPR = graph.staticPersonalizedPageRank(src = 1L, numIter = 5)

// Static parallel personalized PageRank with fixed iterations (GraphOps method)
val staticParallelPPR = graph.staticParallelPersonalizedPageRank(sources, numIter = 5)
```
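To make the role of `resetProb` concrete, here is a minimal plain-Scala sketch of the textbook update `rank = resetProb + (1 - resetProb) * sum(incoming contributions)` on an in-memory edge list. `PageRankSketch` is a hypothetical name used only for illustration; it assumes unnormalized ranks that start at 1.0 and is not GraphX's implementation, which may differ in details such as normalization.

```scala
object PageRankSketch {
  /** Runs `numIter` synchronous rounds of the PageRank update on a directed edge list. */
  def run(edges: Seq[(Long, Long)], numIter: Int, resetProb: Double = 0.15): Map[Long, Double] = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    val outDegree = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
    var ranks: Map[Long, Double] = vertices.map(v => v -> 1.0).toMap
    for (_ <- 0 until numIter) {
      // Each edge carries rank(src) / outDegree(src) to its destination.
      val contribs = edges
        .map { case (s, d) => d -> ranks(s) / outDegree(s) }
        .groupBy(_._1)
        .map { case (v, cs) => v -> cs.map(_._2).sum }
      // Vertices without incoming edges keep only the reset share.
      ranks = vertices.map { v =>
        v -> (resetProb + (1 - resetProb) * contribs.getOrElse(v, 0.0))
      }.toMap
    }
    ranks
  }
}
```

On a directed 3-cycle every vertex keeps a rank of 1.0, because each receives exactly one full contribution per round. With edges `1 -> 2` and `3 -> 2`, one round gives vertex 2 a rank of `0.15 + 0.85 * 2`, while vertices 1 and 3 drop to the reset share of 0.15.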

### Connected Components

Finds connected components, treating edges as undirected. Each vertex is labeled with the lowest vertex ID in its component.

```scala { .api }
/**
 * Connected components algorithm
 */
object ConnectedComponents {
  /**
   * Find connected components (runs until convergence)
   * @param graph Input graph (treated as undirected)
   * @return Graph where vertex values are component IDs
   */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]

  /**
   * Find connected components with iteration limit
   * @param graph Input graph
   * @param maxIterations Maximum number of iterations
   * @return Graph with component IDs
   */
  def run[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    maxIterations: Int
  ): Graph[VertexId, ED]
}
```

**Usage Examples:**

```scala
// Find all connected components
val components = ConnectedComponents.run(graph)

// Analyze component sizes
val componentSizes = components.vertices
  .map(_._2)            // Get component IDs
  .countByValue()       // Count vertices per component
  .toSeq.sortBy(-_._2)  // Sort by size descending

println(s"Found ${componentSizes.size} connected components:")
componentSizes.take(5).foreach { case (componentId, size) =>
  println(s"Component $componentId: $size vertices")
}

// Find vertices in largest component
val largestComponent = componentSizes.head._1
val largestComponentVertices = components.vertices
  .filter(_._2 == largestComponent)
  .keys.collect()

println(s"Largest component contains vertices: ${largestComponentVertices.mkString(", ")}")

// Connected components with iteration limit
val limitedComponents = ConnectedComponents.run(graph, maxIterations = 5)
```

### Triangle Counting

Algorithm to count triangles in the graph for clustering analysis.

```scala { .api }
/**
 * Triangle counting algorithm
 */
object TriangleCount {
  /**
   * Count triangles passing through each vertex
   * Graph must be partitioned with RandomVertexCut or CanonicalRandomVertexCut
   * @param graph Input graph
   * @return Graph where vertex values are triangle counts
   */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]

  /**
   * Count triangles on pre-canonicalized graph (edges in canonical form)
   * More efficient if graph is already in canonical form
   * @param graph Input graph with canonical edges
   * @return Graph with triangle counts
   */
  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
}
```

**Usage Examples:**

```scala
// Ensure proper partitioning for triangle counting
val partitionedGraph = graph.partitionBy(PartitionStrategy.RandomVertexCut)

// Count triangles
val triangleCounts = TriangleCount.run(partitionedGraph)
// Each triangle is counted once at each of its three vertices
val totalTriangles = triangleCounts.vertices.map(_._2).sum().toLong / 3

println(s"Total triangles in graph: $totalTriangles")

// Find vertices with most triangles
val topTriangleVertices = triangleCounts.vertices.top(10)(Ordering.by(_._2))
println("Vertices with most triangles:")
topTriangleVertices.foreach { case (id, count) =>
  println(s"Vertex $id: $count triangles")
}

// Calculate the local clustering coefficient per vertex.
// Assumes a simple graph: graph.degrees counts parallel and reciprocal edges separately.
val degrees = graph.degrees
val clustering = triangleCounts.vertices.innerJoin(degrees) { (id, triangles, degree) =>
  val d = degree.toDouble // avoids Int overflow in d * (d - 1) for high-degree vertices
  if (degree > 1) 2.0 * triangles / (d * (d - 1)) else 0.0
}

val avgClustering = clustering.map(_._2).mean()
println(s"Average clustering coefficient: $avgClustering")
```
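What "canonical form" means for `runPreCanonicalized` can be sketched on a plain edge list. The example below assumes canonical form means each edge is oriented from the smaller to the larger vertex ID, with self-loops and duplicates removed. `CanonicalEdgesSketch` and its methods are hypothetical names for illustration, and the brute-force counter is only meant for checking tiny graphs.

```scala
object CanonicalEdgesSketch {
  /** Orients every edge as (smaller ID, larger ID), dropping self-loops and duplicates. */
  def canonicalize(edges: Seq[(Long, Long)]): Seq[(Long, Long)] =
    edges
      .filter { case (s, d) => s != d }
      .map { case (s, d) => if (s < d) (s, d) else (d, s) }
      .distinct

  /** Brute-force per-vertex triangle counts on the canonicalized, undirected graph. */
  def triangleCounts(edges: Seq[(Long, Long)]): Map[Long, Int] = {
    val canon = canonicalize(edges)
    val neighbors: Map[Long, Set[Long]] =
      (canon ++ canon.map(_.swap))
        .groupBy(_._1)
        .map { case (v, es) => v -> es.map(_._2).toSet }
    neighbors.map { case (v, ns) =>
      // A triangle through v is a pair of v's neighbors that are themselves adjacent.
      v -> ns.toSeq.combinations(2).count { case Seq(a, b) => neighbors(a).contains(b) }
    }
  }
}
```

For the edges `(1,2), (2,1), (2,3), (3,1), (3,3), (3,4)`, canonicalization leaves `(1,2), (2,3), (1,3), (3,4)`. Vertices 1, 2 and 3 each see one triangle and vertex 4 sees none, so dividing the sum of the per-vertex counts by 3 gives a total of one triangle.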

### Strongly Connected Components

Algorithm to find strongly connected components in directed graphs.

```scala { .api }
/**
 * Strongly connected components algorithm
 */
object StronglyConnectedComponents {
  /**
   * Find strongly connected components with fixed iterations
   * @param graph Input directed graph
   * @param numIter Number of iterations to run
   * @return Graph where vertex values are SCC IDs
   */
  def run[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    numIter: Int
  ): Graph[VertexId, ED]
}
```

**Usage Examples:**

```scala
// Find strongly connected components
val sccGraph = StronglyConnectedComponents.run(graph, numIter = 10)

// Analyze SCC structure
val sccSizes = sccGraph.vertices
  .map(_._2)
  .countByValue()
  .toSeq.sortBy(-_._2)

println(s"Found ${sccSizes.size} strongly connected components:")
sccSizes.take(5).foreach { case (sccId, size) =>
  println(s"SCC $sccId: $size vertices")
}

// Find vertices in same SCC as a specific vertex
val targetVertex = 1L
val targetSCC = sccGraph.vertices.filter(_._1 == targetVertex).map(_._2).first()
val sameSCCVertices = sccGraph.vertices
  .filter(_._2 == targetSCC)
  .keys.collect()

println(s"Vertices in same SCC as vertex $targetVertex: ${sameSCCVertices.mkString(", ")}")
```

### Label Propagation

Community detection algorithm using label propagation.

```scala { .api }
/**
 * Label propagation algorithm for community detection
 */
object LabelPropagation {
  /**
   * Run label propagation community detection
   * @param graph Input graph (treated as undirected)
   * @param maxSteps Maximum number of propagation steps
   * @return Graph where vertex values are community labels
   */
  def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, ED]
}
```

**Usage Examples:**

```scala
// Detect communities using label propagation
val communities = LabelPropagation.run(graph, maxSteps = 5)

// Analyze community structure
val communitySizes = communities.vertices
  .map(_._2)
  .countByValue()
  .toSeq.sortBy(-_._2)

println(s"Detected ${communitySizes.size} communities:")
communitySizes.take(10).foreach { case (communityId, size) =>
  println(s"Community $communityId: $size vertices")
}

// Collect vertex -> community memberships to the driver as input for modularity
// or other community quality measures (only for graphs that fit in driver memory)
val communityMemberships = communities.vertices.collectAsMap()
```
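Once the memberships are on the driver, a quality measure such as modularity can be computed from them. The sketch below is one way to do that in plain Scala, assuming an undirected simple graph given as an edge list and the standard definition `Q = sum over communities of (internalEdges / m) - (totalDegree / 2m)^2`, where `m` is the number of edges. `ModularitySketch` is a hypothetical helper, not part of GraphX.

```scala
object ModularitySketch {
  /** Modularity of a partition of an undirected simple graph given as an edge list. */
  def modularity(edges: Seq[(Long, Long)], community: collection.Map[Long, Long]): Double = {
    val m = edges.size.toDouble
    // Number of edges whose endpoints share a community, per community.
    val internal = edges
      .collect { case (s, d) if community(s) == community(d) => community(s) }
      .groupBy(identity)
      .map { case (c, es) => c -> es.size }
    // Sum of vertex degrees per community: every edge adds one to each endpoint's community.
    val degree = edges
      .flatMap { case (s, d) => Seq(community(s), community(d)) }
      .groupBy(identity)
      .map { case (c, xs) => c -> xs.size }
    degree.map { case (c, deg) =>
      internal.getOrElse(c, 0) / m - math.pow(deg / (2 * m), 2)
    }.sum
  }
}
```

The `community` parameter has the same type that `collectAsMap()` returns, so `communityMemberships` could be passed in directly together with a collected edge list. For two triangles joined by a single edge and split into their natural communities, the formula gives `2 * (3/7 - (7/14)^2) = 5/14`; putting all six vertices into one community gives 0.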

### Shortest Paths

Shortest-path hop counts from every vertex to a set of landmark vertices.

```scala { .api }
/**
 * Shortest paths algorithm
 */
object ShortestPaths {
  /** Type alias for shortest path maps */
  type SPMap = Map[VertexId, Int]

  /**
   * Shortest paths from each vertex to the specified landmark vertices
   * @param graph Input graph (edge weights assumed to be 1)
   * @param landmarks Set of landmark vertices to find paths to
   * @return Graph where each vertex holds a map of distances to the landmarks it can reach
   */
  def run[VD, ED: ClassTag](
    graph: Graph[VD, ED],
    landmarks: Seq[VertexId]
  ): Graph[SPMap, ED]
}
```

**Usage Examples:**

```scala
// Find shortest paths to landmark vertices
val landmarks = Seq(1L, 5L, 10L)
val shortestPaths = ShortestPaths.run(graph, landmarks)

// Extract distances to landmarks
shortestPaths.vertices.collect().foreach { case (vertexId, pathMap) =>
  println(s"Vertex $vertexId distances:")
  pathMap.foreach { case (landmark, distance) =>
    println(s"  To landmark $landmark: $distance hops")
  }
}

// Find vertices closest to specific landmark
val targetLandmark = 1L
val closestToLandmark = shortestPaths.vertices
  .map { case (id, pathMap) => (id, pathMap.getOrElse(targetLandmark, Int.MaxValue)) }
  .filter(_._2 < Int.MaxValue)
  .takeOrdered(5)(Ordering.by(_._2))

println(s"Closest vertices to landmark $targetLandmark:")
closestToLandmark.foreach { case (id, distance) =>
  println(s"Vertex $id: distance $distance")
}
```
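The distance maps can be reasoned about with a small plain-Scala sketch. It assumes distances are hop counts along edge direction toward the landmark, so a vertex that cannot reach the landmark has no entry. `HopDistanceSketch` is a hypothetical name; the code is a breadth-first search over reversed edges and is not how GraphX computes the result.

```scala
object HopDistanceSketch {
  /** Hop counts to `landmark` for every vertex that can reach it along directed edges. */
  def distancesTo(edges: Seq[(Long, Long)], landmark: Long): Map[Long, Int] = {
    // Walk edges backwards: predecessors(v) are the vertices with an edge into v.
    val predecessors = edges.groupBy(_._2).map { case (v, es) => v -> es.map(_._1) }
    var dist: Map[Long, Int] = Map(landmark -> 0)
    var frontier: Seq[Long] = Seq(landmark)
    while (frontier.nonEmpty) {
      // Unvisited predecessors of the frontier are one hop further from the landmark.
      val next = frontier
        .flatMap(v => predecessors.getOrElse(v, Seq.empty).map(p => p -> (dist(v) + 1)))
        .filterNot { case (p, _) => dist.contains(p) }
        .toMap
      dist = dist ++ next
      frontier = next.keys.toSeq
    }
    dist
  }
}
```

With edges `1 -> 2`, `2 -> 3`, `4 -> 3` and `3 -> 5` and landmark 3, vertices 2 and 4 are one hop away and vertex 1 is two hops away. Vertex 5 is absent from the map because no directed path leads from it to the landmark, which is why the usage example above falls back to `getOrElse`.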

### SVD++

Collaborative filtering algorithm implementation.

```scala { .api }
/**
 * SVD++ algorithm for collaborative filtering and matrix factorization
 * Implementation based on "Factorization Meets the Neighborhood" paper
 * Typically used on bipartite graphs (users-items)
 */
object SVDPlusPlus {
  /**
   * Configuration parameters for SVD++ algorithm
   */
  class Conf(
    var rank: Int,       // Number of latent factors
    var maxIters: Int,   // Maximum iterations
    var minVal: Double,  // Minimum rating value
    var maxVal: Double,  // Maximum rating value
    var gamma1: Double,  // Learning rate for the bias terms
    var gamma2: Double,  // Learning rate for the latent factors
    var gamma6: Double,  // Regularization coefficient for the bias terms
    var gamma7: Double   // Regularization coefficient for the latent factors
  ) extends Serializable

  /**
   * Run SVD++ algorithm on rating graph
   * @param edges Rating edges with Double attributes
   * @param conf Algorithm configuration parameters
   * @return Tuple of (graph whose vertex attributes hold the trained model, mean rating over the input edges)
   */
  def run(edges: RDD[Edge[Double]], conf: Conf):
    (Graph[(Array[Double], Array[Double], Double, Double), Double], Double)
}
```
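**Usage Examples:**

The following is a minimal sketch of calling `SVDPlusPlus.run`, assuming Spark with GraphX is on the classpath and a local master is acceptable. The ratings, the vertex IDs and the hyperparameter values are made up for illustration and are not tuned recommendations. Users and items share one vertex ID space, so the sketch keeps their IDs disjoint. As far as the GraphX source indicates, the second tuple element is the mean of the input ratings, which is how it is named here.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.graphx.lib.SVDPlusPlus

object SvdPlusPlusSketch {
  type Model = Graph[(Array[Double], Array[Double], Double, Double), Double]

  /** Trains on four hypothetical ratings and returns the tuple produced by SVDPlusPlus.run. */
  def train(sc: SparkContext): (Model, Double) = {
    // Hypothetical data: users 1-2 rate items 101-102 on a 0-5 scale.
    val ratings = sc.parallelize(Seq(
      Edge(1L, 101L, 5.0), Edge(1L, 102L, 3.0),
      Edge(2L, 101L, 4.0), Edge(2L, 102L, 1.0)))
    // Arguments in order: rank, maxIters, minVal, maxVal, gamma1, gamma2, gamma6, gamma7.
    val conf = new SVDPlusPlus.Conf(2, 5, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015)
    SVDPlusPlus.run(ratings, conf)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("svdpp-sketch"))
    try {
      val (model, meanRating) = train(sc)
      println(s"Model vertices: ${model.vertices.count()}, mean rating: $meanRating")
    } finally {
      sc.stop()
    }
  }
}
```

Each vertex of the returned graph carries a tuple of two factor arrays and two scalars, matching the return type in the API listing above.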

## Algorithm Integration Patterns

### Combining Multiple Algorithms

```scala
// Example: Comprehensive graph analysis pipeline
// loadGraph() is a placeholder for your own loading code
val graph = loadGraph()

// 1. Basic metrics
val metrics = (graph.numVertices, graph.numEdges,
  graph.degrees.map(_._2).mean())
println(s"Graph: ${metrics._1} vertices, ${metrics._2} edges, avg degree ${metrics._3}")

// 2. Structural analysis
val components = ConnectedComponents.run(graph)
val triangles = TriangleCount.run(graph.partitionBy(PartitionStrategy.RandomVertexCut))

// 3. Centrality analysis
val pagerank = PageRank.runUntilConvergence(graph, 0.0001)
val centralVertices = pagerank.vertices.top(10)(Ordering.by(_._2))

// 4. Community detection
val communities = LabelPropagation.run(graph, 10)

// 5. Path analysis
val landmarks = centralVertices.take(3).map(_._1).toSeq
val paths = ShortestPaths.run(graph, landmarks)

// Combine results for comprehensive analysis.
// Lambda parameters cannot be destructured inline, so tuples are unpacked by field.
val analysis = graph.vertices.innerJoin(pagerank.vertices) { (id, orig, pr) =>
  (orig, pr)
}.innerJoin(components.vertices) { (id, origPr, comp) =>
  (origPr._1, origPr._2, comp)
}.innerJoin(communities.vertices) { (id, acc, comm) =>
  (acc._1, acc._2, acc._3, comm)
}
```

### Custom Algorithm Implementation

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

// Template for implementing custom algorithms using existing primitives
def myCustomAlgorithm[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED],
  maxIter: Int
): Graph[Double, ED] = {

  // Initialize vertex values
  var g = graph.mapVertices((id, attr) => 0.0)

  // Iterative computation
  for (_ <- 0 until maxIter) {
    // Use aggregateMessages for computation
    val newValues = g.aggregateMessages[Double](
      triplet => {
        // Send messages based on current state
        triplet.sendToSrc(triplet.dstAttr + 1.0)
        triplet.sendToDst(triplet.srcAttr + 1.0)
      },
      (a, b) => a + b // Merge messages
    )

    // Update vertex values; vertices that received no message keep their old value
    g = g.outerJoinVertices(newValues) { (id, oldValue, msgOpt) =>
      msgOpt.getOrElse(oldValue)
    }
  }

  g
}
```

## Performance Considerations

### Algorithm-Specific Optimizations

- **PageRank**: Use `runUntilConvergence` when accuracy matters and `run` with a fixed iteration count when runtime must be predictable
- **Triangle Counting**: Requires proper partitioning (`RandomVertexCut` or `CanonicalRandomVertexCut`)
- **Connected Components**: May require many iterations on large graphs with long paths, because component labels spread one hop per iteration
- **Label Propagation**: Inexpensive per step, but convergence is not guaranteed and results may vary between runs
- **Shortest Paths**: Memory usage grows with the number of landmarks, since each vertex stores a map with up to one entry per landmark
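
The Connected Components point can be illustrated with a plain-Scala sketch of minimum-label propagation, in which every vertex repeatedly adopts the smallest label among itself and its neighbors. `MinLabelSketch` is a hypothetical name and the code is an illustration of the idea on an in-memory edge list, not GraphX's implementation. It returns the final labels together with the number of rounds in which any label changed.

```scala
object MinLabelSketch {
  /** Returns (labels, rounds): each vertex ends with the smallest vertex ID in its component. */
  def run(edges: Seq[(Long, Long)]): (Map[Long, Long], Int) = {
    val vertices = edges.flatMap { case (a, b) => Seq(a, b) }.distinct
    // Treat every edge as undirected.
    val neighbors: Map[Long, Seq[Long]] =
      (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }
    var labels: Map[Long, Long] = vertices.map(v => v -> v).toMap
    var rounds = 0
    var changed = true
    while (changed) {
      // Synchronous update: every vertex reads the labels from the previous round.
      val next = labels.map { case (v, l) =>
        v -> (l +: neighbors.getOrElse(v, Seq.empty).map(labels)).min
      }
      changed = next != labels
      if (changed) rounds += 1
      labels = next
    }
    (labels, rounds)
  }
}
```

On the path `1-2-3-4-5` the label 1 needs four rounds to reach vertex 5, while a star with center 1 and the same number of vertices settles after one round. The number of rounds tracks the longest shortest path in a component rather than the vertex count, which is the behavior the bullet warns about.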

### General Algorithm Guidelines

```scala
// Optimize graph before running algorithms
val optimizedGraph = graph
  .partitionBy(PartitionStrategy.EdgePartition2D) // Good general partitioning
  .cache()                                        // Cache for repeated use

// Run multiple algorithms on same cached graph
val pagerank = PageRank.run(optimizedGraph, 10)
val components = ConnectedComponents.run(optimizedGraph)
val triangles = TriangleCount.run(optimizedGraph)
```