or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-graph-api.md
graph-algorithms.md
index.md
pregel-api.md
utilities.md
tile.json

docs/graph-algorithms.md

Graph Algorithms

Comprehensive collection of pre-implemented graph algorithms including PageRank, Connected Components, Triangle Counting, and community detection algorithms optimized for distributed execution.

Capabilities

PageRank Algorithm

Compute PageRank scores using both static (fixed iterations) and dynamic (convergence-based) implementations.

/**
 * Run PageRank until convergence.
 *
 * @param tol Convergence tolerance; iteration stops once the change in rank falls below it
 * @param resetProb Probability of a random jump (damping factor = 1 - resetProb); defaults to 0.15
 * @return Graph with PageRank scores as vertex attributes
 */
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

/**
 * Run PageRank for a fixed number of iterations.
 *
 * @param numIter Number of iterations to run
 * @param resetProb Probability of a random jump; defaults to 0.15
 * @return Graph with PageRank scores as vertex attributes
 */
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

/**
 * Run personalized PageRank from a single source vertex until convergence.
 *
 * @param src Source vertex for personalized PageRank
 * @param tol Tolerance for convergence
 * @param resetProb Probability of jumping back to the source; defaults to 0.15
 * @return Graph with personalized PageRank scores as vertex attributes
 */
def personalizedPageRank(
  src: VertexId, 
  tol: Double,
  resetProb: Double = 0.15
): Graph[Double, Double]

object PageRank {
  /**
   * Static PageRank: runs exactly `numIter` iterations.
   *
   * @param graph Input graph
   * @param numIter Number of iterations
   * @param resetProb Random jump probability; defaults to 0.15
   * @return Graph with PageRank scores as vertex attributes
   */
  def run[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    numIter: Int,
    resetProb: Double = 0.15
  ): Graph[Double, Double]
  
  /**
   * Dynamic PageRank: iterates until the ranks converge within `tol`.
   *
   * @param graph Input graph
   * @param tol Convergence tolerance
   * @param resetProb Random jump probability; defaults to 0.15
   * @return Graph with PageRank scores as vertex attributes
   */
  def runUntilConvergence[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    tol: Double,
    resetProb: Double = 0.15
  ): Graph[Double, Double]
  
  /**
   * Fixed-iteration PageRank with an optional personalization source.
   *
   * @param graph Input graph
   * @param numIter Number of iterations
   * @param resetProb Random jump probability (no default in this overload)
   * @param srcId Optional source vertex for personalized PageRank; defaults to None
   * @return Graph with PageRank scores as vertex attributes
   */
  def runWithOptions[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    numIter: Int,
    resetProb: Double,
    srcId: Option[VertexId] = None
  ): Graph[Double, Double]
  
  /**
   * Personalized PageRank computed for several source vertices in one run.
   *
   * NOTE(review): `Vector` is not qualified here; presumably Spark's linear-algebra
   * Vector rather than scala.collection.immutable.Vector — confirm the import.
   *
   * @param graph Input graph
   * @param numIter Number of iterations
   * @param resetProb Random jump probability; defaults to 0.15
   * @param sources Source vertices, one personalized ranking per entry
   * @return Graph whose vertex attribute is a Vector of personalized PageRank scores
   */
  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED], 
    numIter: Int,
    resetProb: Double = 0.15,
    sources: Array[VertexId]
  ): Graph[Vector, Double]
}

Usage Examples:

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

// Convergence-based PageRank: stop once rank changes fall below 0.0001
val ranks = graph.pageRank(tol = 0.0001).vertices
for ((id, rank) <- ranks.collect) {
  println(s"Vertex $id has rank $rank")
}

// Fixed-iteration PageRank: exactly 10 iterations
val staticRanks = graph.staticPageRank(numIter = 10).vertices

// Personalized PageRank seeded at vertex 1, tolerance 0.001
val personalizedRanks = graph.personalizedPageRank(src = 1L, tol = 0.001).vertices

// The same algorithm called through the PageRank object (20 iterations, resetProb 0.1)
val pageRankGraph = PageRank.run(graph, 20, 0.1)

Connected Components

Find connected components in undirected graphs using efficient label propagation.

/**
 * Find the connected components of the graph, treating it as undirected.
 *
 * @return Graph where each vertex attribute is the smallest vertex ID in its component
 */
def connectedComponents(): Graph[VertexId, ED]

object ConnectedComponents {
  /**
   * Find connected components in a graph.
   *
   * @param graph Input graph (treated as undirected)
   * @return Graph with component IDs as vertex attributes
   */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
  
  /**
   * Find connected components with an upper bound on the number of iterations.
   *
   * NOTE(review): presumably the labels may not have fully converged if the limit
   * is reached first — confirm before relying on the result for very large graphs.
   *
   * @param graph Input graph
   * @param maxIterations Maximum number of iterations
   * @return Graph with component IDs as vertex attributes
   */
  def run[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED], 
    maxIterations: Int
  ): Graph[VertexId, ED]
}

Strongly Connected Components

Find strongly connected components in directed graphs using an iterative algorithm bounded by a caller-supplied number of iterations.

/**
 * Compute strongly connected components (SCCs).
 *
 * @param numIter Number of iterations to run
 * @return Graph where each vertex attribute is the smallest vertex ID in its SCC
 */
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]

object StronglyConnectedComponents {
  /**
   * Find strongly connected components in a directed graph.
   *
   * @param graph Input directed graph
   * @param numIter Number of iterations
   * @return Graph with SCC IDs as vertex attributes
   */
  def run[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED], 
    numIter: Int
  ): Graph[VertexId, ED]
}

Triangle Counting

Count triangles in graphs for clustering coefficient computation and social network analysis.

/**
 * Count the triangles passing through each vertex.
 *
 * @return Graph with per-vertex triangle counts as vertex attributes
 */
def triangleCount(): Graph[Int, ED]

object TriangleCount {
  /**
   * Count triangles per vertex.
   *
   * Canonical edge orientation is needed for counting; per the "Triangle Counting
   * Preparation" tips on this page, GraphX canonicalizes automatically on this path.
   *
   * @param graph Input graph
   * @return Graph with triangle counts per vertex
   */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
  
  /**
   * Count triangles assuming the graph is already in canonical form, skipping
   * the canonicalization step.
   *
   * NOTE(review): upstream also appears to require that self-loops have already
   * been removed from the input — confirm against the Spark API docs.
   *
   * @param graph Pre-canonicalized graph
   * @return Graph with triangle counts per vertex
   */
  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED]
  ): Graph[Int, ED]
}

Usage Examples:

// Connected components: each vertex is labelled with its component ID
val components = graph.connectedComponents().vertices
for ((id, component) <- components.collect) {
  println(s"Vertex $id belongs to component $component")
}

// Triangle counts per vertex
val triangles = graph.triangleCount().vertices
for ((id, count) <- triangles.collect) {
  println(s"Vertex $id participates in $count triangles")
}

// Strongly connected components, 10 iterations
val scc = graph.stronglyConnectedComponents(numIter = 10).vertices

Label Propagation

Community detection algorithm using iterative label propagation for clustering and community structure discovery.

object LabelPropagation {
  /**
   * Run the label propagation algorithm for community detection.
   *
   * @param graph Input graph
   * @param maxSteps Maximum number of propagation steps
   * @return Graph with community labels (vertex IDs) as vertex attributes
   */
  def run[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    maxSteps: Int
  ): Graph[VertexId, ED]
}

Usage Examples:

import org.apache.spark.graphx.lib.LabelPropagation

// Label propagation, capped at 5 steps; each vertex ends up with a community label
val communities = LabelPropagation.run(graph, 5).vertices
for ((id, community) <- communities.collect) {
  println(s"Vertex $id belongs to community $community")
}

Shortest Paths

Compute shortest paths, measured in number of hops (edge weights are ignored), from each vertex to a set of landmark vertices using breadth-first search.

object ShortestPaths {
  /** Map from landmark vertex ID to the shortest distance (in hops) to that landmark. */
  type SPMap = Map[VertexId, Int]
  
  /**
   * Compute shortest paths to the landmark vertices.
   *
   * @param graph Input graph (edge weights ignored, all edges have weight 1)
   * @param landmarks Landmark vertex IDs to measure distances to
   * @return Graph whose vertex attribute is an SPMap of distances to the landmarks
   */
  def run[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    landmarks: Seq[VertexId]
  ): Graph[SPMap, ED]
}

Usage Examples:

import org.apache.spark.graphx.lib.ShortestPaths

// Distances from every vertex to landmarks 1, 2 and 3
val landmarks = Seq(1L, 2L, 3L)
val distances = ShortestPaths.run(graph, landmarks).vertices

for ((id, distanceMap) <- distances.collect) {
  println(s"Vertex $id distances: $distanceMap")
}

SVD++ Collaborative Filtering

Matrix factorization algorithm for recommendation systems and collaborative filtering.

object SVDPlusPlus {
  /**
   * Configuration for the SVD++ algorithm.
   *
   * NOTE(review): upstream Spark appears to declare `Conf` as a plain class whose
   * constructor takes all eight values with no defaults — verify before relying on
   * the `case class` form or the default values shown here.
   * NOTE(review): the gamma roles below follow how the Spark implementation seems to
   * use them (two learning rates, two regularization coefficients) — TODO confirm.
   *
   * @param rank Number of latent factors
   * @param maxIters Maximum iterations
   * @param minVal Minimum rating value
   * @param maxVal Maximum rating value
   * @param gamma1 Learning rate for the bias terms (presumably — confirm)
   * @param gamma2 Learning rate for the latent factors (presumably — confirm)
   * @param gamma6 Regularization coefficient for the bias terms (presumably — confirm)
   * @param gamma7 Regularization coefficient for the latent factors (presumably — confirm)
   */
  case class Conf(
    rank: Int = 10,
    maxIters: Int = 2,
    minVal: Double = 0.0,
    maxVal: Double = 5.0,
    gamma1: Double = 0.007,
    gamma2: Double = 0.007, 
    gamma6: Double = 0.005,
    gamma7: Double = 0.015
  )
  
  /**
   * Run SVD++ collaborative filtering.
   *
   * NOTE(review): the second tuple element appears to be the mean of the input
   * ratings in upstream Spark, not a training error — confirm before reporting it
   * as an error metric.
   *
   * @param edges RDD of rating edges (user -> item, with the rating as edge attribute)
   * @param conf Algorithm configuration
   * @return Tuple of (trained model graph, Double)
   */
  def run(
    edges: RDD[Edge[Double]], 
    conf: Conf
  ): (Graph[(Array[Double], Array[Double], Double, Double), Double], Double)
}

Usage Examples:

import org.apache.spark.graphx.lib.SVDPlusPlus

// Prepare rating data as edges (user -> item, edge attribute = rating)
val ratings = sc.parallelize(Array(
  Edge(1L, 101L, 4.0), // User 1 rates item 101 as 4.0
  Edge(1L, 102L, 2.0),
  Edge(2L, 101L, 5.0)
))

// Configure SVD++. Built with `new` and every field spelled out so the example works
// even where Conf is a plain class without default values.
val conf = new SVDPlusPlus.Conf(
  rank = 10,        // 10 latent factors
  maxIters = 20,    // 20 iterations
  minVal = 1.0,     // Min rating 1.0
  maxVal = 5.0,     // Max rating 5.0
  gamma1 = 0.007,
  gamma2 = 0.007,
  gamma6 = 0.005,
  gamma7 = 0.015
)

// Train model. The second element of the result is the mean of the input ratings;
// it is not an error metric, so it must not be reported as an RMSE.
val (model, meanRating) = SVDPlusPlus.run(ratings, conf)
println(f"Mean rating: $meanRating%.3f")

// Extract learned factors (in this data set users have ID < 100, items have ID >= 100)
val userFactors = model.vertices.filter(_._1 < 100).collect
val itemFactors = model.vertices.filter(_._1 >= 100).collect

Graph Utilities for Algorithms

Helper functions and operations commonly used with graph algorithms.

class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /**
   * Collect the neighbor vertex IDs of each vertex.
   *
   * @param edgeDirection Direction of edges to consider
   * @return VertexRDD with an array of neighbor IDs per vertex
   */
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  
  /**
   * Collect the neighbor vertices of each vertex together with their attributes.
   *
   * @param edgeDirection Direction of edges to consider
   * @return VertexRDD with an array of (VertexId, vertex attribute) pairs per vertex
   */
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  
  /**
   * Collect the incident edges of each vertex.
   *
   * @param edgeDirection Direction of edges to collect
   * @return VertexRDD with an array of incident edges per vertex
   */
  def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]]
  
  /**
   * Remove self-loops from the graph.
   *
   * @return Graph with self-loops removed
   */
  def removeSelfEdges(): Graph[VD, ED]
  
  /**
   * Convert the graph to canonical edge direction (srcId < dstId).
   *
   * @param mergeFunc Function used to merge duplicate edges after canonicalization
   * @return Canonicalized graph
   */
  def convertToCanonicalEdges(mergeFunc: (ED, ED) => ED): Graph[VD, ED]
  
  /**
   * Pick a random vertex from the graph.
   *
   * @return ID of the randomly chosen vertex
   */
  def pickRandomVertex(): VertexId
}

Algorithm Performance Tips

PageRank Optimization

// Large graphs: partition, cache, then run a fixed number of PageRank iterations
val partitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D) // optimize partitioning
val cachedGraph = partitioned.cache()                                  // reused across iterations
val ranks = cachedGraph.staticPageRank(20)

// Several personalization sources handled in a single run
val parallelRanks = PageRank.runParallelPersonalizedPageRank(
  graph,
  numIter = 10,
  resetProb = 0.15,
  sources = Array(1L, 2L, 3L)
)

Connected Components for Large Graphs

// Cap the iteration count on very large graphs to bound the amount of computation
val components = ConnectedComponents.run(graph, 50)

// Partition the edges up front for better performance
val partitionedGraph = graph.partitionBy(partitionStrategy = PartitionStrategy.EdgePartition2D)
val fasterComponents = partitionedGraph.connectedComponents()

Triangle Counting Preparation

// Triangle counting needs a graph with no self-loops and canonical edges (srcId < dstId).
// runPreCanonicalized skips GraphX's own clean-up, so BOTH steps must already have been
// applied to the graph handed to it; canonicalizing alone leaves self-loops in place.
val canonicalGraph = graph
  .removeSelfEdges()
  .convertToCanonicalEdges((a, b) => a)
val triangles = TriangleCount.runPreCanonicalized(canonicalGraph)

// triangleCount() performs that clean-up itself, so removing self-loops beforehand is
// optional; it only makes the intent explicit.
val cleanGraph = graph.removeSelfEdges()
val triangleCounts = cleanGraph.triangleCount()

Common Algorithm Patterns

Iterative Convergence

import scala.reflect.ClassTag

/**
 * Pattern for custom convergence-based algorithms: repeat a Pregel step until the
 * largest per-vertex change falls below `tolerance` or 100 iterations have run.
 *
 * The `ClassTag` bounds are what GraphX needs in order to call `pregel` and to join the
 * vertex RDDs; the `Numeric` bound is what makes "old minus new" meaningful for a
 * generic vertex attribute type.
 *
 * @param graph Input graph; its vertex attributes must be numeric
 * @param tolerance Convergence threshold on the largest absolute attribute change
 * @return Graph produced by the last iteration that ran
 */
def iterativeAlgorithm[VD: ClassTag: Numeric, ED: ClassTag](
  graph: Graph[VD, ED],
  tolerance: Double
): Graph[VD, ED] = {
  val numeric = implicitly[Numeric[VD]]
  var g = graph.cache()
  var converged = false
  var iteration = 0

  while (!converged && iteration < 100) {
    // Cache before the action below; otherwise the step is computed once for the
    // convergence check and again when the next iteration reads it.
    val newG = g.pregel(/* pregel parameters */).cache()

    // Largest absolute change of any vertex attribute. Folding from 0.0 (changes are
    // never negative) keeps a graph without vertices from throwing, which max() would.
    val maxChange = g.vertices.join(newG.vertices)
      .map { case (_, (oldAttr, newAttr)) =>
        math.abs(numeric.toDouble(numeric.minus(oldAttr, newAttr)))
      }
      .fold(0.0)((a, b) => math.max(a, b))

    converged = maxChange < tolerance
    g.unpersist(blocking = false)
    g = newG
    iteration += 1
  }

  g
}

Community Detection Pipeline

// End-to-end community detection and analysis
val graph = loadGraph()

// Step 1: label every vertex with a community via label propagation (at most 10 steps)
val communities = LabelPropagation.run(graph, 10)

// Step 2: community sizes, i.e. the number of vertices carrying each label
val communitySizes = communities.vertices
  .map { case (_, community) => community -> 1 }
  .reduceByKey((a, b) => a + b)
  .collect()

// Step 3: triangle counts using only edges whose two endpoints share a community label
val trianglesPerCommunity = communities
  .subgraph(epred = t => t.srcAttr == t.dstAttr)
  .triangleCount()

println(s"Found ${communitySizes.length} communities")
for ((community, size) <- communitySizes) {
  println(s"Community $community has $size vertices")
}