Utilities

Graph loading, generation, and utility functions: create test graphs, import data, tune performance, and work with GraphX efficiently.

Capabilities

Graph Loading

Load graphs from edge list files.

object GraphLoader {
  /**
   * Load a graph from an edge list file.
   * Each line contains two integers ("srcId dstId"); lines beginning with "#"
   * are skipped as comments, and any extra columns (such as a weight) are ignored.
   * @param sc SparkContext
   * @param path Path to the edge list file
   * @param canonicalOrientation Whether to orient edges canonically (srcId < dstId)
   * @param numEdgePartitions Number of edge partitions (-1 to use the default)
   * @param edgeStorageLevel Storage level for edges
   * @param vertexStorageLevel Storage level for vertices
   * @return Graph with all vertex and edge attributes set to 1
   */
  def edgeListFile(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    numEdgePartitions: Int = -1,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  ): Graph[Int, Int]
}

Usage Examples:

import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

// Load graph from an edge list file
// File format: each line contains "srcId dstId"; lines starting with "#" are skipped
val graph = GraphLoader.edgeListFile(sc, "path/to/edges.txt")

// Load with canonical orientation (srcId < dstId)
val canonicalGraph = GraphLoader.edgeListFile(
  sc, 
  "path/to/edges.txt",
  canonicalOrientation = true
)

// Load with custom partitioning and storage  
val optimizedGraph = GraphLoader.edgeListFile(
  sc,
  "hdfs://cluster/large-graph.txt", 
  canonicalOrientation = false,
  numEdgePartitions = 100,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_ONLY
)

Graph Generators

Generate synthetic graphs for testing, benchmarking, and algorithm development.

object GraphGenerators {
  /** Default R-MAT quadrant probabilities (RMATc = 1.0 - RMATa - RMATb - RMATd = 0.15) */
  val RMATa: Double = 0.45
  val RMATb: Double = 0.15
  val RMATd: Double = 0.25

  /**
   * Generate a graph whose out-degrees follow a log-normal distribution
   * @param sc SparkContext
   * @param numVertices Number of vertices
   * @param numEParts Number of edge partitions (0 to use sc.defaultParallelism)
   * @param mu Mean of the underlying normal distribution
   * @param sigma Standard deviation of the underlying normal distribution
   * @param seed Random seed (-1 for a random seed)
   * @return Graph with Long vertex attributes (the out-degree) and Int edge attributes
   */
  def logNormalGraph(
    sc: SparkContext,
    numVertices: Int,
    numEParts: Int = 0,
    mu: Double = 4.0,
    sigma: Double = 1.3,
    seed: Long = -1
  ): Graph[Long, Int]

  /**
   * Generate an R-MAT graph with realistic (power-law) structure, using the
   * fixed quadrant probabilities RMATa, RMATb, RMATd (and the implied RMATc)
   * @param sc SparkContext
   * @param requestedNumVertices Desired number of vertices (rounded up to a power of 2)
   * @param numEdges Number of edges to generate
   * @return R-MAT graph with Int vertex and edge attributes
   */
  def rmatGraph(
    sc: SparkContext,
    requestedNumVertices: Int,
    numEdges: Int
  ): Graph[Int, Int]

  /**
   * Generate a star graph (one central vertex connected to all others)
   * @param sc SparkContext
   * @param nverts Number of vertices (including the center)
   * @return Star graph with the center at vertex 0
   */
  def starGraph(
    sc: SparkContext,
    nverts: Int
  ): Graph[Int, Int]

  /**
   * Generate a 2D grid graph
   * @param sc SparkContext
   * @param rows Number of rows in the grid
   * @param cols Number of columns in the grid
   * @return Grid graph whose vertices hold their (row, col) position, with edges to adjacent cells
   */
  def gridGraph(
    sc: SparkContext,
    rows: Int,
    cols: Int
  ): Graph[(Int, Int), Double]
}

Usage Examples:

import org.apache.spark.graphx.util.GraphGenerators

// Generate log-normal degree distribution graph (realistic social networks)
val socialGraph = GraphGenerators.logNormalGraph(sc, numVertices = 1000)

// Generate an R-MAT graph (power-law structure; the quadrant probabilities
// are fixed by the RMATa/RMATb/RMATd constants, with RMATc implied)
val rmatGraph = GraphGenerators.rmatGraph(sc, requestedNumVertices = 1024, numEdges = 5000)

// Generate star graph for testing centrality algorithms
val starGraph = GraphGenerators.starGraph(sc, nverts = 100)

// Generate grid graph for spatial algorithms
val gridGraph = GraphGenerators.gridGraph(sc, rows = 10, cols = 10)

// Use generated graphs for testing
val pageRanks = rmatGraph.pageRank(0.001).vertices
val components = socialGraph.connectedComponents().vertices

Graph Utilities and Optimization

Utility functions for graph optimization, serialization, and performance tuning.

object GraphXUtils {
  /**
   * Register GraphX classes with Kryo serialization for better performance
   * @param conf SparkConf to modify
   */
  def registerKryoClasses(conf: SparkConf): Unit
}
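For example, Kryo registration is a one-call setup. A minimal sketch (the app name is a placeholder; a fuller configuration appears in the Kryo Serialization Setup section below):

import org.apache.spark.SparkConf
import org.apache.spark.graphx.GraphXUtils

val conf = new SparkConf()
  .setAppName("graphx-app")  // placeholder name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
GraphXUtils.registerKryoClasses(conf)  // Registers GraphX's internal classes with Kryo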

// Internal utility (package-private in stock Spark) used by Pregel to
// periodically checkpoint graphs and truncate long RDD lineage chains
class PeriodicGraphCheckpointer[VD: ClassTag, ED: ClassTag](
  checkpointInterval: Int,
  sc: SparkContext
) {
  /**
   * Register a new graph to manage, persisting it and checkpointing every
   * checkpointInterval-th update (requires sc.setCheckpointDir to be set)
   * @param graph New graph to manage
   */
  def update(graph: Graph[VD, ED]): Unit

  /**
   * Remove all checkpoint files that have been written
   */
  def deleteAllCheckpoints(): Unit
}
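A minimal sketch of the intended update loop, assuming the class is reachable from your code (in stock Spark it is package-private and driven internally by Pregel); initialGraph and step are hypothetical:

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.PeriodicGraphCheckpointer

sc.setCheckpointDir("hdfs://cluster/checkpoints")  // required before checkpointing
val checkpointer = new PeriodicGraphCheckpointer[Double, Double](25, sc)

var current: Graph[Double, Double] = initialGraph  // hypothetical starting graph
checkpointer.update(current)
for (_ <- 1 to 100) {
  current = step(current)        // hypothetical single iteration of an algorithm
  checkpointer.update(current)   // persists each update; checkpoints every 25th
}
checkpointer.deleteAllCheckpoints()  // remove checkpoint files when finished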

object BytecodeUtils {
  /**
   * Test whether a closure invokes a specific method, by inspecting its bytecode
   * (used internally to avoid shipping vertex attributes a closure never reads)
   * @param closure Function closure to analyze
   * @param targetClass Class containing the target method
   * @param targetMethod Name of the method to check for
   * @return Whether the closure calls the target method
   */
  def invokedMethod(
    closure: AnyRef,
    targetClass: Class[_],
    targetMethod: String
  ): Boolean
}
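For illustration only (the utility is package-private in stock Spark), a call of this shape asks whether a closure ever reads srcAttr:

// Sketch: assumes BytecodeUtils is accessible from the calling package
val sendMsg = (t: EdgeTriplet[Double, Double]) => t.srcAttr * t.attr
val readsSrc = BytecodeUtils.invokedMethod(
  sendMsg, classOf[EdgeTriplet[Double, Double]], "srcAttr")
// If readsSrc is false, source vertex attributes never need to be shipped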

Partition Strategies

Different strategies for distributing edges across partitions to optimize performance.

trait PartitionStrategy {
  /**
   * Determine which partition an edge should be assigned to
   * @param src Source vertex ID
   * @param dst Destination vertex ID  
   * @param numParts Total number of partitions
   * @return Partition ID for this edge
   */
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}

object PartitionStrategy {
  /**
   * 2D edge partitioning with a vertex replication bound of 2 * sqrt(numParts)
   * Provides good load balancing and communication efficiency
   */
  val EdgePartition2D: PartitionStrategy

  /**
   * Hash partitioning by source vertex only
   * Simple but can lead to load imbalance on skewed graphs
   */
  val EdgePartition1D: PartitionStrategy

  /**
   * Hashes the (source, destination) pair, colocating all same-direction
   * edges between a pair of vertices
   */
  val RandomVertexCut: PartitionStrategy

  /**
   * Hashes the vertex pair in canonical order, colocating all edges between
   * two vertices regardless of direction
   * Reduces communication for algorithms that use both edge directions
   */
  val CanonicalRandomVertexCut: PartitionStrategy

  /**
   * Get a partition strategy by name
   * @param s Strategy name ("EdgePartition1D", "EdgePartition2D", etc.)
   * @return Corresponding PartitionStrategy
   */
  def fromString(s: String): PartitionStrategy
}

Usage Examples:

// Optimize graph partitioning for better performance
val optimizedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)

// Use different strategies based on graph characteristics  
val strategy = if (graph.numVertices > 1000000) {
  PartitionStrategy.EdgePartition2D  // Better for large graphs
} else {
  PartitionStrategy.RandomVertexCut  // Simpler for small graphs  
}
val partitionedGraph = graph.partitionBy(strategy)

// Get strategy from configuration
val strategyName = "EdgePartition1D"
val configuredStrategy = PartitionStrategy.fromString(strategyName)
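Since PartitionStrategy is an ordinary trait, you can also plug in your own placement logic. A sketch (this modulo scheme is illustrative, not a built-in strategy):

import org.apache.spark.graphx._

// Illustrative custom strategy: place each edge by its source ID alone,
// similar in spirit to EdgePartition1D but without hash mixing
object SourceModuloPartition extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
    (math.abs(src) % numParts).toInt
}

val customPartitioned = graph.partitionBy(SourceModuloPartition)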

Performance Optimization Utilities

Helper functions and patterns for optimizing GraphX performance.

// Selected operations available on every Graph. groupEdges is defined on Graph
// itself; the others come from the implicit GraphOps class.
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /**
   * Merge parallel edges (multiple edges between the same vertex pair) into single edges
   * The graph must first be partitioned with partitionBy so parallel edges are colocated
   * @param merge Function to combine edge attributes
   * @return Graph with merged parallel edges
   */
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  /**
   * Remove self-loops (edges from a vertex to itself)
   * @return Graph without self-loops
   */
  def removeSelfEdges(): Graph[VD, ED]

  /**
   * Pick a random vertex ID from the graph
   * @return Random vertex ID
   */
  def pickRandomVertex(): VertexId

  /**
   * Filter the graph, evaluating the predicates on a preprocessed view while
   * keeping the original vertex and edge attributes
   * @param preprocess Function that derives the view used by the predicates
   * @param epred Edge predicate
   * @param vpred Vertex predicate
   * @return Filtered graph with the original attribute types
   */
  def filter[VD2: ClassTag, ED2: ClassTag](
    preprocess: Graph[VD, ED] => Graph[VD2, ED2],
    epred: EdgeTriplet[VD2, ED2] => Boolean,
    vpred: (VertexId, VD2) => Boolean
  ): Graph[VD, ED]
}
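A short sketch of these operations in use; weightedGraph is an assumed Graph[Int, Double] that may contain parallel edges and self-loops:

// Merge parallel edges by summing weights; partitionBy first so they are colocated
val deduped = weightedGraph
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .groupEdges((a, b) => a + b)

// Drop self-loops, e.g. before triangle counting
val cleaned = deduped.removeSelfEdges()

// Pick a random start vertex for experiments
val start: VertexId = cleaned.pickRandomVertex()

// Keep vertices of degree >= 2: the predicates see a degree-annotated view,
// but the result keeps the original attribute types
val pruned = cleaned.filter(
  preprocess = (g: Graph[Int, Double]) =>
    g.outerJoinVertices(g.degrees)((vid, attr, deg) => deg.getOrElse(0)),
  epred = (_: EdgeTriplet[Int, Double]) => true,
  vpred = (_: VertexId, deg: Int) => deg >= 2
)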

TripletFields Optimization

Control which triplet fields are accessed to optimize message passing performance.

class TripletFields(val useSrc: Boolean, val useDst: Boolean, val useEdge: Boolean) {
  // Java class for specifying which EdgeTriplet/EdgeContext fields are accessed
}

object TripletFields {
  /** No fields are accessed */
  val None: TripletFields
  
  /** Only edge attribute is accessed */  
  val EdgeOnly: TripletFields
  
  /** Source vertex and edge attributes are accessed */
  val Src: TripletFields
  
  /** Destination vertex and edge attributes are accessed */
  val Dst: TripletFields
  
  /** All fields are accessed (default) */
  val All: TripletFields
}

Usage Examples:

// Optimize aggregateMessages with TripletFields
val inDegrees = graph.aggregateMessages[Int](
  sendMsg = ctx => ctx.sendToDst(1),  // Only sending to destination
  mergeMsg = (a, b) => a + b,
  tripletFields = TripletFields.None  // No triplet fields needed
)

val weightedInDegrees = graph.aggregateMessages[Double](
  sendMsg = ctx => ctx.sendToDst(ctx.attr),  // Using edge attribute
  mergeMsg = (a, b) => a + b,
  tripletFields = TripletFields.EdgeOnly  // Only edge attribute needed
)

val neighborSum = graph.aggregateMessages[Double](
  sendMsg = ctx => ctx.sendToDst(ctx.srcAttr),  // Using source attribute
  mergeMsg = (a, b) => a + b, 
  tripletFields = TripletFields.Src  // Only source attribute needed
)

Performance Optimization Patterns

Graph Construction Optimization

import org.apache.spark.HashPartitioner
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Efficient graph construction for large datasets
def buildLargeGraph(vertices: RDD[(VertexId, String)], edges: RDD[Edge[Double]]): Graph[String, Double] = {

  // Pre-partition edges by source vertex for better locality
  // (an Edge RDD is not a pair RDD, so key it before calling partitionBy)
  val partitionedEdges = edges
    .map(e => (e.srcId, e))
    .partitionBy(new HashPartitioner(100))
    .values

  // Use appropriate storage levels
  val graph = Graph(
    vertices,
    partitionedEdges,
    defaultVertexAttr = "Unknown",
    edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
    vertexStorageLevel = StorageLevel.MEMORY_ONLY
  )

  // Apply an efficient partitioning strategy
  graph.partitionBy(PartitionStrategy.EdgePartition2D).cache()
}

Iterative Algorithm Optimization

// Optimize for iterative algorithms
def optimizeForIterativeAlgorithms[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED]
): Graph[VD, ED] = {

  val optimized = graph
    .partitionBy(PartitionStrategy.EdgePartition2D)  // Better load balancing
    .cache()                                         // Cache for multiple iterations
  // checkpoint() returns Unit, so it cannot be chained; it also requires
  // sc.setCheckpointDir to have been called
  optimized.checkpoint()
  optimized
}

// Use with iterative algorithms  
val optimizedGraph = optimizeForIterativeAlgorithms(graph)
val pageRanks = optimizedGraph.pageRank(0.001)
val components = optimizedGraph.connectedComponents()

// Clean up when done
optimizedGraph.unpersist()

Memory Management

// Manage memory usage for large graphs
def memoryEfficientProcessing[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED]
): Unit = {
  
  // Use serialized storage for large graphs
  val efficientGraph = graph.persist(StorageLevel.MEMORY_AND_DISK_SER)
  
  try {
    // Process graph
    val results = efficientGraph.pageRank(0.001)
    
    // Process results immediately  
    results.vertices.foreachPartition { iter =>
      iter.foreach { case (id, rank) => 
        // Process each result
      }
    }
    
  } finally {
    // Always clean up
    efficientGraph.unpersist(blocking = false)
  }
}

Kryo Serialization Setup

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphXUtils

// Configure Spark for optimal GraphX performance
def configureSparkForGraphX(appName: String): SparkContext = {
  val conf = new SparkConf()
    .setAppName(appName)
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.locality.wait", "0s")  // Common GraphX tuning: don't wait for data-local slots

  // Register GraphX classes with Kryo (replaces the GraphKryoRegistrator
  // that was removed in Spark 2.0)
  GraphXUtils.registerKryoClasses(conf)

  new SparkContext(conf)
}
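A typical entry point then looks like this (the app name and path are placeholders):

val sc = configureSparkForGraphX("graphx-analytics")
val graph = GraphLoader.edgeListFile(sc, "hdfs://cluster/edges.txt")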

Common Utility Patterns

Graph Validation and Debugging

import scala.reflect.ClassTag

// Validate graph structure and properties
def validateGraph[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Unit = {
  println(s"Vertices: ${graph.numVertices}")
  println(s"Edges: ${graph.numEdges}")

  // Check for self-loops
  val selfLoops = graph.edges.filter(e => e.srcId == e.dstId).count()
  println(s"Self-loops: $selfLoops")

  // Check the degree distribution (cached because the RDD is used twice;
  // note that isolated vertices do not appear in graph.degrees)
  val degrees = graph.degrees.map(_._2).cache()
  val maxDegree = degrees.max()
  val avgDegree = degrees.mean()
  println(s"Max degree: $maxDegree, Average degree: $avgDegree")
  degrees.unpersist()

  // Check connectivity
  val components = graph.connectedComponents().vertices.map(_._2).distinct().count()
  println(s"Connected components: $components")
}

Graph Format Conversion

import scala.reflect.ClassTag

// Convert between different graph representations
// (ClassTag bounds are needed for the implicit GraphOps conversion)
def convertEdgeListToAdjacencyList[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED]): RDD[(VertexId, Array[VertexId])] = {
  graph.collectNeighborIds(EdgeDirection.Out)
}

def saveGraphToEdgeList[VD, ED](graph: Graph[VD, ED], path: String): Unit = {
  graph.edges
    .map(edge => s"${edge.srcId} ${edge.dstId}")
    .saveAsTextFile(path)
}

def loadGraphFromAdjacencyList(sc: SparkContext, path: String): Graph[Int, Int] = {
  val adjacencyList = sc.textFile(path)
    .filter(line => line.trim.nonEmpty)  // Skip blank lines
    .map { line =>
      val parts = line.trim.split("\\s+")
      val src = parts(0).toLong
      val neighbors = parts.tail.map(_.toLong)
      (src, neighbors)
    }

  val edges = adjacencyList.flatMap { case (src, neighbors) =>
    neighbors.map(dst => Edge(src, dst, 1))
  }

  Graph.fromEdges(edges, defaultValue = 0)
}
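These helpers compose with GraphLoader: a graph saved by saveGraphToEdgeList produces exactly the "srcId dstId" lines that GraphLoader.edgeListFile reads back (the paths below are placeholders):

// Export, then reload as a Graph[Int, Int]
saveGraphToEdgeList(graph, "hdfs://cluster/exported-edges")
val reloaded = GraphLoader.edgeListFile(sc, "hdfs://cluster/exported-edges")

// Derive an adjacency-list view of the reloaded graph
val adjacency = convertEdgeListToAdjacencyList(reloaded)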