Comprehensive collection of pre-implemented graph algorithms including PageRank, Connected Components, Triangle Counting, and community detection algorithms optimized for distributed execution.
Compute PageRank scores using both static (fixed iterations) and dynamic (convergence-based) implementations.
/**
 * Run PageRank until convergence.
 *
 * @param tol Tolerance for convergence (change in rank below this stops)
 * @param resetProb Probability of random jump (damping factor = 1 - resetProb)
 * @return Graph with PageRank scores as vertex attributes
 */
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
/**
 * Run PageRank for a fixed number of iterations.
 *
 * @param numIter Number of iterations to run
 * @param resetProb Probability of random jump
 * @return Graph with PageRank scores as vertex attributes
 */
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
/**
 * Run personalized PageRank from a source vertex until convergence.
 *
 * @param src Source vertex for personalized PageRank
 * @param tol Tolerance for convergence
 * @param resetProb Probability of jumping back to source
 * @return Graph with personalized PageRank scores
 */
def personalizedPageRank(
src: VertexId,
tol: Double,
resetProb: Double = 0.15
): Graph[Double, Double]
object PageRank {
/**
 * Static PageRank implementation (fixed number of iterations).
 *
 * @param graph Input graph
 * @param numIter Number of iterations
 * @param resetProb Random jump probability
 * @return Graph with PageRank scores
 */
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
numIter: Int,
resetProb: Double = 0.15
): Graph[Double, Double]
/**
 * Dynamic PageRank: iterate until convergence.
 *
 * @param graph Input graph
 * @param tol Convergence tolerance
 * @param resetProb Random jump probability
 * @return Graph with PageRank scores
 */
def runUntilConvergence[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
tol: Double,
resetProb: Double = 0.15
): Graph[Double, Double]
/**
 * PageRank with additional options.
 *
 * Unlike the other entry points, `resetProb` has no default here and must be supplied.
 *
 * @param graph Input graph
 * @param numIter Number of iterations
 * @param resetProb Random jump probability
 * @param srcId Optional source vertex for personalized PageRank (defaults to `None`)
 * @return Graph with PageRank scores
 */
def runWithOptions[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
numIter: Int,
resetProb: Double,
srcId: Option[VertexId] = None
): Graph[Double, Double]
/**
 * Parallel personalized PageRank for multiple sources.
 *
 * `sources` has no default but follows the defaulted `resetProb`, so a caller that
 * omits `resetProb` has to pass `sources` by name.
 *
 * @param graph Input graph
 * @param numIter Number of iterations
 * @param resetProb Random jump probability
 * @param sources Array of source vertex IDs
 * @return Graph with Vector of personalized PageRank scores
 *         (NOTE(review): `Vector` is presumably Spark's linear-algebra vector type,
 *         not `scala.collection.immutable.Vector` -- confirm the import)
 */
def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
numIter: Int,
resetProb: Double = 0.15,
sources: Array[VertexId]
): Graph[Vector, Double]
}

Usage Examples:
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
// Convergence-based PageRank: iterate until rank changes fall below the tolerance
val ranks = graph.pageRank(tol = 0.0001).vertices
for ((id, rank) <- ranks.collect()) {
  println(s"Vertex $id has rank $rank")
}
// Fixed-iteration PageRank (10 rounds)
val staticRanks = graph.staticPageRank(numIter = 10).vertices
// Personalized PageRank seeded at vertex 1
val personalizedRanks = graph.personalizedPageRank(src = 1L, tol = 0.001).vertices
// Using PageRank object directly
val pageRankGraph = PageRank.run(graph, numIter = 20, resetProb = 0.1)

Find connected components in undirected graphs using efficient label propagation.
/**
 * Find connected components (assumes undirected graph).
 *
 * @return Graph where each vertex has the smallest vertex ID in its component
 */
def connectedComponents(): Graph[VertexId, ED]
object ConnectedComponents {
/**
 * Find connected components in a graph.
 *
 * @param graph Input graph (treated as undirected)
 * @return Graph with component IDs as vertex attributes
 */
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
/**
 * Find connected components with an iteration limit.
 *
 * @param graph Input graph
 * @param maxIterations Maximum number of iterations
 * @return Graph with component IDs
 */
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
maxIterations: Int
): Graph[VertexId, ED]
}

Find strongly connected components in directed graphs using iterative algorithms.
/**
 * Compute strongly connected components.
 *
 * @param numIter Number of iterations to run
 * @return Graph where each vertex has the smallest vertex ID in its SCC
 */
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
object StronglyConnectedComponents {
/**
 * Find strongly connected components in a directed graph.
 *
 * @param graph Input directed graph
 * @param numIter Number of iterations
 * @return Graph with SCC IDs as vertex attributes
 */
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
numIter: Int
): Graph[VertexId, ED]
}

Count triangles in graphs for clustering coefficient computation and social network analysis.
/**
 * Count triangles passing through each vertex.
 *
 * @return Graph with triangle counts as vertex attributes
 */
def triangleCount(): Graph[Int, ED]
object TriangleCount {
/**
 * Count the triangles passing through each vertex.
 *
 * Triangle counting needs canonically oriented edges. According to the usage notes
 * in this document the graph is canonicalized automatically, so callers do not have
 * to prepare it; use `runPreCanonicalized` for a graph that is already canonical.
 *
 * @param graph Input graph
 * @return Graph with triangle counts per vertex
 */
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
/**
 * Count triangles assuming the graph is already in canonical form.
 *
 * @param graph Pre-canonicalized graph
 * @return Graph with triangle counts
 */
def runPreCanonicalized[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED]
): Graph[Int, ED]
}

Usage Examples:
// Connected components: report the component each vertex was assigned to
val components = graph.connectedComponents().vertices
for ((id, component) <- components.collect()) {
  println(s"Vertex $id belongs to component $component")
}
// Triangle counts: report how many triangles pass through each vertex
val triangles = graph.triangleCount().vertices
for ((id, count) <- triangles.collect()) {
  println(s"Vertex $id participates in $count triangles")
}
// Strongly connected components
val scc = graph.stronglyConnectedComponents(10).vertices

Community detection algorithm using iterative label propagation for clustering and community structure discovery.
object LabelPropagation {
/**
 * Run the label propagation algorithm for community detection.
 *
 * @param graph Input graph
 * @param maxSteps Maximum number of propagation steps
 * @return Graph with community labels as vertex attributes
 */
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
maxSteps: Int
): Graph[VertexId, ED]
}

Usage Examples:
import org.apache.spark.graphx.lib.LabelPropagation
// Community detection using label propagation
val communities = LabelPropagation.run(graph, maxSteps = 5).vertices
communities.collect.foreach { case (id, community) =>
println(s"Vertex $id belongs to community $community")
}

Compute shortest paths from vertices to a set of landmark vertices using breadth-first search.
object ShortestPaths {
/** Map from landmark vertex ID to the shortest distance to that landmark. */
type SPMap = Map[VertexId, Int]
/**
 * Compute shortest paths to landmark vertices.
 *
 * @param graph Input graph (edge weights ignored, all edges have weight 1)
 * @param landmarks Sequence of landmark vertex IDs
 * @return Graph with shortest distance maps as vertex attributes
 */
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
landmarks: Seq[VertexId]
): Graph[SPMap, ED]
}

Usage Examples:
import org.apache.spark.graphx.lib.ShortestPaths
// Compute shortest paths to landmarks 1, 2, 3
val landmarks = Seq(1L, 2L, 3L)
val distances = ShortestPaths.run(graph, landmarks).vertices
distances.collect.foreach { case (id, distanceMap) =>
println(s"Vertex $id distances: $distanceMap")
}

Matrix factorization algorithm for recommendation systems and collaborative filtering.
object SVDPlusPlus {
/**
 * Configuration for the SVD++ algorithm.
 *
 * NOTE(review): in Spark's GraphX implementation gamma1/gamma2 appear to be the
 * learning rates (bias terms / latent factors) and gamma6/gamma7 the matching
 * regularization weights, rather than four learning rates -- confirm against the
 * Spark source before relying on the per-parameter descriptions below.
 * NOTE(review): Spark's `SVDPlusPlus.Conf` also appears to be a plain class whose
 * constructor takes all eight values without defaults -- confirm that the shape
 * shown here matches the Spark version in use.
 *
 * @param rank Number of latent factors
 * @param maxIters Maximum iterations
 * @param minVal Minimum rating value
 * @param maxVal Maximum rating value
 * @param gamma1 Learning rate for user factors
 * @param gamma2 Learning rate for item factors
 * @param gamma6 Learning rate for user bias
 * @param gamma7 Learning rate for item bias
 */
case class Conf(
rank: Int = 10,
maxIters: Int = 2,
minVal: Double = 0.0,
maxVal: Double = 5.0,
gamma1: Double = 0.007,
gamma2: Double = 0.007,
gamma6: Double = 0.005,
gamma7: Double = 0.015
)
/**
 * Run SVD++ collaborative filtering.
 *
 * @param edges RDD of rating edges (user -> item with rating)
 * @param conf Algorithm configuration
 * @return Tuple of (trained model graph, training error)
 *         NOTE(review): Spark's implementation appears to return the global mean
 *         rating of the input edges as the second element, not a training error --
 *         confirm against the Spark source.
 */
def run(
edges: RDD[Edge[Double]],
conf: Conf
): (Graph[(Array[Double], Array[Double], Double, Double), Double], Double)
}

Usage Examples:
import org.apache.spark.graphx.lib.SVDPlusPlus
// Prepare rating data as edges (source = user, destination = item, attribute = rating)
val ratings = sc.parallelize(Array(
  Edge(1L, 101L, 4.0), // User 1 rates item 101 as 4.0
  Edge(1L, 102L, 2.0),
  Edge(2L, 101L, 5.0)
))
// Configure SVD++. Spark's SVDPlusPlus.Conf is a plain class with no default
// arguments, so it is built with `new` and every field is spelled out.
val conf = new SVDPlusPlus.Conf(
  rank = 10, // 10 latent factors
  maxIters = 20, // 20 iterations
  minVal = 1.0, // Min rating 1.0
  maxVal = 5.0, // Max rating 5.0
  gamma1 = 0.007,
  gamma2 = 0.007,
  gamma6 = 0.005,
  gamma7 = 0.015
)
// Train model. The second tuple element is the global mean of the input ratings,
// not a training error, so it is named and reported as such.
val (model, meanRating) = SVDPlusPlus.run(ratings, conf)
println(f"Global mean rating: $meanRating%.3f")
// Extract learned factors
val userFactors = model.vertices.filter(_._1 < 100).collect // Users have ID < 100
val itemFactors = model.vertices.filter(_._1 >= 100).collect // Items have ID >= 100

Helper functions and operations commonly used with graph algorithms.
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
/**
 * Collect neighbor vertex IDs for each vertex.
 *
 * @param edgeDirection Direction of edges to consider
 * @return VertexRDD with arrays of neighbor IDs
 */
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
/**
 * Collect neighbor vertices and their attributes.
 *
 * @param edgeDirection Direction of edges to consider
 * @return VertexRDD with arrays of (VertexId, VertexAttribute) pairs
 */
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
/**
 * Collect incident edges for each vertex.
 *
 * @param edgeDirection Direction of edges to collect
 * @return VertexRDD with arrays of incident edges
 */
def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]]
/**
 * Remove self-loops from the graph.
 *
 * @return Graph with self-loops removed
 */
def removeSelfEdges(): Graph[VD, ED]
/**
 * Convert to canonical edge direction (srcId < dstId).
 *
 * @param mergeFunc Function to merge duplicate edges after canonicalization
 * @return Canonicalized graph
 */
def convertToCanonicalEdges(mergeFunc: (ED, ED) => ED): Graph[VD, ED]
/**
 * Pick a random vertex from the graph.
 *
 * @return Random vertex ID
 */
def pickRandomVertex(): VertexId
}
// For large graphs, use static PageRank with appropriate iterations
val ranks = {
  // Optimize partitioning, then cache the graph for the iterations
  val prepared = graph.partitionBy(PartitionStrategy.EdgePartition2D).cache()
  prepared.staticPageRank(numIter = 20)
}
// Personalized PageRank for several source vertices in a single run
val parallelRanks = {
  val sourceIds = Array(1L, 2L, 3L)
  PageRank.runParallelPersonalizedPageRank(graph, numIter = 10, resetProb = 0.15, sources = sourceIds)
}
// Use iteration limit for very large graphs to avoid excessive computation
val components = ConnectedComponents.run(graph, 50)
// Partition the edges first for better performance
val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)
val fasterComponents = partitionedGraph.connectedComponents()
// Triangle counting requires canonical edges - GraphX will canonicalize automatically
// but you can pre-canonicalize for better performance
val canonicalGraph = graph.convertToCanonicalEdges((first, _) => first)
val triangles = TriangleCount.runPreCanonicalized(canonicalGraph)
// Drop self-loops so they cannot distort the triangle counts
val cleanGraph = graph.removeSelfEdges()
val triangleCounts = cleanGraph.triangleCount()
// Pattern for implementing custom convergence-based algorithms
/**
 * Template for a convergence-driven iterative graph algorithm.
 *
 * Applies a Pregel step repeatedly and stops once the largest absolute change of any
 * vertex attribute drops below `tolerance`, or after `maxIterations` rounds.
 *
 * @param graph Input graph; its vertex attributes must be numeric so that the change
 *              between two rounds can be measured
 * @param tolerance Convergence threshold on the largest per-vertex change
 * @param maxIterations Upper bound on the number of rounds (100, the previously fixed limit, by default)
 * @tparam VD Numeric vertex attribute type
 * @tparam ED Edge attribute type
 * @return Graph produced by the last executed round (the input graph if no round runs)
 */
def iterativeAlgorithm[VD: scala.reflect.ClassTag: Numeric, ED: scala.reflect.ClassTag](
    graph: Graph[VD, ED],
    tolerance: Double,
    maxIterations: Int = 100
): Graph[VD, ED] = {
  val numeric = implicitly[Numeric[VD]]
  // NOTE(review): `g` starts out as the caller's graph, so the first unpersist below
  // also drops the caller's cached copy -- confirm this is acceptable for the caller.
  var g = graph.cache()
  var converged = false
  var iteration = 0
  while (!converged && iteration < maxIterations) {
    // Cache before the action below so the step is computed once and is still
    // available after the previous graph has been unpersisted.
    val newG = g.pregel(/* pregel parameters */).cache()
    // Largest absolute attribute change; folding from 0.0 means an empty graph
    // counts as converged instead of throwing like max() would.
    val maxChange = g.vertices.join(newG.vertices).map {
      case (_, (oldAttr, newAttr)) =>
        math.abs(numeric.toDouble(oldAttr) - numeric.toDouble(newAttr))
    }.fold(0.0)((a, b) => math.max(a, b))
    converged = maxChange < tolerance
    g.unpersist(blocking = false)
    g = newG
    iteration += 1
  }
  g
}
// Complete community detection and analysis pipeline
val graph = loadGraph()

// Step 1: label propagation assigns a community label to every vertex
val communities = LabelPropagation.run(graph, 10)

// Step 2: tally how many vertices carry each community label
val communitySizes = communities.vertices
  .map { case (_, label) => label -> 1 }
  .reduceByKey((left, right) => left + right)
  .collect()

// Step 3: keep only edges whose endpoints share a label and count triangles on that
// subgraph (this yields per-vertex triangle counts, not a modularity score)
val trianglesPerCommunity = {
  val intraCommunity = communities.subgraph(epred = t => t.srcAttr == t.dstAttr)
  intraCommunity.triangleCount()
}

println(s"Found ${communitySizes.length} communities")
for ((community, size) <- communitySizes) {
  println(s"Community $community has $size vertices")
}